Thanks to its user-friendliness and popularity in the field of data science, Python is one of the best programming languages for ETL. Still, coding an ETL pipeline from scratch isn’t for the faint of heart — you’ll need to handle concerns such as database connections, parallelism, job scheduling, and logging yourself. The good news is that Python makes it easier to deal with these issues by offering dozens of ETL tools and packages.

Read more: Top 6 Python ETL Tools for 2021

Table of Contents 

  1. Building an ETL Pipeline
  2. pygrametl
  3. Airflow
  4. pandas
  5. Luigi 
  6. Building an ETL Pipeline in Python with Xplenty
  7. Final Thoughts

Building an ETL Pipeline

An ETL pipeline is the set of processes used to move data from a source (or several sources) into a database, such as a data warehouse. There are multiple ways to perform ETL, including standard SQL, a dedicated ETL tool, or any general-purpose programming language; Python, however, dominates the ETL space.

Leveraging Python requires knowledge of relevant frameworks and libraries, as well as ETL tools. For workflow management, Airflow and Luigi are popular choices, whereas pandas is ideal when your goal is to move and process data. There are also self-contained toolkits, including pygrametl. 

In previous articles in this series, we’ve looked at some of the best Python ETL libraries and frameworks. Below, we’ll discuss how you can put some of these resources into action.

We will cover the following Python ETL tools in detail, including example source code.

pygrametl

pygrametl is an open-source Python ETL framework that includes built-in functionality for many common ETL processes. Within pygrametl, each dimension and fact table is represented as a Python object, allowing users to perform many common ETL operations. pygrametl runs on CPython with PostgreSQL by default but can be modified to run on Jython as well.

The pygrametl beginner’s guide offers an introduction to extracting data and loading it into a data warehouse. Below, the pygrametl developers demonstrate how to establish a connection to a database:

import psycopg2
import pygrametl
from pygrametl.datasources import SQLSource, CSVSource
from pygrametl.tables import Dimension, FactTable

sales_string = "host='10.0.0.12' dbname='sale' user='user' password='pass'"
sales_pgconn = psycopg2.connect(sales_string)

psycopg2 is a Python module that facilitates connections to PostgreSQL databases. Before connecting to the source, the psycopg2.connect() function must be fed a connection string containing the host, database name, username, and password. This function can also be used to connect to the target data warehouse:

dw_string = "host='10.0.0.13' dbname='dw' user='dwuser' password='dwpass'"
dw_pgconn = psycopg2.connect(dw_string)
dw_conn_wrapper = pygrametl.ConnectionWrapper(connection=dw_pgconn)

In the first example above, the user connects to the source sales database (dbname='sale' in the connection string). Below is the code for extracting specific attributes from that database:

name_mapping = 'book', 'genre', 'city', 'timestamp', 'sale'
sales_source = SQLSource(connection=sales_pgconn,
                         query="SELECT * FROM sales", names=name_mapping)

After extracting the data from the source database, we can move on to the transformation stage of ETL. In this example code, the user defines a function to perform a simple transformation. The function takes a row from the database as input and splits its timestamp string into three constituent parts (year, month, and day):

def split_timestamp(row):
    timestamp = row['timestamp']
    timestamp_split = timestamp.split('/')
    row['year'] = timestamp_split[0]
    row['month'] = timestamp_split[1]
    row['day'] = timestamp_split[2]

As mentioned above, pygrametl treats every dimension and fact table as a separate Python object. Below, the user creates two Dimension objects for the “book” and “time” dimensions, as well as a FactTable object that references those two Dimensions and stores the “sale” measure:

book_dimension = Dimension(
    name='book',
    key='bookid',
    attributes=['book', 'genre'])

time_dimension = Dimension(
    name='time',
    key='timeid',
    attributes=['day', 'month', 'year'])

fact_table = FactTable(
    name='facttable',
    keyrefs=['bookid', 'timeid'],
    measures=['sale'])

We now iterate through each row of the source sales database, storing the relevant information in each Dimension object. The ensure() method checks whether the given row already exists within the Dimension and inserts it if not; in either case it returns the row's key, which we store on the row so the fact table can reference it.

for row in sales_source:
    split_timestamp(row)
    row['bookid'] = book_dimension.ensure(row)
    row['timeid'] = time_dimension.ensure(row)
    fact_table.insert(row)

Finally, we can commit this data to the data warehouse and close the connection:

dw_conn_wrapper.commit()
dw_conn_wrapper.close()

pygrametl provides a powerful ETL toolkit with many pre-built functions, combined with the power and expressiveness of regular Python. To learn more about the full functionality of pygrametl, check out the project's documentation on GitHub.

Airflow

While pygrametl is a full-fledged Python ETL framework, Airflow is designed for one purpose: to execute data pipelines through workflow automation. First developed by Airbnb, Airflow is now an open-source project maintained by the Apache Software Foundation. The basic unit of Airflow is the directed acyclic graph (DAG), which defines the relationships and dependencies between the ETL tasks that you want to run.

Airflow's developers have provided a simple tutorial to demonstrate the tool's functionality. First, the user needs to import the necessary libraries and define the default arguments for each task in the DAG:

from datetime import timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

The meaning of these arguments is as follows:

  • owner: The owner of the task (often the owner's username in the operating system).
  • depends_on_past: If True, a task instance runs only if the same task succeeded in the previous scheduled run.
  • start_date: The date and time at which the task should begin executing.
  • email: The contact email for the task owner.
  • email_on_failure, email_on_retry: These arguments control whether the task owner receives an email notification when the task fails or is retried.
  • retries: The number of times to retry a task after it fails.
  • retry_delay: The time in between retries.

Next, the user creates the DAG object that will store the various tasks in the ETL workflow:

dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)

The schedule_interval parameter controls the time between executions of the DAG workflow. Here it is set to 1 day, which effectively means that data is loaded into the target data warehouse daily.
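
For reference, schedule_interval also accepts cron strings and Airflow's preset shortcuts, so the same daily cadence could be written as:

dag = DAG('tutorial', default_args=default_args, schedule_interval='@daily')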

Finally, the user defines a few simple tasks and adds them to the DAG:

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
    dag=dag,
)

Here, the task t1 executes the Bash command "date" (which prints the current date and time to the command line), while t2 executes the Bash command "sleep 5" (which directs the current program to pause execution for 5 seconds).
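
Because a DAG is ultimately about the relationships between tasks, the last step is to declare those dependencies. Airflow does this with the bitshift operators (or the equivalent set_upstream()/set_downstream() methods); continuing the tutorial snippet, the line below makes t2 run only after t1 has completed:

# t2 depends on t1
t1 >> t2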

Airflow makes it easy to schedule command-line ETL jobs, ensuring that your pipelines consistently and reliably extract, transform, and load the data you need. The good news is that it's easy to integrate Airflow with other ETL tools and platforms like Xplenty, letting you create and schedule automated pipelines for cloud data integration.

Using Airflow makes the most sense for long ETL jobs or jobs with multiple steps, since you can restart the workflow from any point if something fails. However, Airflow is not a library: it has to be deployed and operated as a service, which makes it a poor fit for small, one-off ETL jobs.

Want to learn more about using Airflow? Check out our setup guide ETL with Apache Airflow, or our article Apache Airflow: Explained where we dive deeper into the essential concepts of Airflow.

pandas

pandas is a Python library for data analysis, which makes it an excellent addition to your ETL toolkit. The pandas library includes functionality for reading and writing many different file formats, including:

  • Text files
  • CSV files
  • JSON files
  • XML/HTML files
  • Excel files (.xls, .xlsx, .xlsb)
  • HDF5 files
  • Parquet files
  • SQL queries
  • Google BigQuery

The code below shows just how easy it is to import data from a JSON file:

import pandas as pd

df = pd.read_json('test.json')

The basic unit of pandas is the DataFrame, a two-dimensional data structure that stores tabular data in rows and columns. Once data is loaded into a DataFrame, pandas allows you to perform a variety of transformations. For example, the widely used merge() function performs a join operation between two DataFrames:

pd.merge(left, right, how='inner', on=None, left_on=None, right_on=None,
         left_index=False, right_index=False, sort=True)

The meaning of these arguments is as follows:

  • left, right: The two DataFrames to be joined.
  • how: The type of join operation ('inner', 'outer', 'left', or 'right').
  • on, left_on, right_on: The columns or index levels to use as join keys, possibly from the left or right DataFrames.
  • left_index, right_index: If True, these arguments use the index (row labels) from the left or right DataFrames as join keys.
  • sort: If True, sorts the resulting DataFrame by its join keys.
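
As a quick worked example (the DataFrames and column names below are hypothetical, not taken from the pandas docs), the snippet joins a table of sales rows to a small book lookup table on a shared bookid column:

import pandas as pd

books = pd.DataFrame({'bookid': [1, 2], 'genre': ['fiction', 'travel']})
sales = pd.DataFrame({'bookid': [1, 1, 2], 'sale': [12.5, 8.0, 30.0]})

# inner join: keep only rows whose bookid appears in both DataFrames
merged = pd.merge(sales, books, how='inner', on='bookid')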

pandas includes so much functionality that it's difficult to illustrate with a single use case. pandas makes the most sense when you need to extract data, clean and transform it, and then write it to a CSV file, an Excel workbook, or a SQL database.
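
As a rough sketch of that extract-transform-load pattern (the file names and column names here are hypothetical), a small pandas job might look like this:

import pandas as pd

# extract: read raw sales data from a CSV file
df = pd.read_csv('raw_sales.csv')

# transform: drop incomplete rows and derive a total column
df = df.dropna(subset=['price', 'quantity'])
df['total'] = df['price'] * df['quantity']

# load: write the cleaned data out for the warehouse to pick up
df.to_csv('clean_sales.csv', index=False)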

To learn more about using pandas in your ETL workflow, check out the pandas documentation.

Luigi

Luigi is an open-source tool that allows you to build complex pipelines. Spotify developed Luigi to automate processing of the immense amount of data it generates every day. Although Luigi has many applications, it was tailor-made for Spotify's needs, which means it may not be well-suited to yours. Still, other companies have adopted it over the years, including Deliveroo.

Luigi handles:

  • Workflow management
  • Visualization
  • Dependency resolution 
  • Command line integration

When using Luigi, workflows are built from “tasks” and “targets.” Tasks are the nodes of the dependency graph, and targets are the outputs that connect them: one task produces a target, which is then consumed by the next task, which produces another target, and so on.

To leverage Luigi, you will need to familiarize yourself with tasks, as they are the basic building blocks. To create a task, you define a class that implements some or all of the following methods (see the sketch below):

  • run(): the work the task performs
  • requires(): the other tasks this task depends on
  • output(): the target(s) the task produces

Unlike many pipeline systems, Luigi resolves work in reverse. Execution starts from the final task, which checks whether its output target already exists; if it does not, Luigi walks back through requires() and runs whatever upstream tasks are still needed.
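
Here is a minimal sketch of that pattern, assuming two hypothetical tasks and local file targets (the task names and file paths are illustrative, not part of Luigi itself):

import luigi

class DownloadLogs(luigi.Task):
    # hypothetical upstream task that produces a raw log file
    def output(self):
        return luigi.LocalTarget('raw_logs.txt')

    def run(self):
        with self.output().open('w') as f:
            f.write('2021/01/01 sale\n')

class CleanLogs(luigi.Task):
    # depends on DownloadLogs; Luigi runs it first if its target is missing
    def requires(self):
        return DownloadLogs()

    def output(self):
        return luigi.LocalTarget('clean_logs.txt')

    def run(self):
        with self.input().open('r') as fin, self.output().open('w') as fout:
            for line in fin:
                fout.write(line.upper())

if __name__ == '__main__':
    # start from the final task; Luigi resolves the dependency chain backwards
    luigi.build([CleanLogs()], local_scheduler=True)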

This option is best for simple ETL processes, such as processing logs, because Luigi's rather strict structure limits its flexibility for more complex tasks. If you are building an enterprise solution, however, Luigi may be a good choice.

See how Luigi stacks up against Airflow in terms of usability, scalability, popularity, scheduling, and reviews. 

Building an ETL Pipeline in Python with Xplenty

The tools discussed above make it much easier to build ETL pipelines in Python. Still, it's likely that you'll have to use multiple tools in combination in order to create a truly efficient, scalable Python ETL solution. What's more, you'll need a skilled, experienced development team who knows Python and systems programming in order to optimize your ETL performance.

Instead of devoting valuable time and effort to building ETL pipelines in Python, more and more organizations are opting for low-code ETL data integration platforms like Xplenty. With more than 100 pre-built integrations and a straightforward drag-and-drop visual interface, Xplenty makes it easier than ever to build simple yet powerful ETL pipelines to your data warehouse.

The good news is that you don't have to choose between Xplenty and Python—you can use them both with the Xplenty Python wrapper, which allows you to access the Xplenty REST API from within a Python program.

Getting started with the Xplenty Python Wrapper is easy. Simply import the xplenty package and provide your account ID and API key:

from xplenty import XplentyClient

account_id = "MyAccountID"
api_key = "V4eyfgNqYcSasXGhzNxS"
client = XplentyClient(account_id, api_key)

Next, you need to instantiate a cluster, a group of machines that you have allocated for ETL jobs:

cluster_type = "production"
nodes = 2
name = "New Cluster #199999"
description = "New Cluster's Description"
terminate_on_idle = False
time_to_idle = 3600

cluster = client.create_cluster(cluster_type, nodes, name, description,
                                terminate_on_idle, time_to_idle)

The meaning of these arguments is as follows:

  • cluster_type: Either "production" or "sandbox", depending on the use case.
  • nodes: The number of nodes in the cluster (between 2 and the maximum allowed for your account).
  • terminate_on_idle: If True, terminates the cluster when it becomes idle.
  • time_to_idle: The amount of time (in seconds) after which the cluster becomes idle.

Clusters in Xplenty contain jobs. The code below demonstrates how to create and run a new Xplenty job:

cluster_id = 83
package_id = 782
variables = {}
variables['OUTPUTPATH'] = "test/job_vars.csv"
variables['Date'] = "09-10-2020"

job = client.add_job(cluster_id, package_id, variables)

To get started using Xplenty in Python, download the Xplenty Python wrapper and give it a try yourself.

Bottom line: Xplenty is an ETL platform that does not require any coding or deployment. Helping thousands of users organize their data, Xplenty makes it easy to get the most out of your data and prepare it for analysis.

Final Thoughts

Tools like pygrametl, Apache Airflow, Luigi, and pandas make it easier to build an ETL pipeline in Python. However, they pale in comparison to low-code, user-friendly data integration solutions like Xplenty. The Xplenty platform's simple, low-code, drag-and-drop interface lets even less technical users create robust, streamlined data integration pipelines. What's more, Xplenty is fully compatible with Python thanks to the Xplenty Python wrapper, and it can also integrate with third-party Python ETL tools like Apache Airflow.

Want to give Xplenty a try for yourself? We welcome you to schedule a demo today!