Celery Executor in Airflow
We can finally use the Celery Executor, which lets us build production-ready data pipelines. With Celery workers spread across different servers, we can run many tasks in parallel.
Although we will demonstrate a simple setup on a single local server, the same approach extends to multiple remote servers.
Creating a Python file
- Create a new Python file named “celery_executor_demo.py” inside the airflow/dags directory on your system and open it in your favorite editor.
Importing the modules
- We will import the “DAG” class from the airflow package and the “PythonOperator” class from the “airflow.operators.python” module.
- We will also import the “datetime” and “time” modules to help us schedule the DAG and pause inside its tasks.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import time
Defining a DAG object
- We will now initialize a DAG object as follows:
catchup=False) as dag:
- We will create fourteen tasks inside our DAG. The “start” and “end” tasks (task1 and task6 in the code) mark the start and end of the DAG.
- The other tasks are task2_1, task2_2, task2_3, task3_1, task3_2, task3_3, task3_4, task3_5, task3_6, task4_1, task4_2, and task5.
- These 12 tasks call “sleeping_function” and sleep for 5 seconds.
- The task definitions will look like the following:
)
# Other 11 tasks are the same as task2_1 except for their task_id.
# You can refer to the link below for reference
task6 = PythonOperator(
- We have attached the link to the code file below for your reference.
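Since the twelve sleeping tasks differ only in their task_id, their arguments can be generated in a loop instead of being typed out. The sketch below is only an illustration and not the code from the linked file: build_sleep_task_kwargs is a hypothetical helper, and sleeping_function is redefined here so the snippet runs on its own without Airflow installed. It prepares the task_id and python_callable keyword arguments that PythonOperator accepts.

```python
import time


def sleeping_function():
    """Stand-in for the callable shared by the twelve sleeping tasks."""
    print("Sleeping for 5 seconds")
    time.sleep(5)


# Task ids taken from the list above; all of them use the same callable.
SLEEP_TASK_IDS = (
    [f"task2_{i}" for i in range(1, 4)]
    + [f"task3_{i}" for i in range(1, 7)]
    + [f"task4_{i}" for i in range(1, 3)]
    + ["task5"]
)


def build_sleep_task_kwargs(task_ids, python_callable):
    """Hypothetical helper: build one kwargs dict per task, keyed by task_id."""
    return {
        task_id: {"task_id": task_id, "python_callable": python_callable}
        for task_id in task_ids
    }


sleep_task_kwargs = build_sleep_task_kwargs(SLEEP_TASK_IDS, sleeping_function)
print(len(sleep_task_kwargs), "sleeping tasks prepared")
```

Inside the DAG's with block, an entry could then be unpacked as PythonOperator(**sleep_task_kwargs["task2_1"]), which should be equivalent to writing the two arguments out by hand.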
Defining callable functions
- task1 calls a function named hello_function that prints some text and sleeps for 5 seconds using the time.sleep() function.
- task6 calls a function named bye_function that prints some pre-defined text to indicate the end of the DAGRun.
- The rest of the tasks will call a function named sleeping_function that will sleep for 5 seconds using the time.sleep() function.
- The callable functions will look like this:
def hello_function():
    print('Hello, this is the first task of the DAG')
    time.sleep(5)

def bye_function():
    print('DAG run is done.')

def sleeping_function():
    print("Sleeping for 5 seconds")
    time.sleep(5)
- The dependencies will be set such that task2_1, task2_2, and task2_3 will run in parallel after task1 has finished its execution.
task1 >> [task2_1, task2_2, task2_3]
- Tasks task3_1 and task3_2 will run only after task2_1 has finished its execution.
task2_1 >> [task3_1, task3_2]
- Tasks task3_3 and task3_4 will run only after task2_2 has finished its execution.
task2_2 >> [task3_3, task3_4]
- Tasks task3_5 and task3_6 will run only after task2_3 has finished its execution.
task2_3 >> [task3_5, task3_6]
- Task task4_1 will run only after task3_1, task3_2, and task3_3 have successfully finished their execution.
[task3_1, task3_2, task3_3] >> task4_1
- Task task4_2 will run only after task3_4, task3_5, and task3_6 have successfully finished their execution.
[task3_4, task3_5, task3_6] >> task4_2
- Task task5 will run only after task4_1 and task4_2 have successfully finished their execution.
- Finally, task6 will end the DAGRun once task5 has finished its execution.
[task4_1, task4_2] >> task5 >> task6
- The dependencies will look like this:
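task1 >> [task2_1, task2_2, task2_3]
task2_1 >> [task3_1, task3_2]
task2_2 >> [task3_3, task3_4]
task2_3 >> [task3_5, task3_6]
[task3_1, task3_2, task3_3] >> task4_1
[task3_4, task3_5, task3_6] >> task4_2
[task4_1, task4_2] >> task5 >> task6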
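To see which tasks these dependencies allow to run side by side, the graph can be walked with Python's standard graphlib module (Python 3.9 or later). This is a rough sketch that runs outside Airflow: the UPSTREAM dictionary and the parallel_waves helper are illustrative names, and the grouping shows only what the dependencies permit, not how the Celery workers will actually schedule the tasks.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it waits for (from the dependencies above).
UPSTREAM = {
    "task1": set(),
    "task2_1": {"task1"},
    "task2_2": {"task1"},
    "task2_3": {"task1"},
    "task3_1": {"task2_1"},
    "task3_2": {"task2_1"},
    "task3_3": {"task2_2"},
    "task3_4": {"task2_2"},
    "task3_5": {"task2_3"},
    "task3_6": {"task2_3"},
    "task4_1": {"task3_1", "task3_2", "task3_3"},
    "task4_2": {"task3_4", "task3_5", "task3_6"},
    "task5": {"task4_1", "task4_2"},
    "task6": {"task5"},
}


def parallel_waves(upstream):
    """Group tasks into waves; all tasks in one wave may run at the same time."""
    sorter = TopologicalSorter(upstream)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks whose upstream tasks are done
        waves.append(ready)
        sorter.done(*ready)  # mark the whole wave as finished
    return waves


waves = parallel_waves(UPSTREAM)
for number, wave in enumerate(waves, start=1):
    print(f"wave {number}: {', '.join(wave)}")
```

Five of the six waves contain tasks that sleep for 5 seconds, so with enough worker slots the run needs roughly 25 seconds of sleep time, compared with 65 seconds if the 13 sleeping tasks ran one after another. Scheduling and queueing overhead is ignored in this estimate.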
The final DAG file can be viewed here:
- Code File: celery_executor_demo.py
Running the DAG
- To see the file running, activate the virtual environment and start the following:
- MySQL server
- RabbitMQ server (dashboard: http://localhost:15672/)
- Airflow webserver (dashboard: http://localhost:8080/)
- Airflow scheduler
- Celery worker
- Celery Flower (dashboard: http://localhost:5555/)
- Activate the “celery_executor_demo” DAG, trigger it and go to the graph view. The DAGRun should look like this.
- You can also view the Gantt chart of the execution by selecting the Gantt option. The Gantt chart should look like this:
- We can observe the parallel execution of various tasks.
- Congratulations! We have implemented parallel execution in Airflow using the Celery Executor.
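Before triggering the DAG, it can be useful to confirm that the dashboards listed under “Running the DAG” are reachable. The helper below is a small sketch using only the standard library. The function names are made up for illustration, the ports are taken from the dashboard URLs above, and an open port only suggests that the service is listening, not that it is configured correctly. MySQL and the Celery worker are not checked because no URL is given for them.

```python
import socket

# Dashboard ports taken from the URLs listed in "Running the DAG".
DASHBOARD_PORTS = {
    "RabbitMQ dashboard": 15672,
    "Airflow webserver": 8080,
    "Celery Flower": 5555,
}


def port_is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def report_services(host="localhost", ports=DASHBOARD_PORTS):
    """Print and return whether each dashboard port accepts connections."""
    status = {name: port_is_open(host, port) for name, port in ports.items()}
    for name, is_up in status.items():
        state = "reachable" if is_up else "not reachable"
        print(f"{name} (port {ports[name]}): {state}")
    return status


if __name__ == "__main__":
    report_services()
```

If a dashboard is reported as not reachable, the corresponding service is probably not running yet or is listening on a different port.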