Set Up Celery, Flower, and RabbitMQ for Airflow

By Hiren Rupchandani & Mukesh Kumar

Table of Contents

1. Setting up a RabbitMQ Server

2. Setting up Celery and Flower

3. DAGRun

4. What’s Next?

Now that we have achieved parallel execution of tasks in Airflow, it is time to move ahead with the Celery Executor to build production-ready pipelines. But before we start with the Celery Executor, we need to set up Celery, the Flower webserver for Celery, and the RabbitMQ message broker on our system.

Make sure you have your virtual environment activated while proceeding with the installations.

Setting up a RabbitMQ Server

RabbitMQ is a message broker: it enables communication between multiple services by operating message queues, and it provides an API for services to publish messages to and subscribe to those queues. It is written in the Erlang programming language. The instructions to install RabbitMQ on your system are as follows (at the end of this section, we will also sanity-check the broker from Python):

  • We first need to install the Erlang programming language on our system using:

(airflow_env) username@desktopname:~$ sudo apt-get install erlang

  • Press Y at the prompt to allow the installation and grab a cup of coffee while it completes…

  • Now we will install RabbitMQ server using the following commands:
(airflow_env) username@desktopname:~$ sudo apt-get install rabbitmq-server
  • This installation may also take a few minutes.
  • Use the following commands to start the server and check if it is running or not:
(airflow_env) username@desktopname:~$ sudo service rabbitmq-server start
OUTPUT:
* Starting RabbitMQ Messaging Server rabbitmq-server
(airflow_env) username@desktopname:~$ service --status-all
OUTPUT:
[ - ] ...
[ + ] rabbitmq-server
[ - ] ...
  • You should see a [ + ] next to rabbitmq-server, indicating that it is running as a service.
  • We will now enable the RabbitMQ management plugin, which provides the web dashboard, using the following command:
(airflow_env) username@desktopname:~$ sudo rabbitmq-plugins enable rabbitmq_management
  • The management dashboard listens on port 15672 (the broker itself listens on port 5672 for AMQP traffic). To open the dashboard, go to http://localhost:15672/ and you should be directed to a login page.
  • The default user is “guest” with the password “guest”.
  • After login, you will be redirected to the dashboard.
RabbitMQ Server Home

  • Create a new user named “admin” with the password “admin” by typing the following command:

(airflow_env) username@desktopname:~$ sudo rabbitmqctl add_user admin admin
  • We will set the “admin” user as an administrator. To do that:
(airflow_env) username@desktopname:~$ sudo rabbitmqctl set_user_tags admin administrator
  • Now we will give full permissions to the “admin” user to read/write data:
(airflow_env) username@desktopname:~$ sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
  • You can now log in again using the “admin” account’s credentials.
RabbitMQ Login with admin
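
Before moving on to Celery, you can sanity-check the broker from Python. Below is a minimal sketch using the pika client (install it with pip3 install pika); the queue name “hello” is arbitrary, and the credentials are the “admin” user created above:

import pika

# Connect to the local RabbitMQ broker as the "admin" user created above
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host="localhost", credentials=credentials)
)
channel = connection.channel()

# Declare a throwaway queue, publish one message, then read it back
channel.queue_declare(queue="hello")
channel.basic_publish(exchange="", routing_key="hello", body=b"ping")
method, properties, body = channel.basic_get(queue="hello", auto_ack=True)
print(body)  # b'ping' confirms the broker accepts and delivers messages

connection.close()

If this prints b'ping', the broker is reachable and routing messages correctly.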

Setting up Celery and Flower

  • We first need to install Celery on our system. Open a new terminal and type the following command in the virtual environment:
(airflow_env) username@desktopname:~$ pip3 install apache-airflow[celery]
  • This command will install Celery, Flower, and all the necessary dependencies required by Airflow and Celery.
  • Flower is a web-based tool for monitoring and administering Celery clusters (a minimal standalone Celery sketch follows the dashboard screenshot below).
  • Type the following command to start the Celery Flower server:
(airflow_env) username@desktopname:~$ airflow celery flower
OUTPUT:

[2021-09-05 19:07:36,724] {command.py:135} INFO - Visit me at http://0.0.0.0:5555
[2021-09-05 19:07:36,733] {command.py:142} INFO - Broker: amqp://admin:**@localhost:5672//
[2021-09-05 19:07:36,735] {command.py:143} INFO - Registered tasks:
.
.
.
  • By default, Flower is assigned port number 5555, so you can go to http://localhost:5555/ and you should see the following home page:
Celery Flower Server Dashboard
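
As an aside, this is essentially what Celery does with the broker we just configured: a producer places task messages on a RabbitMQ queue, and a worker picks them up and stores the result. Here is a minimal standalone sketch (the module name tasks.py and the add task are illustrative and not part of Airflow):

# tasks.py - a minimal standalone Celery app, separate from Airflow
from celery import Celery

# Same broker credentials as our RabbitMQ setup; the RPC result backend
# is used here for simplicity instead of the MySQL backend Airflow uses
app = Celery("tasks", broker="amqp://admin:admin@localhost//", backend="rpc://")

@app.task
def add(x, y):
    return x + y

# Start a worker with:  celery -A tasks worker --loglevel=INFO
# Then, from a Python shell:
#   from tasks import add
#   result = add.delay(4, 4)   # the task message travels through RabbitMQ
#   result.get(timeout=10)     # returns 8 once a worker has run the task

Airflow’s CeleryExecutor works on the same principle, with the scheduler acting as the producer and the airflow celery worker processes as the consumers.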
  • Now that Flower is working, we will make some changes in the airflow.cfg file in the airflow directory. Search for the following variables and update their values as shown:
sql_alchemy_conn = mysql+pymysql://sql_username:sql_password@localhost/airflow_db
executor = CeleryExecutor
broker_url = amqp://admin:admin@localhost/
result_backend = db+mysql+pymysql://sql_username:sql_password@localhost/airflow_db
  • Save and close this file.
  • Open a new terminal and start MySQL server in your environment using:
(airflow_env) username@desktopname:~$ sudo service mysql start
  • Make sure your rabbitmq-server is on as well. If not, run it using:
(airflow_env) username@desktopname:~$ sudo service rabbitmq-server start
  • Now, go to your airflow directory and type:
(airflow_env) username@desktopname:~$ airflow db init
  • This will update the metadata database according to the changes made in the airflow.cfg file.
  • The hard part is over. Now we will check whether Airflow is working.
  • To start Airflow, the MySQL and RabbitMQ services must be running beforehand.
  • We will now start four different processes: airflow webserver, scheduler, celery worker, and celery flower. You can type the following commands in different terminals:
1. Airflow Webserver
(airflow_env) username@desktop_name:/mnt/c/Users/username/Documents/airflow$ airflow webserver

2. Airflow Scheduler

(airflow_env) username@desktop_name:/mnt/c/Users/username/Documents/airflow$ airflow scheduler

3. Celery Worker

(airflow_env) username@desktop_name:/mnt/c/Users/username/Documents/airflow$ airflow celery worker

4. Celery Flower

(airflow_env) username@desktop_name:/mnt/c/Users/username/Documents/airflow$ airflow celery flower
Celery Flower with a Celery Worker’s Status
RabbitMQ Server Queues during execution
DAGRun

  • Finally, coming back to the Airflow webserver, delete all the DAGs using the webserver UI so that the scheduler can automatically re-register them. We recommend refreshing the page 5–10 seconds after deleting a DAG from the UI.
  • Enable any of the updated DAGs from the UI and you should see the DAG execute. You can check the logs, Gantt chart, graph view, or tree view of the DAG. (If you do not have a suitable DAG, a minimal test DAG is sketched after the screenshot below.)
Airflow Webserver UI
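
If you do not have a suitable DAG handy, a minimal sketch like the one below is enough to watch the CeleryExecutor in action (save it in your dags/ folder; the dag_id and bash commands are arbitrary):

# dags/celery_test.py - a minimal DAG to verify the CeleryExecutor setup
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="celery_test",
    start_date=datetime(2021, 9, 1),
    schedule_interval=None,  # trigger manually from the UI
    catchup=False,
) as dag:
    # Two trivial tasks; each is sent to a Celery worker through RabbitMQ
    print_date = BashOperator(task_id="print_date", bash_command="date")
    sleep_task = BashOperator(task_id="sleep", bash_command="sleep 5")

    print_date >> sleep_task

Trigger it from the UI and you should see the tasks appear in Flower as the worker executes them.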
  • With that, we have finally set up Airflow with Celery and RabbitMQ. In the upcoming article, we will use multiple Celery workers to run an Airflow DAG.

What’s Next?

Celery Executor in Airflow — Coming Soon
