Celery Executor in Airflow

Table of Contents

1. Creating a python file

2. Importing the modules

3. Defining a DAG Object

4. Defining Tasks

5. Defining callable functions

6. Setting Dependencies

7. Code Reference

8. Running the DAG

9. What’s Next?

Creating a python file

Importing the modules

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import time

Defining a DAG object

with DAG(
dag_id="celery_executor_demo",
start_date=datetime(2021,1,1),
schedule_interval="@hourly",
catchup=False) as dag:

Defining Tasks

task1=PythonOperator(
task_id="start",
python_callable=hello_function

task2_1=PythonOperator(
task_id="sleepy_1",
python_callable=sleeping_function
)
# Other 11 tasks are the same as task2_1 except for their task_id.
# You can refer to the link below for reference
task6=PythonOperator(
task_id="end",
python_callable=last_function
)

Defining callable functions

def hello_function():
print('Hello, this is the first task of the DAG')
time.sleep(5)
def last_function():
print('DAG run is done.')
def sleeping_function():
print("Sleeping for 5 seconds")
time.sleep(5)

Setting dependencies

task1 >> [task2_1, task2_2, task2_3]
task2_1 >> [task3_1, task3_2]
task2_2 >> [task3_3, task3_4]
task2_3 >> [task3_5, task3_6]
[task3_1, task3_2, task3_3] >> task4_1
[task3_4, task3_5, task3_6] >> task4_2
[task4_1, task4_2] >> task5 >> task6
task1>>[task2_1,task2_2, task2_3]
task2_1>>[task3_1, task3_2]
task2_2>>[task3_3, task3_4]
task2_3>>[task3_5, task3_6]
[task3_1, task3_2, task3_3]>>task4_1
[task3_4, task3_5, task3_6]>>task4_2
[task4_1, task4_2]>>task5>>task6

Code Reference

Running the DAG

Airflow Webserver
DAG Execution
Gantt Chart for the DAG

What’s next?

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store