Assumptions and Initial Setup

Detailed steps to set up Airflow were covered in the previous article. This time we’ll cut some corners and make the setup more robust by using Docker containers. This article assumes familiarity with Docker.

To use Docker on Windows/Mac you need Docker Desktop. It requires Hyper-V, so Windows Pro/Enterprise is needed. For those of you using Home editions, I’ll be referring to Docker Toolbox, which is a bit more limited than Docker Desktop.

Follow the steps to install Docker Toolbox described here.


Why Apache Airflow?

Let me refer you to a short blog post by Ry Walker, Co-Founder and CEO at Astronomer, explaining why Airflow is a great choice for scheduling jobs in your project.

Apache Airflow setup

Pull Airflow Docker:

docker pull puckel/docker-airflow

This will pull a container image with Airflow based on the official Python (3.7-slim-stretch) image. Visit the puckel/docker-airflow page on Docker Hub for a detailed description.

To access the DAGs created on the host from inside the Docker container, enable folder sharing in VirtualBox:

  1. Go to Settings -> Shared Folders
  2. Choose host folder
  3. Name the resource for Docker Containers
  4. Check Auto-mount and Make Permanent options.


I will assume the folder created here contains a dags subfolder for the DAGs - in my case it’s c:\GoogleDrive\docker\airflow\dags\.

Now, in order to access applications running inside the container, we also need to map some ports. In this example I assume we’ll be running Airflow on port 8080.

  1. Go to Settings -> Network, expand the Advanced node and choose Port Forwarding
  2. Create a rule for port 8080 - we’re assuming that the host and guest ports will be the same.

Restart Docker in order to make sure the changes to shared folders are applied:

docker-machine.exe restart


For any additional Python packages I’ve created a requirements.txt file that lists all of them. In my case it’s in:

c:\GoogleDrive\docker\airflow\requirements.txt

Whatever you put there will be installed by the entrypoint.sh script, which executes pip install (with the --user option).
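For example, since the Xplenty wrapper described later in this article relies on the requests library, a minimal requirements.txt could contain just that single entry:

requests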

Now we should be all set to run Airflow! Let’s give that a try by running from the airflow folder (/c/GoogleDrive/docker/airflow in my case):

docker run -d -p 8080:8080 --env-file=env \
    -v /airflow/dags/:/usr/local/airflow/dags \
    -v /airflow/requirements.txt:/requirements.txt \
    puckel/docker-airflow webserver

Here are the options explained:

  • -d - run the container in the background and print the container ID
  • -p 8080:8080 - publish the container’s port(s) to the host. Note we’re mapping ports from the container to the virtual host; we have already mapped these ports from the virtual host to our OS in VirtualBox
  • --env-file=env - this uses the env file created in the airflow folder. We will use it to hold environment variables, including the Xplenty API key (a minimal example follows right after this list)
  • -v /airflow/dags/:/usr/local/airflow/dags - maps the dags folder from the host to the container. Now any files we create or edit on the host will be accessible inside the container
  • -v /airflow/requirements.txt:/requirements.txt - mounts the above-mentioned requirements.txt file as /requirements.txt for the entrypoint.sh script to pick up
  • puckel/docker-airflow - the name of the image we want to create a container from
  • webserver - the command to be executed
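The env file itself is not listed anywhere in this article, so here is a minimal sketch of what it could contain - the xpl_api_key variable name matches what the wrapper reads later with os.getenv, and the placeholder value is yours to fill in:

xpl_api_key=<your-Xplenty-API-key>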

And here’s the result, followed by the docker ps command that lists the running containers:


If all went well, you should be able to access Airflow at http://localhost:8080/admin/ 

If you need to access a shell to verify the mapped folders or the installed packages, run the following command:

docker run -it --env-file=env \
    -v /airflow/dags/:/usr/local/airflow/dags \
    -v /airflow/requirements.txt:/requirements.txt \
    puckel/docker-airflow sh -c "bash"

That should bring up the container prompt:

Xplenty Integration


Having the scheduler in place, we now need to take advantage of it and use it to build a graph and schedule the integration jobs created in Xplenty.

Before we get there, we need a simple wrapper around the Xplenty API. In order to execute a package, it should:

  1. Take package_id as an input parameter
  2. Check for available clusters
  3. If there is no cluster available, create one
  4. Wait for the cluster to become available
  5. Execute the package
  6. Wait for the result of the package
  7. Return the result


Xplenty wrapper

With the Xplenty API documentation at hand, it’s quite easy to create a Python script that handles the described process.

General variables

Let’s start with the imports and the general variables that will be used in the later parts of the script:

import os
import requests, json
from time import sleep


account_id = "my-account-id"
api_key = os.getenv('xpl_api_key')
api_url = 'https://api.xplenty.com/{}/api/'.format(account_id)

  • account_id indicates the name of the account
  • api_key will be used to access the account. In this example it is stored in the xpl_api_key environment variable and fetched with os.getenv
  • api_url is the base URL that will later be supplemented with the needed endpoints

run_job explained

The main function to execute a package takes just a single parameter - package_id. First, the headers are set according to the run job documentation and the URL is supplemented with the jobs endpoint. Then the get_available_cluster function is called to check whether any clusters are available.

  headers = {
      'Accept': 'application/vnd.xplenty+json; version=2',
      'Content-Type': 'application/json',
  }
  url = api_url + 'jobs'


In case there is no cluster already up and running, the create_cluster function is called to create one.

  # get available running clusters
  cluster_id = get_available_cluster()


The script needs to wait for the cluster to become ready:

  # if there are no available clusters, create one
  if cluster_id < 0:
      create_cluster()

  # wait for the cluster to get created and available
  while cluster_id < 0:
      print('Waiting for new cluster to start...')
      sleep(30)
      cluster_id = get_available_cluster()
      print(cluster_id)

Next, once a cluster has been chosen, the data variable is assigned the cluster_id and package_id wrapped in JSON format.

  # prepare the json indicating cluster_id and package_id
  data = '{{"cluster_id": {cluster_id}, "package_id": {package_id}}}'.format(cluster_id=cluster_id, package_id=package_id)
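As a side note, the same payload could be built with json.dumps instead of a hand-crafted format string - a small equivalent sketch (the json module is already imported at the top of the script):

  # equivalent payload built with json.dumps
  data = json.dumps({"cluster_id": cluster_id, "package_id": package_id})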



Once all is ready, a POST request is sent to the previously defined URL, along with the headers, data and authorization key.
After the job request has been sent, the script starts monitoring the result. It will poll the Xplenty API for the job status every 30 seconds as long as the job is in pending or running status:

  if cluster_id:
      print('Running package: {}'.format(package_id))
      print(url, str(data))

      # run the POST request to start the job
      r = requests.post(url,
                        headers=headers,
                        data=str(data),
                        auth=(api_key, ''))

      print(r.json())
      print('Started job id={}'.format(r.json()['id']))

      # get the status of the job
      job_status = get_job_status(r.json()['id'])
      print(job_status)

      # wait as long as the job is either 'pending' or 'running'
      while job_status in ('pending', 'running'):
          sleep(30)
          job_status = get_job_status(r.json()['id'])
          print(job_status)

Finally, the status of the job is returned:

  # return the final status of the job
  return job_status

run_job - putting it all together

Here’s a complete function:


def run_job(package_id):
    headers = {
        'Accept': 'application/vnd.xplenty+json; version=2',
        'Content-Type': 'application/json',
    }
    url = api_url + 'jobs'

    # get available running clusters
    cluster_id = get_available_cluster()

    # if there are no available clusters, create one
    if cluster_id < 0:
        create_cluster()

    # wait for the cluster to get created and available
    while cluster_id < 0:
        print('Waiting for new cluster to start...')
        sleep(30)
        cluster_id = get_available_cluster()
        print(cluster_id)

    # prepare the json indicating cluster_id and package_id
    data = '{{"cluster_id": {cluster_id}, "package_id": {package_id}}}'.format(cluster_id=cluster_id,
                                                                               package_id=package_id)

    if cluster_id:
        print('Running package: {}'.format(package_id))
        print(url, str(data))

        # run the POST request to start the job
        r = requests.post(url,
                          headers=headers,
                          data=str(data),
                          auth=(api_key, ''))

        print(r.json())
        print('Started job id={}'.format(r.json()['id']))

        # get the status of the job
        job_status = get_job_status(r.json()['id'])
        print(job_status)

        # wait as long as the job is either 'pending' or 'running'
        while job_status in ('pending', 'running'):
            sleep(30)
            job_status = get_job_status(r.json()['id'])
            print(job_status)

    # return the final status of the job
    return job_status
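If you want to smoke-test the wrapper outside Airflow, it can also be called directly - a minimal sketch, using one of the package IDs that appear in the DAG later in this article:

if __name__ == '__main__':
    # run a single package and print its final status
    print(run_job(114089))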

get_available_cluster

As mentioned, this will fetch all clusters and check their status. If a cluster with the status available is found, the function returns that cluster’s id. Otherwise a negative id is returned to indicate the lack of available clusters. The API description can be found here.

def get_available_cluster():
  headers = {
      'Accept': 'application/vnd.xplenty+json; version=2',
  }
  url = api_url + 'clusters'

  r = requests.get(url,
                    headers=headers,
                    auth=(api_key, ''))

  for cluster in json.loads(r.text):
      print('Cluster: {}, status: {}'.format(cluster['name'], cluster['status']))
      if cluster['status'] == 'available':
          print('Using cluster: {}'.format(cluster['name']))
          return cluster['id']
  print('No available cluster!')
  return -1


This function lists all the clusters and can easily be extended to filter by status with data = '{"status": "available"}', which would need to be added to the request:

  r = requests.get(url,
                    headers=headers,
                    data = '{"status": "available"}',
                    auth=(api_key, ''))

create_cluster

It’s possible to set up a cluster using the Xplenty UI:


With just a few clicks you can choose a sandbox or production cluster, assign the desired number of nodes (up to 8!) and decide whether the cluster should be terminated after a period of inactivity.

For the purpose of this exercise, a sandbox cluster will be automatically created using the following function.

def create_cluster():
  headers = {
      'Accept': 'application/vnd.xplenty+json; version=2',
      'Content-Type': 'application/json',
  }

  data = '{"nodes":1, "type":"Sandbox"}'

  print(data)
  print('Creating new cluster...')
  url = api_url + 'clusters'

  r = requests.post(url,
                    headers=headers,
                    data=data,
                    auth=(api_key, ''))
  print(r.status_code)
  print(r.text)

With the headers set and the data variable holding the desired cluster parameters (as per the docs, a sandbox cluster must use a single node), the POST request is sent.
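Optionally, the function could also fail fast when the request is rejected, instead of only printing the response. A small sketch - treating anything outside the 2xx range as an error is my assumption here, not something taken from the Xplenty docs:

  # assumption: any non-2xx response means the cluster was not created
  if r.status_code >= 300:
      raise RuntimeError('Cluster creation failed: {} {}'.format(r.status_code, r.text))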

get_job_status

The final function is used to fetch the status of the job being executed. It takes just the job_id parameter, sets the appropriate headers and makes a GET request, returning the status of the job:

def get_job_status(job_id):
  headers = {
      'Accept': 'application/vnd.xplenty+json; version=2',
  }
  url = '{api_url}jobs/{job_id}'.format(api_url=api_url,
                                        job_id=job_id)
  r = requests.get(url,
                    headers=headers,
                    auth=(api_key, ''))
  return r.json()['status']
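One caveat: the polling loops above will wait indefinitely if a job gets stuck. A simple guard is to cap the total wait time - here is a sketch of such a helper; the 30-second interval matches the rest of the script, while the one-hour cap is an arbitrary assumption:

def wait_for_job(job_id, poll_interval=30, max_wait=3600):
  # poll the job status until it leaves 'pending'/'running' or the time cap is reached
  waited = 0
  job_status = get_job_status(job_id)
  while job_status in ('pending', 'running') and waited < max_wait:
      sleep(poll_interval)
      waited += poll_interval
      job_status = get_job_status(job_id)
  return job_status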

Xplenty DAGs

Having the wrapper ready, it is possible to create an Airflow DAG that will call and execute Xplenty packages! In this case we’ll create a simple DAG that runs three packages sequentially:


For each package it will wait for the job to complete before starting the next one.

Let’s start by importing the required Airflow components, followed by the Xplenty wrapper discussed above and the datetime library.

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow import AirflowException
from wrapper import xplenty
from datetime import datetime

Next, let’s create the DAG definition with a once-a-day schedule (15:15, as set by the cron expression below) and a dummy_operator that will be used as the start task.

dag = DAG('Xplenty_Jobs_Schedule', description='DAG for Xplenty packages',
        schedule_interval='15 15 * * *',
        start_date=datetime(2017, 3, 20), catchup=False)

dummy_operator = DummyOperator(task_id='dummy_task', retries=3, dag=dag)

Then the function to be called by the operators to run the Xplenty packages:

def run_xplenty_package(package_id):
  status = xplenty.run_job(package_id=package_id)
  if status == 'failed':
      raise AirflowException
  return 'package {} completed with status {}'.format(package_id, status)

Next come the operators - in this case very simple ones, one for each package, calling the same function with just a different package_id. Each uses the basic PythonOperator.

rest_s3_operator = PythonOperator(task_id='rest_s3_operator',
                                python_callable=run_xplenty_package,
                                op_kwargs={'package_id': 114089},
                                dag=dag)

s3_mysql_operator = PythonOperator(task_id='s3_mysql_operator',
                                python_callable=run_xplenty_package,
                                op_kwargs={'package_id': 114090},
                                dag=dag)

salesforce_mysql_upsert_operator = PythonOperator(task_id='salesforce_mysql_upsert_operator',
                                python_callable=run_xplenty_package,
                                op_kwargs={'package_id': 114091},
                                dag=dag)

Finally, the dependencies. As mentioned, the jobs will be executed sequentially.

dummy_operator >> rest_s3_operator
rest_s3_operator >> s3_mysql_operator
s3_mysql_operator >> salesforce_mysql_upsert_operator
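As a side note, Airflow also lets you chain these dependencies in a single statement, which is equivalent to the three lines above:

dummy_operator >> rest_s3_operator >> s3_mysql_operator >> salesforce_mysql_upsert_operator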

Running the Flow

Putting the whole setup to work requires starting the Airflow Docker Container, checking the DAG, running it and verifying Xplenty interface.

Running the Airflow Container

Start the container with the following command:

docker run -d -p 8080:8080 --env-file=env \
    -v /airflow/dags/:/usr/local/airflow/dags \
    -v /airflow/requirements.txt:/requirements.txt \
    puckel/docker-airflow webserver

After a while it should be up, which can be verified by looking at the logs. In my case optimistic_panini is the container name:

docker logs optimistic_panini

This should show that the DAG has been read and that the SequentialExecutor is being used:

[2019-12-02 19:01:42,475] {{__init__.py:51}} INFO - Using executor SequentialExecutor
[2019-12-02 19:01:42,477] {{dagbag.py:92}} INFO - Filling up the DagBag from /usr/local/airflow/dags


Visiting http://localhost:8080/admin/ should bring up the Airflow UI:


Now, enable the DAG and review the schedule:


Or directly execute the DAG using the Links menu on the far right:


Now, after clicking the name of the DAG, it should be possible to see the job runs and more. Multiple runs will be displayed in columns, showing their stats - in my example there are different statuses, like success, failed or upstream failed:


How’s that reflected on the Xplenty dashboard? Let’s check!


As you can easily see, the Xplenty job statuses are reflected in Airflow - the latest run (the far-right column in the Airflow tree view) shows that:

  • salesforce_mysql_upsert_operator was not executed - and it’s not listed in Xplenty
  • s3_mysql_operator has failed - and it’s the most recent one on the Xplenty dashboard, with the failed status
  • rest_s3_operator was executed first in this run and completed successfully - the same is visible on the Xplenty dashboard, as the second one on the list, 100% completed.

In addition, we can view a single run in the Graph view:


We can review the Gantt chart:


Anytime you click a job - in the tree view, graph view or Gantt chart - an extra menu pops up with additional options for marking the job as successful, failed or running, and for reviewing the logs:


Take-off!

Now, having all the setup ready, one might wonder how hard it would be to actually make it production-ready and scale it for the whole enterprise. Taking into account all the required infrastructure, server configuration, maintenance and availability, and software installation - there’s a lot you need to ensure in order for the scheduler to be reliable. What if someone could take away all these worries and let you focus just on scheduling your jobs?


So, let us now take Xplenty further with Astronomer.io! Let’s check if things are as easy as they claim:


Starting with the guide available on their page, I’ve set up a trial account and created my first Workspace.


It’s now possible to configure the New Deployment and choose the appropriate executor:


Let me quote the description from Astronomer.io here:

“Airflow supports multiple executor plugins. These plugins determine how and where tasks are executed. We support the Local Executor for light or test workloads, and the Celery and Kubernetes Executors for larger, production workloads. The Celery Executor uses a distributed task queue and a scalable worker pool, whereas the Kubernetes Executor launches every task in a separate Kubernetes pod.”

Once saved, the page redirects to the overview and encourages you to open Apache Airflow:


As you may figure out, behind the scenes a server is created - you may notice being redirected to a generated web address.


The whole environment is started behind the scenes and it may take a moment. Once started (refresh the browser window to verify that), the Airflow main screen pops up:


But there are no DAGs! It’s completely empty - besides the scheduler. And there is no UI option to upload your DAGs. In order to do that, we need to follow the CLI quickstart instructions.

Setting DAGs on Astronomer.io

Running WSL on Windows

As long as you’re running a Windows version with Hyper-V enabled, you should be able to accomplish the steps using WSL.

Following the instructions, let’s install the CLI using:

curl -sSL https://install.astronomer.io | sudo bash

This should do all the setup, which can be verified by running the astro command to see if the help is shown:


Let’s create a directory for the project and set it as the current path:

mkdir xplenty && cd xplenty

Initializing the project with astro dev init should return a confirmation message:


Now it should be possible to connect to Astronomer Cloud using:

astro auth login gcp0001.us-east4.astronomer.io


Follow the on-screen instructions to log in - either with oAuth or using a username/password.


Once done, a confirmation message should be visible:

Successfully authenticated to registry.gcp0001.us-east4.astronomer.io


Make sure to put the Xplenty API key into .env. While in the project directory, you should now be able to copy your DAGs over to the project - /mnt/c/Astronomer/xplenty/dag in my case. Next, it should be possible to run astro deploy.

This command should first give you a choice of deployments and workspaces. In the discussed example there’s just one on the list. As a result, the whole setup should get published to Astronomer.io:

Select which airflow deployment you want to deploy to:
#     LABEL       DEPLOYMENT NAME            WORKSPACE           DEPLOYMENT ID
1     Xplenty     quasarian-antenna-4223     Trial Workspace     ck3xao7sm39240a38qi5s4y74

> 1
Deploying: quasarian-antenna-4223
quasarian-antenna-4223/airflow
Building image...
Sending build context to Docker daemon  26.62kB
Step 1/1 : FROM astronomerinc/ap-airflow:0.10.3-1.10.5-onbuild
# Executing 5 build triggers
---> Using cache
---> Using cache
---> Using cache
---> Using cache
---> a5866d1769c4
Successfully built a5866d1769c4
Successfully tagged quasarian-antenna-4223/airflow:latest
Pushing image to Astronomer registry
The push refers to repository [registry.gcp0001.us-east4.astronomer.io/quasarian-antenna-4223/airflow]
d2a571c73db1: Pushed
01de691c8f7c: Layer already exists
6dca0d392e56: Layer already exists
097cec956145: Layer already exists
dd314892853b: Layer already exists
4285fcfc2381: Layer already exists
3f4cdd9563bd: Layer already exists
15797b66dbf6: Layer already exists
0f65bcec71fa: Layer already exists
299fd49bdb72: Layer already exists
da37bee05289: Layer already exists
132a2e1367b6: Layer already exists
03901b4a2ea8: Layer already exists
cli-3: digest: sha256:b48933029f2c76e7f4f0c2433c7fcc853771acb5d60c176b357d28f6a9b6ef4b size: 3023
Untagged: registry.gcp0001.us-east4.astronomer.io/quasarian-antenna-4223/airflow:cli-3
Untagged: registry.gcp0001.us-east4.astronomer.io/quasarian-antenna-4223/airflow@sha256:b48933029f2c76e7f4f0c2433c7fcc853771acb5d60c176b357d28f6a9b6ef4b
Deploy succeeded!
root@270c02e5d9d5:/home/astronomer/xplenty#


So, your Astronomer workspace should now have the new DAG available:


Running on Docker

And while all of the above should happen, none of it actually did in my case - I wasn’t able to deploy, and running astro deploy from WSL failed as follows:

vic@DESKTOP-I5D2O6C:/c/Astronomer/xplenty$ astro deploy
Authenticated to gcp0001.us-east4.astronomer.io

Select which airflow deployment you want to deploy to:
#     LABEL       DEPLOYMENT NAME            WORKSPACE           DEPLOYMENT ID
1     Xplenty     quasarian-antenna-4223     Trial Workspace     ck3xao7sm39240a38qi5s4y74

> 1
Deploying: quasarian-antenna-4223
quasarian-antenna-4223/airflow
Building image...
Cannot connect to the Docker daemon at tcp://localhost:2375. Is the docker daemon running?
Error: command 'docker build -t quasarian-antenna-4223/airflow:latest failed: failed to execute cmd: exit status 1

vic@DESKTOP-I5D2O6C:/c/Astronomer/xplenty$


The Docker daemon will not start on my Windows machine due to the lack of Hyper-V. If you face the same issue - don’t worry! I will not leave you there.

First, pull an Ubuntu Docker image:

docker pull ubuntu

Next, we’re going to install the Astronomer CLI within the container - just as we did above. Start the container in interactive mode:

docker run -it ubuntu sh -c "bash"

Install the CLI using:

curl -sSL https://install.astronomer.io | sudo bash

Create the directory for the project and set it as the current path:

mkdir xplenty && cd xplenty

Initialize the project with astro dev init and check the confirmation message:

root@270c02e5d9d5:/home/astronomer/xplenty# astro dev init
Initialized empty astronomer project in /home/astronomer/xplenty

Now it should be possible to connect to Astronomer Cloud using:

astro auth login gcp0001.us-east4.astronomer.io


Follow the on-screen instructions to log in - either with oAuth or using a username/password.

root@270c02e5d9d5:/home/astronomer/xplenty# astro auth login gcp0001.us-east4.astronomer.io
CLUSTER                             WORKSPACE
gcp0001.us-east4.astronomer.io      ck3xaemty38yx0a383cmooskp

Switched cluster
Username (leave blank for oAuth):

Please visit the following URL, authenticate and paste token in next prompt

https://app.gcp0001.us-east4.astronomer.io/login?source=cli

oAuth Token:

Obtain and paste the token - it works great - or use username and password.

Once done, a confirmation message should be visible:

Successfully authenticated to registry.gcp0001.us-east4.astronomer.io

Having completed that step, it’s a good idea to save the state of the Docker container to a new image. Just check the container ID with docker ps.

In my case it’s 270c02e5d9d5

CONTAINER ID        IMAGE               COMMAND             CREATED             STATUS              PORTS               NAMES
270c02e5d9d5        ubuntu              "sh -c bash"        48 minutes ago      Up 48 minutes                           charming_galileo

So I’ve used the following command to create an image with Astronomer installed:

docker commit 270c02e5d9d5 ubuntu:astro

So, now there’s a new image, and it can be seen by running the docker images command:

$ docker images
REPOSITORY          TAG                 IMAGE ID            CREATED             SIZE
ubuntu              astro               6f7e5bf1b01c        2 hours ago         139MB
ubuntu              latest              775349758637        5 weeks ago         64.2MB

Finally, I’ve rerun the container, mounting the DAGs volume that I intend to copy into the xplenty project created inside the container. In addition, I’ve mounted docker.sock to allow astro from within the container to reach Docker:

docker run -it -v /airflow/dags/:/usr/local/Astronomer/dags/ -v /var/run/docker.sock:/var/run/docker.sock --env-file=env ubuntu:astro sh -c "bash"


Now, one last thing to add before deployment is the API key. I recommend setting it as an environment variable in your Dockerfile, like this:

root@270c02e5d9d5:/home/astronomer/xplenty# ll
total 60
drwxr-xr-x 1 root root 4096 Dec  9 14:08 ./
drwxr-xr-x 1 root root 4096 Dec  9 12:23 ../
drwxr-x--- 2 root root 4096 Dec  9 10:07 .astro/
-rw-r--r-- 1 root root   38 Dec 9 10:07 .dockerignore
-rw-r--r-- 1 root root   45 Dec 9 12:03 .env
-rw-r--r-- 1 root root   31 Dec 9 10:07 .gitignore
-rw-r--r-- 1 root root  101 Dec 9 14:00 Dockerfile
-rw-r--r-- 1 root root  556 Dec 9 10:07 airflow_settings.yaml
drwxr-xr-x 1 root root 4096 Dec  9 14:07 dags/
drwxr-xr-x 2 root root 4096 Dec  9 10:07 include/
-rw------- 1 root root   62 Dec 9 10:52 nohup.out
-rw-r--r-- 1 root root    0 Dec 9 10:07 packages.txt
drwxr-xr-x 2 root root 4096 Dec  9 10:07 plugins/
-rw-r--r-- 1 root root    0 Dec 9 10:07 requirements.txt
root@270c02e5d9d5:/home/astronomer/xplenty# more Dockerfile
FROM astronomerinc/ap-airflow:0.10.3-1.10.5-onbuild
ENV xpl_api_key=<your-API-key-here>


Finally, everything is set for deployment! Just run astro deploy:

root@270c02e5d9d5:/home/astronomer/xplenty# astro deploy
Authenticated to gcp0001.us-east4.astronomer.io

Select which airflow deployment you want to deploy to:
#     LABEL       DEPLOYMENT NAME            WORKSPACE           DEPLOYMENT ID
1     Xplenty     quasarian-antenna-4223     Trial Workspace     ck3xao7sm39240a38qi5s4y74

> 1
Deploying: quasarian-antenna-4223
quasarian-antenna-4223/airflow
Building image...
Sending build context to Docker daemon  26.62kB
Step 1/2 : FROM astronomerinc/ap-airflow:0.10.3-1.10.5-onbuild
# Executing 5 build triggers
---> Using cache
---> Using cache
---> Using cache
---> Using cache
---> b4f4c9e5cb16
Step 2/2 : ENV xpl_api_key=<your-API-key-here>
---> Running in 0ec9edff34a5
Removing intermediate container 0ec9edff34a5
---> 24232535523f
Successfully built 24232535523f
Successfully tagged quasarian-antenna-4223/airflow:latest
Pushing image to Astronomer registry
The push refers to repository [registry.gcp0001.us-east4.astronomer.io/quasarian-antenna-4223/airflow]
caafa5dbf9af: Pushed
01de691c8f7c: Layer already exists
6dca0d392e56: Layer already exists
097cec956145: Layer already exists
dd314892853b: Layer already exists
4285fcfc2381: Layer already exists
3f4cdd9563bd: Layer already exists
15797b66dbf6: Layer already exists
0f65bcec71fa: Layer already exists
299fd49bdb72: Layer already exists
da37bee05289: Layer already exists
132a2e1367b6: Layer already exists
03901b4a2ea8: Layer already exists
cli-11: digest: sha256:b7d5f8b5b1ba49fb70549c473a52a7587c5c6a22be8141353458cb8899f4159a size: 3023
Untagged: registry.gcp0001.us-east4.astronomer.io/quasarian-antenna-4223/airflow:cli-11
Untagged: registry.gcp0001.us-east4.astronomer.io/quasarian-antenna-4223/airflow@sha256:b7d5f8b5b1ba49fb70549c473a52a7587c5c6a22be8141353458cb8899f4159a
Deploy succeeded!


As you may have noticed, some layers already existed in my case. It shows nicely that in case of subsequent deployments some parts are reused. I needed a few attempts before I was all set. This shows you can perform these steps multiple times in case of issues, so don’t be afraid to experiment!

Once that is done, you can go to the Astronomer workspace and execute the DAG:


(Yes - it took quite a few attempts for me to finally have the setup completed! :) )

You should notice a cluster being created by the earlier described Xplenty wrapper:


and a job started:


After a while you should be able to see all jobs completed both in Xplenty:


and reflected in Astronomer graph view:


Or the tree view:


Voilà!

Going the Extra Mile…

If you’re curious, you’ve probably noticed that along the way I got some failures. It’s possible to get some details just by hovering the mouse over a particular run:


OK, the task’s “State” says it has failed - quite obviously. With just a click of a button we can get a menu that lets us check the logs:


Well, while it’s really easy to check the logs, in this case it won’t tell us much as the wrapper here is not really expressive:


So, let’s dig a bit deeper and try to investigate Xplenty. It’s as easy as finding the failed job and choosing “View details”:

This opens up a panel where we can review the variables and errors:


Now it’s pretty obvious why the job has failed:

Input(s):
Failed to read data from "xplenty://XPLENTY_USER_S3_CONNECTION_9669@xplenty.dev/mako/breakfast.csv"


The source file was not available. Now you can see how Xplenty makes failure investigation and root cause analysis super easy!

Integrating Apache Airflow with Xplenty

Airflow with Xplenty enables enterprise-wide workflows that seamlessly schedule and monitor ETL integration jobs. Xplenty is a cloud-based, code-free ETL software that provides simple, visualized data pipelines for automated data flows across a wide range of sources and destinations. Schedule a demo here to integrate Xplenty with Apache Airflow!