Introduction

Apache Airflow is a platform, defined in code, for scheduling, monitoring, and organizing complex workflows and data pipelines. In today's world of ever more automated tasks, data integration, and process streams, there's a need for a powerful and flexible tool that handles the scheduling and monitoring of your jobs. Regardless of the industry you're in, there is a growing set of tasks that need to be performed in a certain order and monitored during execution, with alerts set up for completion or - even more importantly - errors.

In addition, it would be great to know how the processes change over time. Do they take longer? Do failures occur?

Sometimes complex processes consist of multiple tasks with plenty of dependencies. While it's quite easy to list them as parent-child pairs, a long list can be very difficult to analyze. A good tool should provide a way to make this easier.

That’s where Airflow comes into play, to help with the challenges mentioned above. In this article, we are going to introduce the concepts of Apache Airflow and give you a step-by-step tutorial and examples of how to make Apache Airflow work better for you.

Table of Contents 

Concepts

Installation

User Interface

DAG Code

Automation

Sensors

Connectors

Concepts

DAGs

Let's talk about the concepts Airflow is based on. First, when you set up a bunch of tasks to be executed in a particular order, you're creating a DAG - a Directed Acyclic Graph. Directed, because the tasks are executed in a defined order. Acyclic, because you cannot create loops (i.e. cycles). A graph, because it's a very convenient way to view the process. Here's a simple sample including a task to print the date, followed by two tasks run in parallel. It can be viewed in a tree form:

[Screenshot: Tree View of the sample DAG]

It's a bit hard to read at first. As you can see, the leaves of the tree indicate the very first tasks to start with, followed by branches that form the trunk.

Graphs can also be rendered in a top-down or bottom-up form. It's still a directed graph, and Airflow lets you choose whichever layout you prefer.

[Screenshot: Graph View of the same DAG]

So, DAGs describe how to run tasks. DAGs are defined in Python files that are placed in Airflow's DAG_FOLDER, and there can be as many DAGs as you need. Although each DAG can contain multiple tasks, it's a good idea to keep one logical workflow in one file.

Operators

While DAGs describe how things should be executed, Operators tell what there is to be done. An Operator should be atomic, describing a single task in a workflow that doesn't need to share anything with other operators.

Airflow even makes it possible for a single DAG to run its tasks on separate machines, so operators really should be independent.

Here are some of the operators provided by Airflow:

  • BashOperator - for executing a bash command
  • PythonOperator - to call Python functions
  • EmailOperator - for sending emails
  • SimpleHttpOperator - for sending HTTP requests
  • DB operators (e.g.: MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, etc.) - for executing SQL commands
  • Sensor - to wait for a certain event (like a file appearing or a row in a database) or a certain time

The above list is simplified and covers only the basics.
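
To give a feel for how these are used, here's a minimal sketch of a DAG with a single BashOperator task (the DAG id, schedule and task id are made up for illustration):

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# A throwaway DAG just to show how an operator becomes a task
dag = DAG('operator_demo', start_date=datetime(2019, 1, 1), schedule_interval=None)

# BashOperator runs a single shell command - here it simply prints the date
print_date = BashOperator(task_id='print_date', bash_command='date', dag=dag)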

Tasks

Operators refer to tasks that they execute. In fact, a task is an instance of an operator, like:

energy_operator = PythonOperator(task_id='report_blackouts', python_callable=enea_check, dag=dag)

Here the energy_operator is an instance of PythonOperator that has been assigned a task_id, a python_callable function and a DAG to be part of.

Installation

Installing Airflow on Windows is not a smooth experience. After struggling with it - getting the Microsoft Visual C++ compiler for Python 3.7 and failing again - I used the following instructions to get past it by using Ubuntu WSL. Here are the steps:

  1. Get Ubuntu WSL from Windows Store
  2. Install Ubuntu
  3. Run bash
  4. Verify it comes with Python 3.6.8 or so ("python3 --version").
  5. Add these packages so that installing pip will work:
    1. sudo apt-get install software-properties-common
    2. sudo apt-add-repository universe
    3. sudo apt-get update
  6. Install pip with:
    1. sudo apt install python3 python3-pip ipython3
    2. sudo apt install virtualenv
  7. Create a venv and activate it:
    1. virtualenv -p path_to_python/bin/python3 env_name
    2. source env_name/bin/activate
  8. Run the following 2 commands to install airflow:
    1. export SLUGIFY_USES_TEXT_UNIDECODE=yes
    2. pip3 install apache-airflow
  9. Open a new terminal (I was surprised, but this seemed to be required).
  10. Init the airflow DB:
    1. airflow initdb

It should now be possible to invoke airflow version:

(airflow) vic@DESKTOP-I5D2O6C:/mnt/c/airflow$ airflow version
[2019-06-27 14:21:26,307] {__init__.py:51} INFO - Using executor SequentialExecutor
  ____________       _____________
____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _ __/ _  / / /_/ /_ |/ |/ /
_/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/  v1.10.3

And here’s the folder structure Airflow creates:

(airflow) vic@DESKTOP-I5D2O6C:/mnt/c/airflow/workspace/airflow_home$ tree
.
├── airflow.cfg
├── airflow.db
├── logs
│   └── scheduler
│       ├── 2019-06-27
│       └── latest ->
└── unittests.cfg

User Interface

Let's have a look at the UI, which is rarely the core feature of any integration tool - yet I find it one of Airflow's major advantages. Once you run the airflow webserver command you should be able to access the admin console at localhost:8080/admin.
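
Note that the web server only displays state; for scheduled DAG runs to actually execute, the scheduler also has to be running, which you can start with airflow scheduler (typically in a second terminal).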

[Screenshot: airflow4.png - the DAG list in the admin console]

There's a list of available DAGs. By default some example DAGs are shown; this can be disabled by setting load_examples to False in the airflow.cfg file:

# Whether to load the examples that ship with Airflow. It's good to
# get started, but you probably want to set this to False in a production
# environment
load_examples = True

This DAG list gives the basic information, like:

  • Id
  • Schedule
  • Owner
  • Status and counts of tasks for active runs
  • Last execution time
  • Status and counts of DAG runs
  • Links for quick accessing DAG details

The list also allows enabling and disabling DAGs in a convenient way, and it's quite clear which of them are active at the moment. Choosing any of the IDs switches to the DAG details, with the Tree View as default:

[Screenshot: airflow5.png - Tree View of a DAG]

The Tree View (mentioned above) lists all the tasks together with their run statuses across time. Filters make it possible to view statuses for any time span. Hovering over any of them brings up a popup with additional details:

[Screenshot: airflow13.png - task instance details popup]

The Task Duration view shows a very useful graph for performance analysis:

[Screenshot: airflow12.png - Task Duration chart]
It's also possible to check the DAG details, which list - apart from the schedule already visible on the main list - the concurrency setting, the file name and the task IDs.

[Screenshot: DAG details]

It's also possible to trigger, refresh or delete the DAG from this screen.
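
The same can be done from the command line, e.g. airflow trigger_dag check_enea_blackouts kicks off a manual run of the DAG used in the next section.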

DAG Code

On the DAG detail screen it's also possible to check the code. Let's review it using the simple example below.

This import section is responsible for including the airflow library as well as any other modules used within this DAG:

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from functions import check_enea_blackouts
from functions import send_email_message

Next, there's a function definition:

def enea_check():
    email_message = check_enea_blackouts.report_blackouts()
    print(email_message)
    return 'Blackouts check completed.'

In my case it's a simple function (not fully included in this example - I'm just importing it) that checks the blackouts mentioned on my energy operator's web page. In this example the content of the email is just printed to the logs.

Next, there's a DAG instance created with a schedule to run every 5 minutes from 7 AM until midnight. The start date is set in the past, but Airflow is also told not to catch up - in this case that would mean quite a few runs. Catchup can be set to True if, for example, there's a need to load some set of data on a daily basis, but first, as part of the go-live, the past data has to be loaded day by day. Another great feature!

dag = DAG('check_enea_blackouts', description='Checking blackouts on Enea website',
          schedule_interval='*/5 7-23 * * *',
          start_date=datetime(2017, 3, 20), catchup=False)
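
For comparison, here's a hypothetical daily DAG with catchup enabled (the DAG id and schedule are made up for illustration) - when first enabled, Airflow would backfill one run per day from the start date up to the present:

from datetime import datetime
from airflow import DAG

# Hypothetical daily load - with catchup=True Airflow schedules a run
# for every day between start_date and today when the DAG is enabled
backfill_dag = DAG('daily_load_with_catchup',
                   schedule_interval='@daily',
                   start_date=datetime(2019, 6, 1),
                   catchup=True)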

Back in our example, tasks (Operator instances) are created next. In this case Dummy and Python operators are used.

dummy_operator = DummyOperator(task_id='dummy_task', retries=3, dag=dag)

enea_operator = PythonOperator(task_id='report_blackouts', python_callable=enea_check, dag=dag)

Finally, there's the dependency section. In this case it's very simple and indicates that dummy_operator should be followed by enea_operator. It couldn't be any simpler:

dummy_operator >> enea_operator
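
The >> syntax is just one way of expressing this. Using the two operators defined above, the same relationship can also be declared with explicit methods (a quick sketch):

# Equivalent ways of declaring the same dependency
dummy_operator.set_downstream(enea_operator)
enea_operator.set_upstream(dummy_operator)

# Fanning out to several downstream tasks at once is also possible, e.g.:
# dummy_operator >> [task_a, task_b]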

The last two lines are used for testing. It's a good idea to prepare your DAGs in a way that allows unit testing them before scheduling them in Airflow.

if __name__ == '__main__':
    enea_check()
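
A single task can also be exercised without the scheduler, e.g. airflow test check_enea_blackouts report_blackouts 2019-06-27 runs just that one task for the given execution date.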

Automation

Creating complex examples by script

Another absolutely great thing about Airflow is that it’s very easy to automate DAG creation. I tried achieving two goals here:

  • Show the ease of DAG scripting
  • Check how Airflow works with complex workflows.

In my professional career I used to deal with workflows built of over a thousand tasks (sometimes even a few thousand). This makes monitoring and any reviews quite challenging. Let's see how Airflow handles a complex example.

For the purpose of this exercise I've created a script that uses the contents of the Python site-packages directory to create a task for each file found in that location. I build a set containing all the file names (with dashes replaced by underscores). Next I print the header containing the imports, the DAG instance definition with its schedule, and the dummy operator to start with. Then I use a task definition template in a loop to create a task for every file. Finally, I need to add dependencies: for every file I randomly add up to six upstream dependencies. Here's the full script used for creating the DAG:

from os import listdir
from random import randint


root_path = r"/mnt/c/venv/airflow/lib/python3.6/site-packages"
file_set = set()

for filename in listdir(root_path):
    filename = filename.split('.')[0]
    file_set.add(filename.replace("-", "_"))

common_part = """
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from functions import check_enea_blackouts
from functions import send_email_message

dag = DAG('filechecker_complex_dag_example', description='Tasks for file operations.',
          schedule_interval='*/15 7,11,16 * * *',
          start_date=datetime(2017, 3, 20), catchup=False)

dummy_operator = DummyOperator(task_id='dummy_task', retries=3, dag=dag)"""


DAG_task_definition = """
def {0}():
    print('{0} invoked.')
    return '{0} check completed.'

{0}_operator = PythonOperator(task_id='{0}_operator', python_callable={0}, dag=dag)

dummy_operator >> {0}_operator"""

print(common_part)
files_in_dag = []
for filename in file_set:
    print(DAG_task_definition.format(filename))
    # Add up to a few random upstream dependencies on previously generated tasks
    for i in range(randint(0, min(6, len(files_in_dag)))):
        print('{0}_operator >> {1}_operator'.format(files_in_dag[randint(0, len(files_in_dag) - 1)], filename))
    files_in_dag += [filename]
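
Since the script simply prints the generated code, its output can be redirected into a .py file inside the DAG_FOLDER, and Airflow will pick the new DAG up from there on its next scan.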

Monitoring a complex workflow

Here’s the graph overview:
[Screenshot: Graph View of the generated complex DAG]

Impossible to get all the details at once, right? Well, just by hovering over the status names we get a better view. For example, hovering over 'success' makes it possible to quickly check which part of the DAG has already completed:

[Screenshot: Graph View with the 'success' status highlighted]

It's actually very convenient to manipulate the graph: zoom in and out, check task details or move it around at any zoom level. And it works amazingly fast! I did not mention the Gantt chart above for the simple example - it makes much more sense to review it here than for just one task:

[Screenshot: Gantt chart for the complex DAG]

Sensors

Sensors are a special type of operator used to monitor other resources. A sensor is a subclass of BaseSensorOperator. Its poke function is called in a loop until:

  • it returns True
  • it raises an AirflowSkipException - in this case the task instance status is set to Skipped
  • it raises any other exception - in this case it is retried according to the value of retries.
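
To illustrate the poke mechanism, here's a minimal sketch of a custom sensor (the class name and its path argument are invented for this example):

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
import os

class FileExistsSensor(BaseSensorOperator):
    # Hypothetical sensor that succeeds once the given path exists

    @apply_defaults
    def __init__(self, path, *args, **kwargs):
        super(FileExistsSensor, self).__init__(*args, **kwargs)
        self.path = path

    def poke(self, context):
        # Called every poke_interval seconds until it returns True
        return os.path.exists(self.path)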

Airflow also ships with ready-made sensors. For example, FileSensor, which builds on BaseSensorOperator (as mentioned in the docs), can be used as follows:

from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.dummy_operator import DummyOperator

import datetime
import airflow

default_args = {
    "depends_on_past": False,
    "start_date": airflow.utils.dates.days_ago(1),
    "retries": 1,
    "retry_delay": datetime.timedelta(hours=5),
}

with airflow.DAG("file_sensor_test_v1", default_args=default_args, schedule_interval="*/5 * * * *") as dag:

    start_task = DummyOperator(task_id="start")
    stop_task = DummyOperator(task_id="stop")
    sensor_task = FileSensor(task_id="my_file_sensor_task", poke_interval=30,
                             fs_conn_id=<fs connection id>, filepath=<file or directory name>)

start_task >> sensor_task >> stop_task

Connectors

Airflow allows defining a variety of connections. Just go to the Admin > Connections tab to see the default, very extensive list:

[Screenshot: the default list of connections under Admin > Connections]
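
Once a connection is defined there, tasks can refer to it by its conn_id through a hook. Here's a small sketch (the my_postgres connection id and the query are assumptions for illustration):

from airflow.hooks.postgres_hook import PostgresHook

def fetch_row_count():
    # 'my_postgres' is assumed to be defined under Admin > Connections
    hook = PostgresHook(postgres_conn_id='my_postgres')
    records = hook.get_records('SELECT COUNT(*) FROM some_table')
    print(records)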

Final Thoughts

Apache Airflow's open-source platform enables data engineers to author, schedule and monitor complex, enterprise-grade workflows. It is important to note that Apache Airflow is not an ETL tool, but it can integrate with one. Apache Airflow is becoming widely adopted by data teams, including some of Xplenty's larger customers.

Integrating Apache Airflow with Xplenty

Apache Airflow with Xplenty enables enterprise-wide workflows that seamlessly integrate ETL steps. Xplenty is a cloud-based ETL tool that provides simple, visualized data pipelines for automated data flows across a wide range of sources and destinations. To learn more about using Xplenty in combination with Apache Airflow, contact us here!