Assumptions and Initial Setup
Detailed steps to set up Airflow were covered in the previous article. This time we’ll cut some corners and get a more robust setup by using Docker containers. This article assumes familiarity with Docker.
To use Docker on Windows/Mac you need Docker Desktop. It requires Hyper-V, so on Windows you need the Pro or Enterprise edition. For those of you on Home editions, I’ll be referring to Docker Toolbox, which is a bit limited compared to Docker Desktop.
Follow the steps to install Docker Toolbox described here.
Table of Contents
- Why Apache Airflow?
- Apache Airflow setup
- Xplenty Integration
- Running the Flow
- Take-off!
- Setting DAGs on Astronomer.io
- Going the Extra Mile…
Why Apache Airflow?
Let me refer you to a short blog post by Ry Walker, Co-Founder and CEO at Astronomer to tell you why Airflow is a great choice for scheduling jobs in your project.
Apache Airflow setup
Pull the Airflow Docker image:
docker pull puckel/docker-airflow
This pulls an Airflow image based on the official Python 3.7-slim-stretch image. Visit the puckel/docker-airflow page on Docker Hub for a detailed description.
To access the DAGs created on the host from inside the Docker container, enable folder sharing in VirtualBox:
- Go to Settings -> Shared Folders
- Choose host folder
- Name the resource for Docker Containers
- Check Auto-mount and Make Permanent options.
I will assume the folder shared here contains a dags subfolder for the DAGs - in my case it’s c:\GoogleDrive\docker\airflow\dags\.
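If you prefer the command line over the VirtualBox UI, the same share can be created with VBoxManage - a sketch, assuming the Docker Toolbox VM is named default and using my host path:
VBoxManage sharedfolder add "default" --name "airflow" --hostpath "c:\GoogleDrive\docker\airflow" --automount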
Now, in order to access applications running inside the container, we also need to map some ports. In this example I assume we’ll be running Airflow on port 8080.
- Go to Settings -> Network, expand the Advanced node and choose Port Forwarding:
- Create a rule for port 8080 - we’re assuming that host and guest ports will be the same:
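The same rule can also be added from the command line while the VM is running - again a sketch, assuming the VM is named default:
VBoxManage controlvm "default" natpf1 "airflow,tcp,,8080,,8080"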
Restart Docker in order to make sure the changes to shared folders are applied:
docker-machine.exe restart
For any additional Python packages I’ve created a requirements.txt file that lists them all. In my case it’s in
c:\GoogleDrive\docker\airflow\requirements.txt
Whatever you put there will be installed by the entrypoint.sh script, which executes pip install (with the --user option).
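As an illustration, a minimal requirements.txt for the wrapper used later in this article could list just the requests library (pin a version if you prefer):
# used by the Xplenty wrapper script below
requests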
Now we should be all set to run Airflow! Let’s give that a try by running the following from the airflow folder (/c/GoogleDrive/docker/airflow in my case):
docker run -d -p 8080:8080 --env-file=env -v /airflow/dags/:/usr/local/airflow/dags -v /airflow/requirements.txt:/requirements.txt puckel/docker-airflow webserver
Here are the options explained:
- -d - Run container in background and print container ID
- -p 8080:8080 - publishes the container’s port(s) to the host. Note we’re mapping ports from the container to the virtual host; we have already mapped these ports from the virtual host to our OS in VirtualBox
- --env-file=env - this uses the env file created in the airflow folder. We will use it later to hold environment variables, including the Xplenty API key.
- -v /airflow/dags/:/usr/local/airflow/dags - maps the dag folder from host to container. Now any files we create or edit on the host will be accessible inside the container
- -v /airflow/requirements.txt:/requirements.txt - mounts the above-mentioned requirements.txt file as /requirements.txt for the entrypoint.sh script to pick up.
- puckel/docker-airflow - the name of the image we want to create a container with
- webserver - the command to be executed
And here’s the result, followed by the docker ps command that lists the running containers:
If all went well, you should be able to access Airflow at http://localhost:8080/admin/
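A quick check from the command line (assuming curl is available on the host) would be:
curl -I http://localhost:8080/admin/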
If you need shell access to verify the mapped folders or the installed packages, run the following command:
docker run -it --env-file=env -v /airflow/dags/:/usr/local/airflow/dags -v /airflow/requirements.txt:/requirements.txt puckel/docker-airflow sh -c "bash"
That should bring up the container prompt:
Xplenty Integration
With the scheduler in place, we now need to take advantage of it and use it to build a graph of the integration jobs created in Xplenty and schedule them.
Before we get there, we need a simple wrapper for the Xplenty API. In order to execute a package, it should support the following features:
- It should take package_id as input parameter
- It should check for available clusters
- If there is no cluster available, it should create one
- Wait for the cluster to become available
- Execute the package
- Wait for the result of the package
- Return the result
Xplenty wrapper
With the Xplenty API documentation at hand, it’s quite easy to create a Python script that handles the described process.
General variables
We start with the imports and the general variables that will be used later in the script:
import os
import requests, json
from time import sleep
account_id = "my-account-id"
api_key = os.getenv('xpl_api_key')
api_url = 'https://api.xplenty.com/{}/api/'.format(account_id)
- account_id indicates the name of the account
- api_key will be used to access the account. In this example it is stored as the xpl_api_key environment variable and fetched with os.getenv
- api_url is the base URL that will later be extended with the required endpoints
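For reference, the env file passed to docker run with --env-file=env only needs to define that variable; a minimal example (the value is a placeholder) looks like this:
xpl_api_key=<your-API-key-here>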
run_job explained
The main function to execute a package takes just a single parameter - package_id. First, the headers are set according to the run-job documentation and the base URL is extended with the jobs endpoint.
The get_available_cluster function is then called to check if any clusters are available.
headers = {
    'Accept': 'application/vnd.xplenty+json; version=2',
    'Content-Type': 'application/json',
}
url = api_url + 'jobs'
In case there is no cluster already up & running, a create_cluster function is called to create one.
# get available running clusters
cluster_id = get_available_cluster()
The script then needs to wait for the cluster to become ready.
# if there are no available clusters, create one
if cluster_id < 0:
    create_cluster()

# wait for the cluster to get created and available
while cluster_id < 0:
    print('Waiting for new cluster to start...')
    sleep(30)
    cluster_id = get_available_cluster()
    print(cluster_id)
Next, with a cluster chosen, the data variable is built with the cluster_id and package_id wrapped in JSON format.
# prepare the json indicating cluster_id and package_id
data = '{{"cluster_id": {cluster_id}, "package_id":{package_id}}}'.format(cluster_id=cluster_id, package_id=package_id)
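As a side note, the same payload could be built with json.dumps (the json module is already imported above), which avoids the manual brace escaping - a small sketch:
# equivalent to the string formatting above
data = json.dumps({'cluster_id': cluster_id, 'package_id': package_id})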
Once all is ready, a POST request is sent to the earlier defined url, along with the headers, data and authorisation key.
After the job request has been sent, the script starts to monitor the result. It polls the Xplenty API for the job status every 30 seconds as long as the job is in pending or running status:
if cluster_id:
    print('Running package: {}'.format(package_id))
    print(url, str(data))

    # run the POST request to start the job
    r = requests.post(url,
                      headers=headers,
                      data=str(data),
                      auth=(api_key, ''))
    print(r.json())
    print('Started job id={}'.format(r.json()['id']))

    # get the initial status of the job before polling
    job_status = get_job_status(r.json()['id'])

    while job_status in ('pending', 'running'):
        sleep(30)
        job_status = get_job_status(r.json()['id'])
        print(job_status)
Finally, the status of the job is returned:
# return the final status of the job
return job_status
run_job - putting it all together
Here’s a complete function:
def run_job(package_id):
    headers = {
        'Accept': 'application/vnd.xplenty+json; version=2',
        'Content-Type': 'application/json',
    }
    url = api_url + 'jobs'

    # get available running clusters
    cluster_id = get_available_cluster()

    # if there are no available clusters, create one
    if cluster_id < 0:
        create_cluster()

    # wait for the cluster to get created and available
    while cluster_id < 0:
        print('Waiting for new cluster to start...')
        sleep(30)
        cluster_id = get_available_cluster()
        print(cluster_id)

    # prepare the json indicating cluster_id and package_id
    data = '{{"cluster_id": {cluster_id}, "package_id": {package_id} }}'.format(cluster_id=cluster_id,
                                                                                package_id=package_id)

    if cluster_id:
        print('Running package: {}'.format(package_id))
        print(url, str(data))

        # run the POST request to start the job
        r = requests.post(url,
                          headers=headers,
                          data=str(data),
                          auth=(api_key, ''))
        print(r.json())
        print('Started job id={}'.format(r.json()['id']))

        # get the status of the job
        job_status = get_job_status(r.json()['id'])
        print(job_status)

        # wait as long as the job is either 'pending' or 'running'
        while job_status in ('pending', 'running'):
            sleep(30)
            job_status = get_job_status(r.json()['id'])
            print(job_status)

        # return the final status of the job
        return job_status
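For a quick local sanity check of the wrapper, you could call it directly with a package id from your own account (the id below is just a placeholder):
if __name__ == '__main__':
    # placeholder package id - replace with a real one from your Xplenty account
    print(run_job(123456))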
get_available_cluster
As mentioned, this will fetch all clusters and check their status. If a cluster with status available is found, the function returns the id of that cluster. Otherwise a negative id is returned to indicate lack of available clusters. The API description can be found here.
def get_available_cluster():
    headers = {
        'Accept': 'application/vnd.xplenty+json; version=2',
    }
    url = api_url + 'clusters'
    r = requests.get(url,
                     headers=headers,
                     auth=(api_key, ''))
    for cluster in json.loads(r.text):
        print('Cluster: {}, status: {}'.format(cluster['name'], cluster['status']))
        if cluster['status'] == 'available':
            print('Using cluster: {}'.format(cluster['name']))
            return cluster['id']
    print('No available cluster!')
    return -1
This function lists all the clusters. It can easily be extended to filter by status with data = '{"status": "available"}', which would need to be added to the request:
r = requests.get(url,
                 headers=headers,
                 data='{"status": "available"}',
                 auth=(api_key, ''))
create_cluster
It’s possible to set up a cluster using the Xplenty UI:
With just a few clicks you can choose a sandbox or production cluster, assign the desired number of nodes (even up to 8!) and decide if the cluster should be terminated after a period of inactivity.
For the purpose of this exercise, a sandbox cluster will be automatically created using the following function.
def create_cluster():
    headers = {
        'Accept': 'application/vnd.xplenty+json; version=2',
        'Content-Type': 'application/json',
    }
    data = '{"nodes":1, "type":"Sandbox"}'
    print(data)
    print('Creating new cluster...')
    url = api_url + 'clusters'
    r = requests.post(url,
                      headers=headers,
                      data=data,
                      auth=(api_key, ''))
    print(r.status_code)
    print(r.text)
With the headers set and the data variable holding the desired cluster parameters (as per the docs, a sandbox must use only a single node), the POST request is sent.
get_job_status
The final function is used to fetch the status of the job being executed. It takes just a job_id parameter, sets the appropriate headers and makes a GET request, returning the status of the job:
def get_job_status(job_id):
    headers = {
        'Accept': 'application/vnd.xplenty+json; version=2',
    }
    # api_url already ends with a slash, so no extra '/' is needed here
    url = '{api_url}jobs/{job_id}'.format(api_url=api_url,
                                          job_id=job_id)
    r = requests.get(url,
                     headers=headers,
                     auth=(api_key, ''))
    return r.json()['status']
Xplenty DAGs
With the wrapper ready, it is possible to create an Airflow DAG that will call and execute Xplenty packages! In this case we’ll create a simple DAG that will sequentially run three packages:
For each one it will wait for the job to complete before starting the next one.
Let’s start by importing the required airflow components, followed by the Xplenty wrapper discussed above and datetime library.
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow import AirflowException
from wrapper import xplenty
from datetime import datetime
Next, let’s create the DAG definition with a once-a-day schedule and a dummy_operator that will be used as the start task.
dag = DAG('Xplenty_Jobs_Schedule',
          description='DAG for Xplenty packages',
          schedule_interval='15 15 * * *',
          start_date=datetime(2017, 3, 20),
          catchup=False)

dummy_operator = DummyOperator(task_id='dummy_task', retries=3, dag=dag)
Then the function to be called by the operators to run the Xplenty packages:
def run_xplenty_package(package_id):
    status = xplenty.run_job(package_id=package_id)
    if status == 'failed':
        raise AirflowException
    return 'package {} completed with status {}'.format(package_id, status)
Next come the operators - in this case very simple ones, one for each package, each calling the same function with just a different package_id. A basic PythonOperator is used every time.
rest_s3_operator = PythonOperator(task_id='rest_s3_operator',
                                  python_callable=run_xplenty_package,
                                  op_kwargs={'package_id': 114089},
                                  dag=dag)

s3_mysql_operator = PythonOperator(task_id='s3_mysql_operator',
                                   python_callable=run_xplenty_package,
                                   op_kwargs={'package_id': 114090},
                                   dag=dag)

salesforce_mysql_upsert_operator = PythonOperator(task_id='salesforce_mysql_upsert_operator',
                                                  python_callable=run_xplenty_package,
                                                  op_kwargs={'package_id': 114091},
                                                  dag=dag)
Finally, the dependencies. As mentioned, the jobs will be executed sequentially.
dummy_operator >> rest_s3_operator
rest_s3_operator >> s3_mysql_operator
s3_mysql_operator >> salesforce_mysql_upsert_operator
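Since the flow is strictly sequential, the same dependencies can also be expressed as a single chain:
dummy_operator >> rest_s3_operator >> s3_mysql_operator >> salesforce_mysql_upsert_operator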
Running the Flow
Putting the whole setup to work requires starting the Airflow Docker container, checking the DAG, running it and verifying the results in the Xplenty interface.
Running the Airflow Container
Start the container with the following command:
docker run -d -p 8080:8080 --env-file=env -v /airflow/dags/:/usr/local/airflow/dags -v /airflow/requirements.txt:/requirements.txt puckel/docker-airflow webserver
After a while it should be up, which can be verified by looking at the logs. In my case optimistic_panini is the container name:
docker logs optimistic_panini
This should show that the DAG has been read and that the SequentialExecutor is being used:
[2019-12-02 19:01:42,475] {{__init__.py:51}} INFO - Using executor SequentialExecutor
[2019-12-02 19:01:42,477] {{dagbag.py:92}} INFO - Filling up the DagBag from /usr/local/airflow/dags
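You can also ask Airflow itself to list the DAGs it has picked up - a quick check from the host, assuming the container name from above:
docker exec -it optimistic_panini airflow list_dags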
Visiting http://localhost:8080/admin/ should bring up the Airflow UI:
Now, enable the DAG and review the schedule:
Or directly execute the DAG using the Links menu on the far right:
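If you prefer the command line, the same DAG can be triggered from inside the container - again assuming the container name from above and the DAG id defined earlier:
docker exec -it optimistic_panini airflow trigger_dag Xplenty_Jobs_Schedule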
Now it should be possible to see the job runs and more, after clicking the name of the DAG. Multiple runs will be displayed in columns, showing the stats - in the example below I’m showing different statuses, like success, failed or upstream failed:
How’s that reflected on Xplenty dashboard? Let’s check!
As you can easily see, the Xplenty job statuses are reflected in Airflow - the latest run (far-right column in the Airflow tree view) shows that:
- salesforce_mysql_upsert_operator was not executed - and it’s not listed in Xplenty
- s3_mysql_operator has failed - and that’s the most recent one on the Xplenty dashboard, with the failed status
- rest_s3_operator was executed first in this run and completed successfully - the same is visible on the Xplenty dashboard, as the second one on the list, 100% completed.
In addition, we can view a single run in the Graph view:
We can review the Gantt chart:
Anytime you click a job - in the tree view, graph view or Gantt chart - an extra menu pops up with additional options for marking jobs as successful, failed or running, or for reviewing the logs:
Take-off!
Now, having all the setup ready, one might wonder how hard would it be to actually make it production-ready and scale for the whole enterprise. Taking into account all the required infrastructure, server configuration, maintenance and availability, software installation - there’s a lot you need to ensure in order for the scheduler to be reliable. What if someone could take away all these worries and let you focus just on scheduling your jobs?
So, let us now take Xplenty further with Astronomer.io! Let’s check if things are as easy as they claim:
Starting with the guide available on their page, I’ve set up a trial account and created my first Workspace.
It’s now possible to configure the New Deployment and choose appropriate executor:
Let me quote the description from Astronomer.io here:
“Airflow supports multiple executor plugins. These plugins determine how and where tasks are executed. We support the Local Executor for light or test workloads, and the Celery and Kubernetes Executors for larger, production workloads. The Celery Executor uses a distributed task queue and a scalable worker pool, whereas the Kubernetes Executor launches every task in a separate Kubernetes pod.”
Once saved, the page redirects to the overview and encourages you to open Apache Airflow:
As you may figure out, behind the scenes the server is created - you may notice being redirected to a generated web address, which in my case is:
The whole environment is started behind the scenes, which may take a moment. Once started (refresh the browser window to verify), the Airflow main screen pops up:
But there are no DAGs! It’s completely empty - besides the scheduler. And there is no UI option to upload your DAGs. In order to do that, we need to follow the CLI quickstart instructions.
Setting DAGs on Astronomer.io
Running WSL on Windows
As long as you’re running a Windows version with Hyper-V enabled, you should be able to accomplish the steps using WSL.
Following the instructions, let’s install the CLI using:
curl -sSL https://install.astronomer.io | sudo bash
This should do all the setup, which can be verified by running the astro command to see if the help is shown:
Let’s create a directory for the project and set it as current path:
mkdir xplenty && cd xplenty
Initializing the project with astro dev init should return a confirmation message:
Now it should be possible to connect to Astronomer Cloud using:
astro auth login gcp0001.us-east4.astronomer.io
Follow the on-screen instructions to log in - either with OAuth or using a username/password.
Once done, a confirmation message should be visible:
Successfully authenticated to registry.gcp0001.us-east4.astronomer.io
Make sure to put the Xplenty API key into .env. While in the project directory, you should now be able to copy your DAGs over to the project - /mnt/c/Astronomer/xplenty/dag in my case. Next it should be possible to run astro deploy.
This command should first give you a choice of deployment and workspaces. In the discussed example there’s just one on the list. As a result, the whole setup should get published to Astronomer.io:
Select which airflow deployment you want to deploy to:
# LABEL DEPLOYMENT NAME WORKSPACE DEPLOYMENT ID
1 Xplenty quasarian-antenna-4223 Trial Workspace ck3xao7sm39240a38qi5s4y74
> 1
Deploying: quasarian-antenna-4223
quasarian-antenna-4223/airflow
Building image...
Sending build context to Docker daemon 26.62kB
Step 1/1 : FROM astronomerinc/ap-airflow:0.10.3-1.10.5-onbuild
# Executing 5 build triggers
---> Using cache
---> Using cache
---> Using cache
---> Using cache
---> a5866d1769c4
Successfully built a5866d1769c4
Successfully tagged quasarian-antenna-4223/airflow:latest
Pushing image to Astronomer registry
The push refers to repository [registry.gcp0001.us-east4.astronomer.io/quasarian-antenna-4223/airflow]
d2a571c73db1: Pushed
01de691c8f7c: Layer already exists
6dca0d392e56: Layer already exists
097cec956145: Layer already exists
dd314892853b: Layer already exists
4285fcfc2381: Layer already exists
3f4cdd9563bd: Layer already exists
15797b66dbf6: Layer already exists
0f65bcec71fa: Layer already exists
299fd49bdb72: Layer already exists
da37bee05289: Layer already exists
132a2e1367b6: Layer already exists
03901b4a2ea8: Layer already exists
cli-3: digest: sha256:b48933029f2c76e7f4f0c2433c7fcc853771acb5d60c176b357d28f6a9b6ef4b size: 3023
Untagged: registry.gcp0001.us-east4.astronomer.io/quasarian-antenna-4223/airflow:cli-3
Untagged: registry.gcp0001.us-east4.astronomer.io/quasarian-antenna-4223/airflow@sha256:b48933029f2c76e7f4f0c2433c7fcc853771acb5d60c176b357d28f6a9b6ef4b
Deploy succeeded!
root@270c02e5d9d5:/home/astronomer/xplenty#
So, your Astronomer workspace should now have the new DAG available:
Running on Docker
And while all of the above should happen, none of it actually did - I wasn’t able to deploy, and running astro deploy from WSL failed as follows:
vic@DESKTOP-I5D2O6C:/c/Astronomer/xplenty$ astro deploy
Authenticated to gcp0001.us-east4.astronomer.io
Select which airflow deployment you want to deploy to:
# LABEL DEPLOYMENT NAME WORKSPACE DEPLOYMENT ID
1 Xplenty quasarian-antenna-4223 Trial Workspace ck3xao7sm39240a38qi5s4y74
> 1
Deploying: quasarian-antenna-4223
quasarian-antenna-4223/airflow
Building image...
Cannot connect to the Docker daemon at tcp://localhost:2375. Is the docker daemon running?
Error: command 'docker build -t quasarian-antenna-4223/airflow:latest failed: failed to execute cmd: exit status 1
vic@DESKTOP-I5D2O6C:/c/Astronomer/xplenty$
The Docker daemon will not start on my Windows machine due to the lack of Hyper-V. If you face the same issue - don’t worry! I will not leave you there.
First, pull an ubuntu Docker image:
docker pull ubuntu
Next, we’re going to install the Astronomer CLI within the container - just as we did above. Start the container in interactive mode with:
docker run -it ubuntu sh -c "bash"
Install CLI using:
curl -sSL https://install.astronomer.io | sudo bash
Create the directory for the project and set it as current path:
mkdir xplenty && cd xplenty
Initialize the project with astro dev init and check the confirmation message:
root@270c02e5d9d5:/home/astronomer/xplenty# astro dev init
Initialized empty astronomer project in /home/astronomer/xplenty
Now it should be possible to connect to Astronomer Cloud using:
astro auth login gcp0001.us-east4.astronomer.io
Follow the on-screen instructions to log in - either with OAuth or using a username/password.
root@270c02e5d9d5:/home/astronomer/xplenty# astro auth login gcp0001.us-east4.astronomer.io
CLUSTER WORKSPACE
gcp0001.us-east4.astronomer.io ck3xaemty38yx0a383cmooskp
Switched cluster
Username (leave blank for oAuth):
Please visit the following URL, authenticate and paste token in next prompt
https://app.gcp0001.us-east4.astronomer.io/login?source=cli
oAuth Token:
Obtain and paste the token - it works great - or use username and password.
Once done, a confirmation message should be visible:
Successfully authenticated to registry.gcp0001.us-east4.astronomer.io
Having completed that step, it’s a good idea to save the state of the Docker container to a new image. Just check the container ID with docker ps. In my case it’s 270c02e5d9d5:
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
270c02e5d9d5 ubuntu "sh -c bash" 48 minutes ago Up 48 minutes charming_galileo
So I’ve used the following command to create an image with Astronomer installed:
docker commit 270c02e5d9d5 ubuntu:astro
So, now there’s a new image, which can be seen by running the docker images command:
$ docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
ubuntu astro 6f7e5bf1b01c 2 hours ago 139MB
ubuntu latest 775349758637 5 weeks ago 64.2MB
Finally, I’ve rerun the container, mounting the DAGs volume that I intend to copy into my xplenty project created inside the container. In addition, I’ve mounted docker.sock to allow astro to reach Docker from within the container:
docker run -it -v /airflow/dags/:/usr/local/Astronomer/dags/ -v /var/run/docker.sock:/var/run/docker.sock --env-file=env ubuntu:astro sh -c "bash"
Now, one last thing to add before deployment is the API key. I recommend setting it as an environment variable in your Dockerfile, like this:
root@270c02e5d9d5:/home/astronomer/xplenty# ll
total 60
drwxr-xr-x 1 root root 4096 Dec 9 14:08 ./
drwxr-xr-x 1 root root 4096 Dec 9 12:23 ../
drwxr-x--- 2 root root 4096 Dec 9 10:07 .astro/
-rw-r--r-- 1 root root 38 Dec 9 10:07 .dockerignore
-rw-r--r-- 1 root root 45 Dec 9 12:03 .env
-rw-r--r-- 1 root root 31 Dec 9 10:07 .gitignore
-rw-r--r-- 1 root root 101 Dec 9 14:00 Dockerfile
-rw-r--r-- 1 root root 556 Dec 9 10:07 airflow_settings.yaml
drwxr-xr-x 1 root root 4096 Dec 9 14:07 dags/
drwxr-xr-x 2 root root 4096 Dec 9 10:07 include/
-rw------- 1 root root 62 Dec 9 10:52 nohup.out
-rw-r--r-- 1 root root 0 Dec 9 10:07 packages.txt
drwxr-xr-x 2 root root 4096 Dec 9 10:07 plugins/
-rw-r--r-- 1 root root 0 Dec 9 10:07 requirements.txt
root@270c02e5d9d5:/home/astronomer/xplenty# more Dockerfile
FROM astronomerinc/ap-airflow:0.10.3-1.10.5-onbuild
ENV xpl_api_key=<your-API-key-here>
Finally, everything is set for deployment! Just run astro deploy:
root@270c02e5d9d5:/home/astronomer/xplenty# astro deploy
Authenticated to gcp0001.us-east4.astronomer.io
Select which airflow deployment you want to deploy to:
# LABEL DEPLOYMENT NAME WORKSPACE DEPLOYMENT ID
1 Xplenty quasarian-antenna-4223 Trial Workspace ck3xao7sm39240a38qi5s4y74
> 1
Deploying: quasarian-antenna-4223
quasarian-antenna-4223/airflow
Building image...
Sending build context to Docker daemon 26.62kB
Step 1/2 : FROM astronomerinc/ap-airflow:0.10.3-1.10.5-onbuild
# Executing 5 build triggers
---> Using cache
---> Using cache
---> Using cache
---> Using cache
---> b4f4c9e5cb16
Step 2/2 : ENV xpl_api_key=<your-API-key-here>
---> Running in 0ec9edff34a5
Removing intermediate container 0ec9edff34a5
---> 24232535523f
Successfully built 24232535523f
Successfully tagged quasarian-antenna-4223/airflow:latest
Pushing image to Astronomer registry
The push refers to repository [registry.gcp0001.us-east4.astronomer.io/quasarian-antenna-4223/airflow]
caafa5dbf9af: Pushed
01de691c8f7c: Layer already exists
6dca0d392e56: Layer already exists
097cec956145: Layer already exists
dd314892853b: Layer already exists
4285fcfc2381: Layer already exists
3f4cdd9563bd: Layer already exists
15797b66dbf6: Layer already exists
0f65bcec71fa: Layer already exists
299fd49bdb72: Layer already exists
da37bee05289: Layer already exists
132a2e1367b6: Layer already exists
03901b4a2ea8: Layer already exists
cli-11: digest: sha256:b7d5f8b5b1ba49fb70549c473a52a7587c5c6a22be8141353458cb8899f4159a size: 3023
Untagged: registry.gcp0001.us-east4.astronomer.io/quasarian-antenna-4223/airflow:cli-11
Untagged: registry.gcp0001.us-east4.astronomer.io/quasarian-antenna-4223/airflow@sha256:b7d5f8b5b1ba49fb70549c473a52a7587c5c6a22be8141353458cb8899f4159a
Deploy succeeded!
As you may have noticed, some layers already existed in my case. It shows nicely that in case of subsequent deployments some parts are reused. I needed a few attempts before I was all set. This shows you can perform these steps multiple times in case of issues, so don’t be afraid to experiment!
Once that is done, you can go to Astronomer workspace and execute the DAG:
(Yes - it took quite a few attempts for me to finally have the setup completed! :) )
You should notice a cluster being created by the Xplenty wrapper described earlier:
and a job started:
After a while you should be able to see all jobs completed both in Xplenty:
and reflected in Astronomer graph view:
Or the tree view:
Voilà!
Going the Extra Mile…
If you’re curious, you’ve probably noticed that along the way I got some failures. It’s possible to get some details just by hovering the mouse over a particular run:
Ok, the task’s “State” says it has failed - quite obviously. With just a click of a button we get a menu that lets us check the logs:
Well, while it’s really easy to check the logs, in this case it won’t tell us much as the wrapper here is not really expressive:
So, let’s dig a bit deeper and try to investigate in Xplenty. It’s as easy as finding the failed job and choosing “View details”:
This opens up a panel where we can review the variables and errors:
Now it’s pretty obvious why the job has failed:
Input(s):
Failed to read data from "xplenty://XPLENTY_USER_S3_CONNECTION_9669@xplenty.dev/mako/breakfast.csv"
The source file was not available. Now you can see how Xplenty makes failure investigation and root cause analysis super easy!
Integrating Apache Airflow with Xplenty
Airflow with Xplenty enables enterprise-wide workflows that seamlessly schedule and monitor ETL integration jobs. Xplenty is a cloud-based, code-free ETL software that provides simple, visualized data pipelines for automated data flows across a wide range of sources and destinations. Schedule a demo here to integrate Xplenty with Apache Airflow!