Celery Executor in Airflow

By Hiren Rupchandani & Mukesh Kumar

Table of Contents

1. Creating a Python file

2. Importing the modules

3. Defining a DAG Object

4. Defining Tasks

5. Defining callable functions

6. Setting Dependencies

7. Code Reference

8. Running the DAG

9. What’s Next?

We can finally use the Celery Executor, which enables us to build production-ready data pipelines. It lets us run many tasks in parallel with the help of Celery workers distributed across different servers.

Although we will demonstrate a simple setup running only on our local machine, the same approach can be extended to multiple remote servers.
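The Celery Executor requires a message broker and a result backend in addition to the Airflow metadata database. As a rough sketch (assuming Airflow 2.x; Redis as the broker and PostgreSQL as the result backend are example choices, not requirements), the relevant settings in airflow.cfg look like this:

[core]
executor = CeleryExecutor

[celery]
# Example connection strings; substitute the broker and backend of your own setup
broker_url = redis://localhost:6379/0
result_backend = db+postgresql://airflow:airflow@localhost/airflow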

Creating a Python file

  • Create a new Python file named “celery_executor_demo.py” inside the airflow/dags directory on your system and open it in your favorite editor.
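  • For example, assuming the default AIRFLOW_HOME of ~/airflow, the empty file can be created from a terminal as follows (adjust the path if your dags folder lives elsewhere):
mkdir -p ~/airflow/dags
touch ~/airflow/dags/celery_executor_demo.py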

Importing the modules

  • We will import the “DAG” class and the “PythonOperator” from the airflow package.
  • We will also import the “datetime” and “time” modules to help us schedule and monitor the execution of the DAG.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import time

Defining a DAG object

  • We will now initialize a DAG object as follows:
with DAG(
    dag_id="celery_executor_demo",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@hourly",
    catchup=False
) as dag:

Defining Tasks

  • We will create fourteen tasks inside our DAG; the “start” and “end” tasks indicate the start and end of the DAG.
  • The other tasks are task2_1, task2_2, task2_3, task3_1, task3_2, task3_3, task3_4, task3_5, task3_6, task4_1, task4_2, and task5.
  • These 12 tasks call a “sleeping_function” and sleep for 5 seconds.
  • The task definitions will look like the following:
task1 = PythonOperator(
    task_id="start",
    python_callable=hello_function
)

task2_1 = PythonOperator(
    task_id="sleepy_1",
    python_callable=sleeping_function
)

# The other 11 tasks are the same as task2_1 except for their task_id.
# You can refer to the code link below, or the loop-based sketch at the
# end of this section, for reference.

task6 = PythonOperator(
    task_id="end",
    python_callable=last_function
)
  • We have attached the link to the code file below for your reference.
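  • Since the twelve sleepy tasks differ only in their task_id, they could also be generated in a loop. The sketch below assumes the task_id values simply continue the sleepy_N pattern (sleepy_2, sleepy_3, and so on), which may differ from the names used in the linked code file:
# Sketch (inside the "with DAG(...) as dag:" block): create the remaining
# sleep tasks in a loop instead of writing them out one by one.
sleepy_tasks = {}
for i in range(2, 13):
    sleepy_tasks[f"sleepy_{i}"] = PythonOperator(
        task_id=f"sleepy_{i}",
        python_callable=sleeping_function
    )
  • In this article the tasks are kept as individual variables (task2_1, task2_2, and so on) so that the dependencies in the next section can reference them by name.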

Defining callable functions

  • task1 calls a function named hello_function that prints some text and sleeps for 5 seconds using the time.sleep() function.
  • task6 calls a function named last_function that prints some pre-defined text and indicates the end of the DAGRun.
  • The rest of the tasks call a function named sleeping_function that sleeps for 5 seconds using the time.sleep() function.
  • The callable functions will look like this:
def hello_function():
    print('Hello, this is the first task of the DAG')
    time.sleep(5)

def last_function():
    print('DAG run is done.')

def sleeping_function():
    print("Sleeping for 5 seconds")
    time.sleep(5)

Setting dependencies

  • The dependencies will be set such that task2_1, task2_2, and task2_3 will run in parallel after task1 has finished its execution.
task1 >> [task2_1, task2_2, task2_3]
  • Tasks task3_1 and task3_2 will run only after task2_1 has finished its execution.
task2_1 >> [task3_1, task3_2]
  • Tasks task3_3 and task3_4 will run only after task2_2 has finished its execution.
task2_2 >> [task3_3, task3_4]
  • Tasks task3_5 and task3_6 will run only after task2_3 has finished its execution.
task2_3 >> [task3_5, task3_6]
  • Task task4_1 will run only after task3_1, task3_2, and task3_3 have successfully finished their execution.
[task3_1, task3_2, task3_3] >> task4_1
  • Task task4_2 will run only after task3_4, task3_5, and task3_6 have successfully finished their execution.
[task3_4, task3_5, task3_6] >> task4_2
  • Task task5 will run only after task4_1 and task4_2 have successfully finished their execution.
  • Finally, task6 will end the DAGRun after it gets triggered by the completion of task5.
[task4_1, task4_2] >> task5 >> task6
  • The dependencies will look like this:
task1 >> [task2_1, task2_2, task2_3]
task2_1 >> [task3_1, task3_2]
task2_2 >> [task3_3, task3_4]
task2_3 >> [task3_5, task3_6]
[task3_1, task3_2, task3_3] >> task4_1
[task3_4, task3_5, task3_6] >> task4_2
[task4_1, task4_2] >> task5 >> task6

Code Reference

The final DAG file can be viewed here:

Running the DAG
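  • Before triggering the DAG, the scheduler, the webserver, and at least one Celery worker must be running. A minimal sketch, assuming an Airflow 2.x installation with the Celery extras and a configured broker (run each command in its own terminal):
airflow webserver
airflow scheduler
airflow celery worker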

Airflow Webserver
  • Activate the “celery_executor_demo” DAG, trigger it, and go to the graph view. The DAGRun should look like this:
DAG Execution
  • You can also view the Gantt chart of the execution by selecting the Gantt option. The Gantt chart should look like this:
Gantt Chart for the DAG
  • We can observe the parallel execution of various tasks.
  • Congratulations! We have implemented parallel execution in Airflow using the Celery Executor.

What’s next?

Kubernetes Executor in Airflow
