Celery Executor in Airflow

Creating a Python file

  • Create a new Python file named “celery_executor_demo.py” inside the airflow/dags directory on your system and open it in your favorite editor.

Importing the modules

  • We will import “DAG” from the airflow package and “PythonOperator” from the airflow.operators.python module.
  • We will also import the “datetime” and “time” modules to help us schedule the DAG and make the tasks sleep.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import time

Defining a DAG object

  • We will now initialize a DAG object as follows. Note that all the tasks and dependencies defined in the following sections live inside this with block:
with DAG(
    dag_id="celery_executor_demo",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@hourly",
    catchup=False,
) as dag:

Defining Tasks

  • We will create fourteen tasks inside our DAG; the “start” and “end” tasks mark the beginning and the end of the DAG.
  • The other tasks are task2_1, task2_2, task2_3, task3_1, task3_2, task3_3, task3_4, task3_5, task3_6, task4_1, task4_2, and task5.
  • These 12 tasks each call a “sleeping_function” and sleep for 5 seconds.
  • The task definitions will look like the following:
task1 = PythonOperator(
    task_id="start",
    python_callable=hello_function,
)

task2_1 = PythonOperator(
    task_id="sleepy_1",
    python_callable=sleeping_function,
)
# The other 11 tasks are identical to task2_1 except for their task_id.
# You can refer to the link below for reference.
task6 = PythonOperator(
    task_id="end",
    python_callable=last_function,
)
  • We have attached the link to the code file below for your reference.
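  • Since the twelve sleepy tasks differ only in their task_id, you could also generate them in a loop. This is only a sketch of an alternative, not the code from the linked file; the task_ids sleepy_1 through sleepy_12 are assumed from the naming pattern above.
# Alternative sketch (assumed task_id pattern, not from the linked file):
# build the twelve near-identical tasks in a loop instead of writing
# each PythonOperator out by hand.
sleepy_tasks = [
    PythonOperator(
        task_id=f"sleepy_{i}",  # sleepy_1 .. sleepy_12
        python_callable=sleeping_function,
    )
    for i in range(1, 13)
]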

Defining callable functions

  • task1 calls a function named hello_function that prints some text and sleeps for 5 seconds using the time.sleep() function.
  • task6 calls a function named last_function that prints some pre-defined text and indicates the end of the DAG run.
  • The rest of the tasks call a function named sleeping_function that sleeps for 5 seconds using the time.sleep() function.
  • The callable functions will look like this:
def hello_function():
    print('Hello, this is the first task of the DAG')
    time.sleep(5)

def last_function():
    print('DAG run is done.')

def sleeping_function():
    print("Sleeping for 5 seconds")
    time.sleep(5)

Setting dependencies

  • The dependencies will be set such that task2_1, task2_2, and task2_3 will run in parallel after task1 has finished its execution.
task1 >> [task2_1, task2_2, task2_3]
  • Tasks task3_1 and task3_2 will run only after task2_1 has finished its execution.
task2_1 >> [task3_1, task3_2]
  • Tasks task3_3 and task3_4 will run only after task2_2 has finished its execution.
task2_2 >> [task3_3, task3_4]
  • Tasks task3_5 and task3_6 will run only after task2_3 has finished its execution.
task2_3 >> [task3_5, task3_6]
  • task4_1 will run only after task3_1, task3_2, and task3_3 have successfully finished their execution.
[task3_1, task3_2, task3_3] >> task4_1
  • task4_2 will run only after task3_4, task3_5, and task3_6 have successfully finished their execution.
[task3_4, task3_5, task3_6] >> task4_2
  • task5 will run only after task4_1 and task4_2 have successfully finished their execution.
  • Finally, task6 will end the DAG run after it gets triggered by the completion of task5.
[task4_1, task4_2] >> task5 >> task6
  • Taken together, the dependencies look like this:
task1 >> [task2_1, task2_2, task2_3]
task2_1 >> [task3_1, task3_2]
task2_2 >> [task3_3, task3_4]
task2_3 >> [task3_5, task3_6]
[task3_1, task3_2, task3_3] >> task4_1
[task3_4, task3_5, task3_6] >> task4_2
[task4_1, task4_2] >> task5 >> task6

Code Reference
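  • As a condensed reference, here is how the snippets from the sections above fit together in one file. The loop over the sleepy task_ids and their mapping to variable names are a sketch based on the naming pattern shown earlier, not a verbatim copy of the linked code file:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import time


def hello_function():
    print('Hello, this is the first task of the DAG')
    time.sleep(5)


def sleeping_function():
    print("Sleeping for 5 seconds")
    time.sleep(5)


def last_function():
    print('DAG run is done.')


with DAG(
    dag_id="celery_executor_demo",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@hourly",
    catchup=False,
) as dag:
    task1 = PythonOperator(task_id="start", python_callable=hello_function)
    task6 = PythonOperator(task_id="end", python_callable=last_function)

    # Twelve intermediate tasks, all calling sleeping_function.
    # The mapping of sleepy_<n> ids to variable names is an assumption.
    t = [
        PythonOperator(task_id=f"sleepy_{i}", python_callable=sleeping_function)
        for i in range(1, 13)
    ]
    (task2_1, task2_2, task2_3, task3_1, task3_2, task3_3,
     task3_4, task3_5, task3_6, task4_1, task4_2, task5) = t

    task1 >> [task2_1, task2_2, task2_3]
    task2_1 >> [task3_1, task3_2]
    task2_2 >> [task3_3, task3_4]
    task2_3 >> [task3_5, task3_6]
    [task3_1, task3_2, task3_3] >> task4_1
    [task3_4, task3_5, task3_6] >> task4_2
    [task4_1, task4_2] >> task5 >> task6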

Running the DAG

[Screenshot: Airflow Webserver]
  • Activate the “celery_executor_demo” DAG, trigger it, and open the Graph view. The DAG run should look like this:
[Screenshot: DAG Execution]
  • You can also view the Gantt chart of the execution by selecting the Gantt option. The Gantt chart should look like this:
[Screenshot: Gantt Chart for the DAG]
  • We can observe the parallel execution of various tasks.
  • Congratulations! We have implemented parallel execution in Airflow using the Celery executor.
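  • Note that parallel execution with the Celery executor requires a running Celery setup: executor = CeleryExecutor in airflow.cfg, a message broker such as Redis or RabbitMQ configured through the broker_url setting, a result_backend, and at least one worker started with the airflow celery worker command alongside the scheduler and webserver.
  • The number of tasks a single worker can run in parallel is controlled by the worker_concurrency setting in the [celery] section of airflow.cfg.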

