Key Takeaways
  • All Pig joins are equi-joins, meaning that only equality comparisons may be used in the join predicate.
  • Pig doesn’t support theta (non-equi) joins, which are difficult to implement in MapReduce. If you need a theta join, apply the CROSS operator and then FILTER the result (see the sketch after this list).
  • The join operator follows the SQL standard with respect to nulls: all rows with a null key are dropped on inner joins, so filter out null keys before an inner join to improve performance. Null values are kept in outer joins, though they will not match any record from the other input.
  • Outer joins are only supported when a schema is available for the side whose fields may need to be filled with nulls (e.g., for a left outer join, the schema of the right side must be defined).
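
For illustration, here is a minimal Pig Latin sketch of the CROSS-then-FILTER workaround for a theta join; the relation names, file paths, and schemas are hypothetical:

    -- Hypothetical inputs; names, paths, and schemas are for illustration only.
    a = LOAD 'a.tsv' AS (id:int, val:int);
    b = LOAD 'b.tsv' AS (id:int, threshold:int);

    -- CROSS produces every pairing of records from a and b;
    -- FILTER then applies the non-equality (theta) predicate.
    pairs = CROSS a, b;
    theta = FILTER pairs BY a::val > b::threshold;

Keep in mind that CROSS materializes every pair of records, so this workaround can be very expensive on large inputs.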

During data analysis, one often needs to look at multiple tables and join them to get the desired results. Joining tables is a required skill for any data scientist or data engineer, and selecting the right join will optimize query performance. This article covers three of the join strategies Pig offers:

  1. Default Join
  2. Replicated Join
  3. Skewed Join

Default Join

When to Use

When you need to join two or more datasets on a common key and no specialized join applies. The default join uses the hash join concept, matching records from the inputs on their join keys.

How it Works

A join puts together records from two inputs by certain keys. For example, it can match records from a CRM with records from an ERP on the email key, producing a single dataset with all customer information keyed by a common email address (as opposed to several separate datasets).
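
As a rough Pig Latin sketch of that example (the relation names, file paths, and schemas are hypothetical):

    crm = LOAD 'crm_customers.tsv' AS (email:chararray, name:chararray, phone:chararray);
    erp = LOAD 'erp_orders.tsv'    AS (email:chararray, order_id:long, total:double);

    -- Default inner join on the shared email key.
    customers = JOIN crm BY email, erp BY email;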

Outer joins are also available, in which case records with no match on the other side are included as well, with null values for the missing fields.
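
Continuing the hypothetical CRM/ERP sketch, a left outer join keeps the CRM records that have no ERP match; note that the right side's schema is declared, as the takeaways above require:

    crm = LOAD 'crm_customers.tsv' AS (email:chararray, name:chararray, phone:chararray);
    erp = LOAD 'erp_orders.tsv'    AS (email:chararray, order_id:long, total:double);

    -- Unmatched crm records are kept, with nulls filling the erp fields.
    all_customers = JOIN crm BY email LEFT OUTER, erp BY email;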

Apache Pig reads all of the join inputs (two or more) in the map phase and tags each record with its source. It uses the join keys as the shuffle keys, so rows with the same key are grouped together and then joined into single records in the reduce phase. For each key value, all the records from the leftmost input (or inputs) are cached; there can be more than one record per key value. The rightmost input is then streamed and crossed with the cached records to produce the output records. This means that in a one-to-many join, you should always place the input with more records per key value on the right (last) side to improve join performance, as sketched below.
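
Here is a minimal sketch of that placement rule, with hypothetical relations: customers has one record per email, while clicks has many, so clicks is listed last.

    customers = LOAD 'customers.tsv' AS (email:chararray, name:chararray);
    clicks    = LOAD 'clicks.tsv'    AS (email:chararray, url:chararray);

    -- customers (one record per key) is cached; clicks (many records per key)
    -- is listed last, so it is streamed rather than held in memory.
    joined = JOIN customers BY email, clicks BY email;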

Limitations

The default join shuffles every record across the cluster, so it performs poorly when one input is small enough to fit in memory or when a few keys dominate the data; the specialized joins below are designed for those cases.

Replicated Join

When to Use

When one of the datasets is small enough to fit in memory.

How it Works

A replicated join copies the small dataset to the distributed cache (space that is available on every machine in the cluster) and loads it into memory. Each mapper then processes a split of the big dataset and looks up matching records in the small one. Since the small input sits in memory and the join happens on the map side of MapReduce, this operation works much faster than the default join. Inner and left outer joins are available (a right or full outer join is not possible, because each mapper sees only its own split of the large input), and the small dataset must be on the join’s right side.
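
A minimal sketch, assuming a hypothetical large transactions relation and a small products lookup table:

    transactions = LOAD 'transactions.tsv' AS (sku:chararray, qty:int);
    products     = LOAD 'products.tsv'     AS (sku:chararray, name:chararray);

    -- USING 'replicated' ships the small, rightmost relation (products) to
    -- every mapper via the distributed cache and joins entirely on the map side.
    enriched = JOIN transactions BY sku, products BY sku USING 'replicated';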

Limitations

It isn’t clear exactly how small the dataset needs to be to use a replicated join. According to the Pig documentation, a relation of up to 100 MB can be used when the process has 1 GB of memory. A run-time error is generated if there is not enough memory to load the data.

Skewed Join

When to Use

When one key is much more common than the others, and its data is too large to fit in memory.

How it Works

Standard joins run in parallel across different reducers by splitting key values between processes. If a single key carries a large share of the data, that data will not be distributed evenly across the reducers, and one of them will be ‘stuck’ processing most of the input. Skewed join handles this case: it first computes a histogram of the key distribution to find the most prevalent keys, and then splits the data for those keys across multiple reducers, as in the sketch below.
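
A minimal sketch with hypothetical relations, where a handful of user_id values dominate the clicks data:

    clicks = LOAD 'clicks.tsv' AS (user_id:long, url:chararray);
    users  = LOAD 'users.tsv'  AS (user_id:long, country:chararray);

    -- USING 'skewed' samples the keys, builds a histogram, and spreads the
    -- heavily loaded keys across several reducers instead of just one.
    joined = JOIN clicks BY user_id, users BY user_id USING 'skewed';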

Limitations

Skewed join supports both inner and outer joins, though only with two inputs; joins between additional tables should be broken up into further joins. Also, there is a pig.skewedjoin.reduce.memusage property that specifies the fraction of the reducers’ heap available for performing this join. Setting a low value means more reducers will be used, yet the cost of copying the data across them will increase. Pig’s developers claim good performance when setting it between 0.1 and 0.4, but one should experiment to find the ideal value, for example as sketched below.
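
The property can be set from within the script itself; the 0.3 below is just an assumed starting point inside the suggested range:

    -- Fraction of the reducers' heap available to the skewed join (value assumed).
    SET pig.skewedjoin.reduce.memusage '0.3';

    clicks = LOAD 'clicks.tsv' AS (user_id:long, url:chararray);
    users  = LOAD 'users.tsv'  AS (user_id:long, country:chararray);
    joined = JOIN clicks BY user_id, users BY user_id USING 'skewed';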

Summary

Data integration performance can be improved by using the correct join: a replicated join when one of the datasets is really small, a skewed join when a certain key is extremely common, a merge join for sorted datasets, and a merge-sparse join (an extension of merge join) when the right-side table has few matching keys. Choosing the right join will optimize the performance of your query or data pipeline.

Explore Integrate.io to build more optimized data pipelines for engineering or data science needs. Contact our support team to schedule a demo and get a 14-day risk-free trial.