In this article, we will walk through the concepts you need to keep in mind while writing your first DAG. We will go through the code in small parts and see how they combine to form a DAG in Airflow.
Creating a Python file
- Create a new Python file named “hello_world_dag.py” inside the airflow/dags directory on your system and open it in your favorite editor.
Importing the modules
- To create a pipeline in Airflow, we need to import the “DAG” class from the “airflow” package and the “PythonOperator” class from the “airflow.operators.python” module.
- We will also import the “datetime” class from Python’s “datetime” module, which we need to set the DAG’s start date.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
Creating a DAG object
- Next, we will instantiate a DAG object, which holds the tasks of the pipeline. We pass it a “dag_id” string, which is the unique identifier of the DAG.
- It is recommended to keep the Python file name and the “dag_id” the same, so we will set the “dag_id” to “hello_world_dag”.
- We will also set a “start_date” parameter, the timestamp from which the scheduler will attempt to backfill.
- This is followed by a “schedule_interval” parameter, which sets the interval between the DAG Runs created by the scheduler. It takes a “datetime.timedelta” object or a cron expression. Airflow also provides cron presets such as ‘@hourly’, ‘@daily’, and ‘@yearly’; the Airflow documentation lists all of them.
- So, if the “start_date” is January 1, 2021 and the “schedule_interval” is hourly, the scheduler will create one DAG Run for every hour from the start date up to the present hour, or up to the “end_date” (an optional parameter) if one is set. This is called catchup, and we can turn it off by setting the “catchup” parameter to False.
- After setting these parameters, our DAG initialization should look like this:
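with DAG(dag_id="hello_world_dag", start_date=datetime(2021, 1, 1),  # example values from the text above
         schedule_interval="@hourly", catchup=False) as dag: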
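To make the catchup behaviour concrete, the sketch below counts the runs that an hourly schedule would accumulate. It is a simplified model in plain Python, not Airflow’s scheduler code; the function name “catchup_intervals” and the 03:30 “current time” are assumptions made for illustration.

```python
from datetime import datetime, timedelta


def catchup_intervals(start_date, now, interval):
    """Return the (start, end) intervals that have fully elapsed by `now`.

    Simplified model: one run per completed interval, none for a partial one.
    """
    runs = []
    interval_start = start_date
    while interval_start + interval <= now:
        runs.append((interval_start, interval_start + interval))
        interval_start += interval
    return runs


start = datetime(2021, 1, 1)
now = datetime(2021, 1, 1, 3, 30)  # pretend the DAG is first enabled at 03:30
backlog = catchup_intervals(start, now, timedelta(hours=1))

# With catchup enabled, every completed interval gets its own DAG Run.
print(len(backlog))
# With catchup=False, only the most recent completed interval is scheduled.
print(backlog[-1:])
```

In this model the backlog grows by one run per hour since the “start_date”, which is why a start date far in the past combined with catchup can trigger a large number of runs at once.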
Creating a Task
- According to the Airflow documentation, an object instantiated from an operator is called a task. Various types of operators are available, but we will focus on the PythonOperator first.
- A PythonOperator is used to call a Python function from your DAG. We will create a PythonOperator object that calls a function which returns ‘Hello World’.
- Just as a DAG object has a “dag_id”, a PythonOperator object has a “task_id”, which acts as its identifier.
- It also has a “python_callable” parameter, which takes the function to be called. Pass the function object itself, without parentheses or quotes.
- After setting the parameters, our task should look like this:
task1 = PythonOperator(
    task_id="hello_world", python_callable=hello_world)  # both names are illustrative
Creating a callable function
- We also need to define the function that the PythonOperator will call. Because “python_callable” refers to the function by name, the function must be defined before the task in the file.
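A minimal sketch of such a callable follows. The name “hello_world” is an assumption chosen for illustration; any function name works as long as the same name is passed to “python_callable”.

```python
def hello_world():
    """Callable for the PythonOperator; Airflow records the return value in the task log."""
    return "Hello World"


# The function can be checked on its own before it is wired into a DAG.
print(hello_world())
```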
Setting dependencies
- We can set the dependencies between tasks by writing the task names joined with >> or <<. “task1 >> task2” means that task2 is downstream of task1 and runs after it; “task1 << task2” means that task2 is upstream of task1 and runs before it.
- Since we have a single task here, there is no flow to indicate, so we can simply write the task name.
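The effect of >> and << is easier to see with more than one task. The sketch below uses a toy “Task” class, a hypothetical stand-in written for this illustration and not Airflow’s implementation, that only records which tasks sit upstream and downstream. The task names are likewise made up.

```python
class Task:
    """Toy stand-in for an Airflow task; it only models dependency wiring."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()
        self.downstream = set()

    def __rshift__(self, other):
        # self >> other: `other` runs after `self`
        self.downstream.add(other.task_id)
        other.upstream.add(self.task_id)
        return other  # returning `other` lets a >> b >> c chain left to right

    def __lshift__(self, other):
        # self << other: `other` runs before `self`
        other >> self
        return other


extract = Task("extract")
transform = Task("transform")
load = Task("load")

extract >> transform >> load  # extract first, then transform, then load
load << transform             # the same transform -> load edge, written in reverse

print(sorted(transform.upstream), sorted(transform.downstream))
```

Both spellings describe the same edge, so the choice between >> and << is mostly a matter of which reads more naturally in the DAG file.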
Voila, it’s a DAG file
After compiling all the elements of the DAG, our final code should look like this:
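A sketch of the assembled file, under a few assumptions: it targets an Airflow 2.x installation (where “schedule_interval” is accepted), the start date and hourly schedule are the example values used earlier, and the function name and “task_id” are illustrative choices rather than names confirmed by the original listing.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def hello_world():
    # Called by the PythonOperator when the task runs
    return "Hello World"


with DAG(
    dag_id="hello_world_dag",         # matches the file name hello_world_dag.py
    start_date=datetime(2021, 1, 1),  # example start date
    schedule_interval="@hourly",      # example schedule
    catchup=False,                    # do not backfill runs since the start date
) as dag:
    task1 = PythonOperator(
        task_id="hello_world",        # illustrative task name
        python_callable=hello_world,  # pass the function itself, not a string
    )

    # A single task has no dependencies to declare
    task1
```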
Running the DAG
- To see the DAG running, activate the virtual environment and start the Airflow webserver and scheduler with the “airflow webserver” and “airflow scheduler” commands.
- Go to http://localhost:8080/home (or whichever port you have configured for Airflow) and you should see “hello_world_dag” listed in the webserver UI.
- The DAG should run successfully. To check the graph view or tree view, hover over Links and select the Graph or Tree option.
- You can also view the task’s execution information in its logs. To do so, click on the task, and a dialog box with the task’s details will open.
- Next, click on the Log button and you will be redirected to the task’s log.
Congratulations! We have made our first DAG using Airflow. In the coming articles, we will create a proper DAG with multiple tasks and dependencies among them.