Improving Pig Data Integration Performance with Join

Improving Pig Data Integration Performance with Join

Did you know that choosing the right join type could improve data integration performance? Pig has several different algorithms that can process certain datasets in the most optimal way, thus saving you (X)plenty of time. They will be reviewed in this post.

Replicated Join

When to use

One of the datasets is small enough that it fits in the memory.

How it works

A replicated join copies the small dataset to the distributed cache - space that is available on every cluster machine - and loads it into the memory. Each mapper processes a split of the big dataset and looks for matching records in the smaller one. Since the data is available in the memory, and is processed on the map side of MapReduce, this operation works much faster than a default join. Both inner and outer joins are available and the small dataset should be on join’s right side.

Limitations

It isn’t clear how small the dataset needs to be for using replicated join. According to the Pig documentation, a relation of up to 100 MB can be used when the process has 1 GB of memory. A run-time error will be generated if not enough memory is available for loading the data.

Skewed Join

When to use

One of the keys is much more common than others, and the data for it is too large to fit in the memory.

How it works

Standard joins run in parallel across different reducers by splitting key values across processes. If there is a lot of data for a certain key, the data will not be distributed evenly across the reducers, and one of them will be ‘stuck’ processing the majority of data. Skewed join handles this case. It calculates a histogram to check which key is the most prevalent and then splits its data across different reducers for optimal performance.

Limitations

Skewed join supports both inner and outer join, though only with two inputs - joins between additional tables should be broken up into further joins. Also, there is a pig.skwedjoin.reduce.memusage Java parameter that specifies the heap fraction available to reducers in order to perform this join. Setting a low value means more reducers will be used, yet the cost of copying the data across them will increase. Pig’s developers claim to have good performance when setting it between 0.1-0.4, but one should experiment to find the ideal value.

Merge Join

When to use

The two datasets are both sorted in ascending order by the join key.

How it works

Datasets may already be sorted by the join key if that’s the order in which data was entered or they have undergone sorting before the join operation for other needs. When merge join receives the pre-sorted datasets, they are read and compared on the map side, and as a result they run faster. Both inner and outer join are available.

Limitations

There are various technical limitations for inner and outer joins - please see the Pig documentation for details.

Merge-Sparse Join

When to use

Both tables are sorted by the join key in ascending order, and the right side table has less than 1% matching keys.

How it works

Similar to merge join.

Limitations

Merge-sparse join is only available for inner joins. It shares all but one of the same conditions as merge join - the join loader must implement IndexedLoadFunc. The standard Pig distribution includes a load function that implements it called org.apache.pig.piggybank.storage.IndexedStorage. Its performance will be better when the data in the left table is partitioned evenly across the files. Replicated join should be used if one of the datasets is small enough to fit in the memory.

Default Join

When to use

When you have data that does not fit into any of the cases above.

How it works

Join puts together records from two inputs by certain keys. For example, it can put together records from a CRM and records from an ERP by the email key, thus having all customer info in the same dataset instead of several separate datasets. Outer joins are also available, in which case records that are unmatched on the other side are also included with null values for the missing fields.

Pig reads all join inputs (two or more) in the map phase and annotates each record's source. It uses the join keys as the shuffle keys so that rows with the same key are grouped together. They are then joined into a single record in the reduce phase - for each key value, all the records from the leftmost input (or inputs) are cached (there can be more than one record per key value). The rightmost input is then crossed with the cached records to produce an output record. This means that in a one-to-many join, you should always place the input with more records per key value on the right side to increase join performance.

Limitations

All Pig joins are equi-joins, meaning that only equality comparisons (equals/not equals) may be used for the join predicate. Pig doesn’t support theta (non-equi) joins and they are difficult to implement in MapReduce. If you need a theta join, run cross-join and then use a filter.

The join operator functions according to the SQL standard when it comes to nulls - all rows with a null key are dropped on inner joins. Hence, filter out null keys before inner joins to improve performance. Null values are kept for outer joins, though they will not match any record from the other input.

Outer joins are only supported when the schema is available for data on the sides where nulls may need to be filled - e.g., for left outer joins the schema on the right should be defined.

General

Lookups - operations that only return the first match for a certain key - are not supported by Pig. If the data is small enough, replicated join should do the trick, as long as you make sure that the join key is unique in the source data, or use a group by statement to make it unique. Otherwise, use HBase as a storage engine and write Pig UDFs (user defined functions) that perform the lookups on HBase.

Summary

Data integration performance can be improved by using the correct join - replicated join when one of the datasets is really small, skewed join when a certain key is extremely common, merge join for sorted datasets, and merge-sparse join as an expansion of merge when the right side table has few matching keys.


Integrate Your Data Today!

Try Xplenty free for 7 days. No credit card required.