Celery Executor in Airflow

By Hiren Rupchandani & Mukesh Kumar

Table of Contents

1. Creating a Python file

2. Importing the modules

3. Defining a DAG Object

4. Defining Tasks

5. Defining callable functions

6. Setting Dependencies

7. Code Reference

8. Running the DAG

9. What’s Next?

We can finally use the Celery Executor, which enables us to build production-ready data pipelines. With Celery workers spread across different servers, we can run many tasks in parallel.

Although we will demonstrate a simple working example only on our local server, the same setup can be extended across multiple remote servers.
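To use the Celery executor, Airflow needs to be pointed at it in airflow.cfg, along with a message broker and a result backend. A minimal sketch, assuming Redis as the broker and PostgreSQL as the result backend (the URLs below are example values, not part of this article's setup):

```ini
# airflow.cfg (illustrative excerpt)
[core]
executor = CeleryExecutor

[celery]
# Example broker/backend URLs -- adjust to your own services
broker_url = redis://localhost:6379/0
result_backend = db+postgresql://airflow:airflow@localhost/airflow
```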

Creating a Python file
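The DAG file lives in Airflow's dags folder. A quick way to create it from the shell, assuming the default ~/airflow/dags location (the filename is our choice, matching the dag_id used below):

```shell
# Create the dags folder (if missing) and an empty DAG file.
# ~/airflow/dags is Airflow's default dags folder.
mkdir -p ~/airflow/dags
touch ~/airflow/dags/celery_executor_demo.py
```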

Importing the modules

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import time

Defining a DAG object

with DAG(
    dag_id="celery_executor_demo",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@hourly",
    catchup=False,
) as dag:

Defining Tasks

    task1 = PythonOperator(
        task_id="start",
        python_callable=hello_function,
    )

    task2_1 = PythonOperator(
        task_id="sleepy_1",
        python_callable=sleeping_function,
    )
    # The other 11 tasks are the same as task2_1 except for their task_id.
    # You can refer to the link below for reference.
    task6 = PythonOperator(
        task_id="end",
        python_callable=last_function,
    )

Defining callable functions

# Note: in the assembled file these functions are defined before the
# DAG object so that the PythonOperators above can reference them.
def hello_function():
    print('Hello, this is the first task of the DAG')
    time.sleep(5)

def last_function():
    print('DAG run is done.')

def sleeping_function():
    print("Sleeping for 5 seconds")
    time.sleep(5)

Setting dependencies

    task1 >> [task2_1, task2_2, task2_3]
    task2_1 >> [task3_1, task3_2]
    task2_2 >> [task3_3, task3_4]
    task2_3 >> [task3_5, task3_6]
    [task3_1, task3_2, task3_3] >> task4_1
    [task3_4, task3_5, task3_6] >> task4_2
    [task4_1, task4_2] >> task5 >> task6
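The `>>` notation works because Airflow operators overload Python's right-shift operator to record downstream dependencies. The toy class below is a hedged illustration of that mechanism only; it is not Airflow's actual implementation:

```python
class Task:
    """Toy stand-in for an Airflow operator, illustrating how `>>`
    records downstream dependencies. Not Airflow's real code."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # Supports task >> task and task >> [task, ...]
        targets = other if isinstance(other, list) else [other]
        self.downstream.extend(targets)
        return other

    def __rrshift__(self, other):
        # Supports [task, ...] >> task (lists have no __rshift__ for
        # Task operands, so Python falls back to this reflected form)
        for upstream in other:
            upstream.downstream.append(self)
        return self


start = Task("start")
sleepy_1, sleepy_2 = Task("sleepy_1"), Task("sleepy_2")
end = Task("end")

start >> [sleepy_1, sleepy_2]   # fan out
[sleepy_1, sleepy_2] >> end     # fan in
print([t.task_id for t in start.downstream])  # -> ['sleepy_1', 'sleepy_2']
```

Returning `other` from `__rshift__` is what lets chains such as `[task4_1, task4_2] >> task5 >> task6` read left to right.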

Code Reference

The final DAG file can be viewed here:

Running the DAG
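To execute the DAG with the Celery executor, the webserver, the scheduler, and at least one Celery worker all need to be running, typically in separate terminals. A sketch of the commands, assuming Airflow 2.x with a configured broker and result backend:

```shell
# Each command runs in its own terminal (or as a service):
airflow webserver --port 8080   # serves the UI at http://localhost:8080
airflow scheduler               # schedules and queues task instances
airflow celery worker           # pulls queued tasks from the broker
```

Once the worker is up, enable the DAG in the web UI and trigger a run.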

[Image: Airflow Webserver]
[Image: DAG Execution]
[Image: Gantt Chart for the DAG]

What’s next?

Kubernetes Executor in Airflow