Apache Airflow is a platform defined in code that is used to schedule, monitor, and organize complex workflows and data pipelines. In today’s world, with more and more automated tasks, data integration, and process streams, there’s a need for a powerful and flexible tool that handles the scheduling and monitoring of your jobs. Regardless of the industry you're in, there are growing sets of tasks that need to be performed in a certain order and monitored during their execution, with alerts set in case of completion or, even more importantly, errors.
In addition, it would be great to know how the processes change over time. Do they take more time? Do failures occur?
Sometimes complex processes consist of multiple tasks that have plenty of dependencies. While it’s quite easy to list them as parent-child pairs, a long list can be very difficult to analyze. A good tool should make this easier.
That’s where Airflow comes into play, to help with the challenges mentioned above. In this article, we are going to introduce the concepts of Apache Airflow and give you a step-by-step tutorial and examples of how to make Apache Airflow work better for you.
Let’s talk about the concepts Airflow is based on. First, when you set up a bunch of tasks to be executed in a particular order, you’re creating a DAG (Directed Acyclic Graph). Directed means the tasks are executed in some order. Acyclic means you cannot create loops (i.e. cycles). And a graph is a very convenient way to view the process. Here's a simple sample including a task to print the date followed by two tasks run in parallel. It can be viewed in a tree form:
It is a bit hard to read at first. As you can see, the leaves of the tree indicate the very first task to start with, followed by branches that form a trunk.
Graphs can also be rendered in a top-down or bottom-up form. This is still a directed graph, and Airflow lets the user choose whichever layout they prefer.
So, the DAGs describe how to run tasks. DAGs are defined in Python files that are placed in Airflow’s DAG_FOLDER. There can be as many DAGs as you need. Although each one can mention multiple tasks, it’s a good idea to keep one logical workflow in one file.
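To make the "directed" and "acyclic" parts concrete, here is a minimal sketch in plain Python. It does not use Airflow at all: it relies on the standard library's graphlib module (Python 3.9+), and the task names (print_date, sleep, templated) are assumptions chosen to mirror the sample described above. It groups tasks into waves that could run in parallel and shows that a loop is rejected.

```python
from graphlib import CycleError, TopologicalSorter


def execution_waves(upstream):
    """Group tasks into waves; tasks in the same wave have no mutual dependencies."""
    sorter = TopologicalSorter(upstream)
    sorter.prepare()  # raises CycleError if the graph contains a loop
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # everything whose upstream tasks are done
        waves.append(ready)
        sorter.done(*ready)
    return waves


def has_cycle(upstream):
    """Return True if the dependency mapping is not a valid DAG."""
    try:
        execution_waves(upstream)
    except CycleError:
        return True
    return False


# Hypothetical task names: each key maps to the set of tasks it depends on.
sample_dag = {
    "print_date": set(),
    "sleep": {"print_date"},
    "templated": {"print_date"},
}
waves = execution_waves(sample_dag)
print(waves)  # [['print_date'], ['sleep', 'templated']]
```

The second wave contains both downstream tasks, which is why they can run in parallel once the first task has finished.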
While DAGs describe how things should be executed, Operators define what is actually done. An Operator should be atomic, describing a single task in a workflow that doesn’t need to share anything with other operators.
The tasks of a single DAG may even run on separate machines, so operators should really be independent.
Here are the operators provided by Airflow:
- BashOperator - for executing a bash command
- PythonOperator - to call Python functions
- EmailOperator - for sending emails
- SimpleHttpOperator - for sending HTTP requests
- DB operators (e.g. MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, etc.) - for executing SQL commands
- Sensor - to wait for a certain event (like a file or a row in a database) or time
The above list is simplified and covers only the basics.
Operators refer to the tasks that they execute. In fact, a task is an instance of an operator, like:
energy_operator = PythonOperator(task_id='report_blackouts', python_callable=enea_check, dag=dag)
Here the energy_operator is an instance of PythonOperator that has been assigned a task_id, a python_callable function, and a DAG to be a part of.
Installing Airflow on Windows is not a smooth experience. After struggling with it, getting the Microsoft Visual C++ compiler for Python 3.7, and failing again, I used the following instructions to get past the problem by using Ubuntu on WSL. Here are the steps:
- Get Ubuntu WSL from Windows Store
- Install Ubuntu
- Run bash
- Verify it comes with Python 3.6.8 or so ("python3 --version").
- Add these packages so that installing PIP will work.
- sudo apt-get install software-properties-common
- sudo apt-add-repository universe
- sudo apt-get update
- Install pip with:
- sudo apt install python3 python3-pip ipython3
- sudo apt install virtualenv
- Create venv
- virtualenv -p path_to_python/bin/python3 env_name
- Run the following 2 commands to install airflow:
- export SLUGIFY_USES_TEXT_UNIDECODE=yes
- pip3 install apache-airflow
- Open a new terminal (I was surprised, but this seemed to be required).
- Init the airflow DB:
- airflow initdb
It should now be possible to invoke the airflow version command:
(airflow) vic@DESKTOP-I5D2O6C:/mnt/c/airflow$ airflow version
And here’s the folder structure Airflow creates:
(airflow) vic@DESKTOP-I5D2O6C:/mnt/c/airflow/workspace/airflow_home$ tree
Let’s have a look at the UI. A UI is rarely the core feature of an integration tool, yet I find it one of the major advantages of Airflow. Once you run the airflow webserver command, you should be able to access the admin console at localhost:8080/admin
There’s a list of available DAGs. By default some examples are shown. This can be disabled with the load_examples setting in the airflow.cfg file:
# Whether to load the examples that ship with Airflow. It's good to
This DAG list gives the basic information, like:
- Status and counts of tasks for active runs
- Last execution time
- Status and counts of DAG runs
- Links for quick accessing DAG details
The list also allows enabling and disabling DAGs in a convenient way. It’s also quite clear which of those are active at the moment. Choosing any of the IDs switches to details with Tree View as default:
This view lists all the operators in a tree (as mentioned above) with run statuses across time. Filters make it possible to view statuses for any time span. Hovering over any of them brings up a popup with additional details:
Task duration shows a very useful graph for performance analysis:
It’s also possible to check the details, which show, apart from the schedule already visible on the main list, the concurrency setting, file name, and task IDs.
From this screen you can also trigger, refresh, or delete the DAG.
On the DAG detail screen it’s also possible to check the code. Let’s review it using the simple example below:
This import section is responsible for including the airflow library as well as any others used within this DAG:
from datetime import datetime
Next, there’s a function definition:
In my case it’s a simple function (not fully included in this example - I’m just importing it) to check the blackouts mentioned on my energy operator’s web page. Here in this example the content of the email is just printed in logs.
Next, a DAG instance is created with a schedule to run every 5 minutes between 7 AM and 11 PM. The start date is set in the past, but Airflow is also told not to catch up, which in this case would mean quite a few runs. Catch-up can be set to true if, for example, some data needs to be loaded on a daily basis, but past data must first be loaded day by day as part of the go-live. Another great feature!
dag = DAG('check_enea_blackouts', description='Checking blackouts on Enea website',
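To illustrate what catching up means, here is a simplified model in plain Python. It is a sketch under assumptions, not Airflow's scheduler code: the function name scheduled_runs and the dates are hypothetical, and the model only captures the idea that a run is created once its schedule period has fully elapsed, and that disabling catch-up keeps only the most recent such run.

```python
from datetime import datetime, timedelta


def scheduled_runs(start_date, interval, now, catchup):
    """Return the execution dates a scheduler would create runs for (simplified)."""
    runs = []
    execution_date = start_date
    # A run is created only after the period it covers has completely passed.
    while execution_date + interval <= now:
        runs.append(execution_date)
        execution_date += interval
    # Without catch-up, only the latest completed period is run.
    return runs if catchup else runs[-1:]


# Hypothetical dates: a daily schedule that started a few days ago.
start = datetime(2019, 1, 1)
now = datetime(2019, 1, 5, 12, 0)
backfill = scheduled_runs(start, timedelta(days=1), now, catchup=True)
latest_only = scheduled_runs(start, timedelta(days=1), now, catchup=False)
print(len(backfill), latest_only)
```

With a 5-minute interval and a start date far in the past, the list produced with catch-up enabled grows very quickly, which is why it is switched off in this DAG.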
Then the tasks (Operator instances) are created. In this case Dummy and Python operators are used.
dummy_operator = DummyOperator(task_id='dummy_task', retries=3, dag=dag)
Finally the dependency section. In this case it’s very simple and indicates that dummy_operator should be followed by my enea_operator. It couldn’t be any simpler:
dummy_operator >> enea_operator
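The >> syntax works because Airflow operators overload Python's right-shift operator. The toy class below is only a sketch of that idea: Task and its downstream attribute are hypothetical stand-ins, not Airflow's classes, and they record the dependency instead of scheduling anything.

```python
class Task:
    """Toy stand-in for an operator instance; not an Airflow class."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []  # tasks that must run after this one

    def __rshift__(self, other):
        # "a >> b" records b as a downstream task of a.
        self.downstream.append(other)
        return other  # returning the right-hand side allows chaining: a >> b >> c


first = Task("dummy_task")
second = Task("report_blackouts")
third = Task("cleanup")  # hypothetical extra task to show chaining

first >> second >> third
print([t.task_id for t in first.downstream])   # ['report_blackouts']
print([t.task_id for t in second.downstream])  # ['cleanup']
```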
The last two lines of the DAG file are used for testing. It’s a good idea to prepare your DAGs in a way that allows unit testing them before scheduling them in Airflow.
if __name__ == '__main__':
Creating complex examples by script
Another absolutely great thing about Airflow is that it’s very easy to automate DAG creation. I tried achieving two goals here:
- Show the ease of DAG scripting
- Check how Airflow works with complex workflows.
In my professional career I used to deal with workflows built of over a thousand tasks (sometimes even a few thousand). This makes monitoring and any reviews quite challenging. Let’s see how Airflow handles a complex example.
For the purpose of this exercise I’ve created a script that uses the Python site-packages directory to create a task for each file found in that location. I create a set containing all the file names (with dashes replaced with underscores). Next I print the header containing the imports, the DAG instance definition with its schedule, and the dummy operator to start with. Then I use a task definition template in a loop to create tasks for every file. Finally, I need to add dependencies. For every file I randomly add up to 5 upstream dependencies. Here’s the full script used for creating the DAG:
from os import listdir
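The approach described above can be sketched roughly as follows. This is not the original script: the function names, the DAG id generated_dag, the fixed random seed, and the choice to pick upstream tasks only from earlier names (which keeps the graph acyclic) are all assumptions made for illustration. The generated text uses the Airflow 1.10 import path for DummyOperator; the generator itself needs only the standard library.

```python
import random
import re

HEADER = '''from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('generated_dag', schedule_interval=None,
          start_date=datetime(2019, 1, 1), catchup=False)
start = DummyOperator(task_id='start', dag=dag)
'''

TASK_TEMPLATE = "{name} = DummyOperator(task_id='{name}', dag=dag)\n"


def to_identifier(file_name):
    """Turn a file name into a valid Python identifier (dashes, dots -> underscores)."""
    name = re.sub(r'\W', '_', file_name)
    return 't_' + name if name[0].isdigit() else name


def generate_dag_source(file_names, max_upstream=5, seed=0):
    """Return the source text of a DAG file with one dummy task per file name."""
    rng = random.Random(seed)  # seeded so the output is reproducible
    names = sorted({to_identifier(f) for f in file_names})
    lines = [HEADER]
    lines += [TASK_TEMPLATE.format(name=n) for n in names]
    for i, name in enumerate(names):
        earlier = names[:i]  # only earlier tasks may be upstream, so no cycles
        count = min(len(earlier), rng.randint(0, max_upstream))
        upstream = rng.sample(earlier, count)
        # Tasks without a random upstream task hang off the starting dummy task.
        for parent in upstream or ['start']:
            lines.append(f"{parent} >> {name}\n")
    return ''.join(lines)


# In the article the names come from listing site-packages; here a small
# hypothetical list keeps the example self-contained.
source = generate_dag_source(['my-pkg', 'other.py', '3rd-party'])
print(source)
```

Writing the returned text to a .py file in the DAG folder would then let Airflow pick it up like any hand-written DAG.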
Monitoring a complex workflow
Here’s the graph overview:
Impossible to get all the details at once, right? Well, just by hovering over the status names, we get a better view. For example, by hovering over ‘success’ it’s possible to quickly check what part of the DAG has already been completed:
It’s actually very convenient to manipulate the graph: zoom in and out, check task details, or move it around at any zoom level. And it works amazingly fast! I did not mention the Gantt chart above for the simple example. It now makes much more sense to review it than it did for just one task:
Sensors are a special type of Operator used to monitor other resources. A Sensor is a subclass of BaseSensorOperator. Its poke function is called in a loop until one of the following happens:
- it returns True
- it raises an AirflowSkipException, in which case the instance status is set to Skipped
- another exception is raised, in which case it is retried according to the value of retries.
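The polling loop described above can be sketched in plain Python. This is a simplified illustration rather than Airflow's implementation: run_sensor, its parameters, and the use of TimeoutError are assumptions (Airflow has its own timeout exception), and any exception raised by poke simply propagates to the caller, which is where skipping or retrying would be decided.

```python
import time


def run_sensor(poke, poke_interval=0.01, timeout=1.0):
    """Call poke() repeatedly until it returns True; return the number of attempts."""
    started = time.monotonic()
    attempts = 0
    while True:
        attempts += 1
        if poke():  # exceptions raised here are not caught by the loop
            return attempts
        if time.monotonic() - started > timeout:
            raise TimeoutError("sensor timed out")
        time.sleep(poke_interval)  # wait before checking the resource again


# Hypothetical condition that becomes true on the third check.
checks = {"count": 0}


def file_has_arrived():
    checks["count"] += 1
    return checks["count"] >= 3


attempts_needed = run_sensor(file_has_arrived)
print(attempts_needed)  # 3
```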
For example, FileSensor, which extends BaseSensorOperator (as mentioned in the docs), can be defined as follows:
from airflow.contrib.sensors.file_sensor import FileSensor
Airflow allows defining a variety of connections. Just go to the Admin > Connections tab to see a default, very extensive list:
Apache Airflow's open source platform enables data engineers to author, monitor, and create complex, enterprise-grade workflows. It is important to note that Airflow is not an ETL tool but can integrate with one. Airflow is becoming widely adopted by data teams, including some of Xplenty's larger customers.
Integrating Apache Airflow with Xplenty
Airflow with Xplenty enables enterprise-wide workflows that seamlessly integrate ETL steps. Xplenty is a cloud-based ETL tool that provides simple, visualized data pipelines for automated data flows across a wide range of sources and destinations. To learn more about using Xplenty in combination with Airflow, contact us here!