Thanks to its user-friendliness and popularity in the field of data science, Python is one of the best programming languages for ETL. Still, coding an ETL pipeline from scratch isn’t for the faint of heart—you’ll need to handle concerns such as database connections, parallelism, job scheduling, and logging yourself. The good news is that Python makes it easier to deal with these issues by offering dozens of ETL tools and packages.

In previous articles in this series, we’ve looked at some of the best Python ETL libraries and frameworks. Below, we’ll discuss how you can put some of these resources into action.

We will cover the following Python ETL tools in detail, including example source code:

  • pygrametl
  • Airflow
  • pandas
  • Xplenty

pygrametl

pygrametl is an open-source Python ETL framework that includes built-in functionality for many common ETL processes. Within pygrametl, each dimension and fact table is represented as a Python object, allowing users to perform many common ETL operations. pygrametl runs on CPython with PostgreSQL by default, but can be modified to run on Jython as well.

The pygrametl beginner’s guide offers an introduction to extracting data and loading it into a data warehouse. Below, the pygrametl developers demonstrate how to establish a connection to a database:

import psycopg2
import pygrametl
from pygrametl.datasources import SQLSource, CSVSource
from pygrametl.tables import Dimension, FactTable

sales_string = "host='10.0.0.12' dbname='sale' user='user' password='pass'"
sales_pgconn = psycopg2.connect(sales_string)

psycopg2 is a Python module that facilitates connections to PostgreSQL databases. Before connecting to the source, the psycopg2.connect() function must be fed a string containing the host, database name, username, and password. This function can also be used to connect to the target data warehouse:

dw_string = "host='10.0.0.13' dbname='dw' user='dwuser' password='dwpass'"
dw_pgconn = psycopg2.connect(dw_string)
dw_conn_wrapper = pygrametl.ConnectionWrapper(connection=dw_pgconn)

In the examples above, the user connects both to the source sales database (dbname 'sale') and to the target data warehouse (dbname 'dw'). Wrapping the warehouse connection in a pygrametl ConnectionWrapper makes it the default connection for the dimension and fact table objects defined later. Below is the code for extracting specific attributes from the source database:

name_mapping = 'book', 'genre', 'city', 'timestamp', 'sale'
sales_source = SQLSource(connection=sales_pgconn, query="SELECT * FROM sales", names=name_mapping)

After extracting the data from the source database, we can move on to the transformation stage of ETL. In this example code, the user defines a function to perform a simple transformation. The function takes a row from the database as input and splits its timestamp string into three constituent parts (year, month, and day):

def split_timestamp(row):
    timestamp = row['timestamp']
    timestamp_split = timestamp.split('/')
    row['year'] = timestamp_split[0]
    row['month'] = timestamp_split[1]
    row['day'] = timestamp_split[2]

As mentioned above, pygrametl treats every dimension and fact table as a separate Python object. Below, the user creates two Dimension objects for the “book” and “time” dimensions, as well as a FactTable object that references both dimensions and stores the sales measure:

book_dimension = Dimension(
    name='book',
    key='bookid',
    attributes=['book', 'genre'])

time_dimension = Dimension(
    name='time',
    key='timeid',
    attributes=['day', 'month', 'year'])

fact_table = FactTable(
    name='facttable',
    keyrefs=['bookid', 'timeid'],
    measures=['sale'])

We now iterate through each row of the source sales database, storing the relevant information in each Dimension object. The ensure() method checks whether the given row already exists within the Dimension, inserts it if not, and returns the row's key so that it can be recorded in the fact table.

for row in sales_source:
    split_timestamp(row)
    row['bookid'] = book_dimension.ensure(row)
    row['timeid'] = time_dimension.ensure(row)
    fact_table.insert(row)
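
The same pattern extends to flat-file sources. The CSVSource imported at the top of the example reads delimited files and yields each row as a dictionary, so a CSV file can be processed with the same loop. Here is a minimal sketch, assuming a hypothetical sales.csv whose columns match the name_mapping used above:

# Hypothetical CSV input; CSVSource yields one dict per row,
# just like the rows produced by SQLSource.
csv_file = open('sales.csv', 'r', 16384)
csv_source = CSVSource(f=csv_file, delimiter=',')

for row in csv_source:
    split_timestamp(row)
    row['bookid'] = book_dimension.ensure(row)
    row['timeid'] = time_dimension.ensure(row)
    fact_table.insert(row)

csv_file.close()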

Finally, we can commit this data to the data warehouse and close the connection:

dw_conn_wrapper.commit()
dw_conn_wrapper.close()

pygrametl provides a powerful ETL toolkit with many pre-built functions, combined with the power and expressiveness of regular Python. To learn more about the full functionality of pygrametl, check out the project's documentation on GitHub.

Airflow

While pygrametl is a full-fledged Python ETL framework, Airflow is designed for one purpose: to execute data pipelines through workflow automation. First developed by Airbnb, Airflow is now an open-source project maintained by the Apache Software Foundation. The basic unit of Airflow is the directed acyclic graph (DAG), which defines the relationships and dependencies between the ETL tasks that you want to run.

Airflow's developers have provided a simple tutorial to demonstrate the tool's functionality. First, the user needs to import the necessary libraries and define the default arguments for each task in the DAG:

from datetime import timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

The meaning of these arguments is as follows:

  • owner: The owner of the task (often the owner's username in the operating system).
  • depends_on_past: If True, a task instance runs only if the same task succeeded in the DAG's previous scheduled run.
  • start_date: The date and time at which the task should begin executing.
  • email: The contact email for the task owner.
  • email_on_failure, email_on_retry: These arguments control whether the task owner receives an email notification when the task fails or is retried.
  • retries: The number of times to retry a task after it fails.
  • retry_delay: The time in between retries.

Next, the user creates the DAG object that will store the various tasks in the ETL workflow:

dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)

The schedule_interval parameter controls the time between executions of the DAG workflow. Here it is set to 1 day, which effectively means that data is loaded into the target data warehouse daily.
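
Note that schedule_interval also accepts cron expressions and preset strings such as '@daily'. As a sketch (not part of the original tutorial), the same DAG could be scheduled like this:

dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval='@daily',  # or a cron expression such as '0 6 * * *'
)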

Finally, the user defines a few simple tasks and adds them to the DAG:

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
    dag=dag,
)

Here, the task t1 executes the Bash command "date" (which prints the current date and time to the command line), while t2 executes the Bash command "sleep 5" (which directs the current program to pause execution for 5 seconds).
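
On their own, t1 and t2 have no ordering. To express the dependency structure that gives a DAG its shape, Airflow tasks are chained with the bit-shift operators (or, equivalently, set_downstream()/set_upstream()):

# Run t2 only after t1 has completed successfully.
t1 >> t2

# The same dependency, written explicitly:
# t1.set_downstream(t2)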

Airflow makes it easy to schedule command-line ETL jobs, ensuring that your pipelines consistently and reliably extract, transform, and load the data you need. The good news is that it's easy to integrate Airflow with other ETL tools and platforms like Xplenty, letting you create and schedule automated pipelines for cloud data integration.

Want to learn more about using Airflow? Check out our setup guide, ETL with Apache Airflow, or our article Apache Airflow: Explained, where we dive deeper into the essential concepts of Airflow.

pandas

pandas is a Python library for data analysis, which makes it an excellent addition to your ETL toolkit. The pandas library includes functionality for reading and writing many different file formats, including:

  • Text files
  • CSV files
  • JSON files
  • XML/HTML files
  • Excel files (.xls, .xlsx, .xlsb)
  • HDF5 files
  • Parquet files
  • SQL queries
  • Google BigQuery

The code below shows just how easy it is to import data from a JSON file:

import pandas as pd

pd.read_json('test.json')
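
Writing data back out is just as concise. Here is a quick sketch (the file names are illustrative, and to_parquet() requires the pyarrow or fastparquet package):

import pandas as pd

df = pd.read_json('test.json')

# Persist the same data in other formats supported by pandas.
df.to_csv('test.csv', index=False)
df.to_parquet('test.parquet')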

The basic unit of pandas is the DataFrame, a two-dimensional data structure that stores tabular data in rows and columns. Once data is loaded into the DataFrame, pandas allows you to perform a variety of transformations. For example, the widely-used merge() function in pandas performs a join operation between two DataFrames:

pd.merge(left, right, how='inner', on=None, left_on=None, right_on=None,
         left_index=False, right_index=False, sort=True)

The meaning of these arguments is as follows:

  • left, right: The two DataFrames to be joined.
  • how: The type of join operation ('inner', 'outer', 'left', or 'right').
  • on, left_on, right_on: The columns or index levels to use as join keys; on applies when the key has the same name in both DataFrames, while left_on and right_on specify the keys for each DataFrame separately.
  • left_index, right_index: If True, these arguments use the index (row labels) from the left or right DataFrames as join keys.
  • sort: If True, sorts the resulting DataFrame by its join keys.
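
As a concrete illustration (the DataFrames below are made up for the example), joining a table of sales to a table of book metadata on a shared book column looks like this:

import pandas as pd

books = pd.DataFrame({'book': ['Dune', 'Emma'], 'genre': ['sci-fi', 'classic']})
sales = pd.DataFrame({'book': ['Dune', 'Dune', 'Emma'], 'sale': [12, 7, 3]})

# Inner join on the shared 'book' column; unmatched rows are dropped.
merged = pd.merge(sales, books, how='inner', on='book')
print(merged)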

pandas includes so much functionality that it's difficult to illustrate with a single use case. To learn more about using pandas in your ETL workflow, check out the pandas documentation.

Building an ETL Pipeline in Python with Xplenty

The tools discussed above make it much easier to build ETL pipelines in Python. Still, it's likely that you'll have to use multiple tools in combination in order to create a truly efficient, scalable Python ETL solution. What's more, you'll need a skilled, experienced development team who knows Python and systems programming in order to optimize your ETL performance.

Instead of devoting valuable time and effort to building ETL pipelines in Python, more and more organizations are opting for low-code ETL data integration platforms like Xplenty. With more than 100 pre-built integrations and a straightforward drag-and-drop visual interface, Xplenty makes it easier than ever to build simple yet powerful ETL pipelines to your data warehouse.

The good news is that you don't have to choose between Xplenty and Python—you can use them both with the Xplenty Python wrapper, which allows you to access the Xplenty REST API from within a Python program.

Getting started with the Xplenty Python Wrapper is easy. Simply import the xplenty package and provide your account ID and API key:

from xplenty import XplentyClient

account_id = "MyAccountID"
api_key = "V4eyfgNqYcSasXGhzNxS"
client = XplentyClient(account_id, api_key)

Next, you need to instantiate a cluster, a group of machines that you have allocated for ETL jobs:

cluster_type = "production"
nodes = 2
name = "New Cluster #199999"
description = "New Cluster's Description"
terminate_on_idle = False
time_to_idle = 3600

cluster = client.create_cluster(cluster_type, nodes, name, description, terminate_on_idle, time_to_idle)

The meaning of these arguments is as follows:

  • cluster_type: Either "production" or "sandbox", depending on the use case.
  • nodes: The number of nodes in the cluster (between 2 and the maximum allowed for your account).
  • terminate_on_idle: If True, terminates the cluster when it becomes idle.
  • time_to_idle: The amount of time (in seconds) after which the cluster becomes idle.

Clusters in Xplenty contain jobs. The code below demonstrates how to create and run a new Xplenty job:

cluster_id = 83
package_id = 782
variables = {}
variables['OUTPUTPATH'] = "test/job_vars.csv"
variables['Date'] = "09-10-2020"

job = client.add_job(cluster_id, package_id, variables)

To get started using Xplenty in Python, download the Xplenty Python wrapper and give it a try yourself.

Final Thoughts

Tools like pygrametl, Apache Airflow, and pandas make it easier to build an ETL pipeline in Python. However, they pale in comparison to low-code, user-friendly data integration solutions like Xplenty. Xplenty's simple, low-code, drag-and-drop interface lets even less technical users create robust, streamlined data integration pipelines. What's more, Xplenty is fully compatible with Python thanks to the Xplenty Python wrapper, and it can also integrate with third-party Python ETL tools like Apache Airflow.

Want to give Xplenty a try for yourself? Contact us to schedule a personalized demo and 14-day test pilot so that you can see if Xplenty is the right fit for you.