July 09, 2025
By Federico Zambelli
Spark, Delta Lake, Data Engineering
This article was originally published on hashnode.dev on Jul 2, 2025.
Last week I was working on a Spark pipeline that was running slowly, and I discovered that a specific task with significant skew was the cause.
Googling the problem didn’t return any meaningful results, so I had to figure it out myself.
Here’s what I found and how I fixed it.
The pipeline was an SCD2 incremental build, something that every data engineer has to deal with sooner or later.
Frameworks like Delta allow the use of this strategy inside a data lake, thanks to a simple API that grants update and merge capabilities over otherwise immutable data.
Merging a sequence of data changes into a base table is relatively straightforward:

1. Gather the change records to apply.
2. Determine the operation type (INSERT, UPDATE, DELETE) for each record in the sequence.

The second step is crucial because UPDATEs are treated specially. An UPDATE record must be used twice during the merge:

- once to insert the new version of the record;
- once to close the existing version (setting is_current = false and an end date).

Delta’s merge operation begins with setting a merge condition, usually based on a primary key (e.g. base.primary_key = updates.primary_key). Then, it has two main actions:

- whenMatchedUpdate: for records that meet the merge condition
- whenNotMatchedInsert: for records that don’t meet the merge condition

We need to make sure that records of type UPDATE fall under both of them. In other words, each such record must both match and not match the merge condition at once.
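To make the two actions concrete, here is a minimal sketch of what such a merge can look like with the Delta Lake Python API. The names base_table, id, updates_df and the current_date() end date are illustrative placeholders, and spark is assumed to be an active SparkSession:

from delta.tables import DeltaTable

base = DeltaTable.forName(spark, "base_table")

(
    base.alias("base")
    .merge(updates_df.alias("updates"), "base.id = updates.id")
    # Records that match an existing primary key: close the current version.
    .whenMatchedUpdate(
        set={"is_current": "false", "end_date": "current_date()"}
    )
    # Records that match nothing in the base table: insert a new, current version.
    .whenNotMatchedInsert(
        values={"id": "updates.id", "is_current": "true", "end_date": "null"}
    )
    .execute()
)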
To obtain this result, we create duplicates of these records that can never satisfy the whenMatched condition. Usually, this is done by setting merge_key = null on the duplicates and appending them to the rest of the data we’re merging, which is the approach recommended in Delta’s official documentation.
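In code, the duplication step looks roughly like the sketch below. The operation column used to tag each change as INSERT, UPDATE, or DELETE is an assumption about the shape of the change set, and the merge condition from the earlier sketch then becomes base.id = updates.merge_key:

from pyspark.sql import functions as F

# Original change records: merge on the real primary key.
df_updates = df.withColumn("merge_key", F.col("id"))

# Duplicates of the UPDATE records: a null merge_key can never satisfy the
# merge condition, so these rows always fall through to whenNotMatchedInsert.
df_inserts = df.filter(F.col("operation") == "UPDATE").withColumn(
    "merge_key", F.lit(None).cast(df.schema["id"].dataType)
)

staged_updates = df_updates.unionByName(df_inserts)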
By setting merge_key = null, however, we’re artificially creating skew: since a Delta merge is effectively a JOIN, Spark shuffles the data based on the join/merge condition (unless the smaller side is broadcast) and sends every record with merge_key = null to the same partition, using a single core for an operation that could otherwise be parallelized.
The screenshot above is taken from the build I was working on: the long green bar is the task where all the records with the null merge key ended up, and its length reflects its duration. That task took far longer to execute and processed an order of magnitude more records than any other task in the job.
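Besides the Spark UI, a quick way to confirm this kind of skew is to count rows per merge key in the staged DataFrame built above; with the null strategy, the null bucket dwarfs every real key:

staged_updates.groupBy("merge_key").count().orderBy(F.desc("count")).show(5)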
Fortunately, the fix is simple: we need to set the merge_key for those records to something that isn’t null, can never match any of the primary keys, and is evenly distributed. If the primary key is a string, we can set the merge_key like this:
# Prefixing the pk yields a merge_key that is never null and can never equal a real key.
df_inserts = df.withColumn(
    "merge_key", F.concat(F.lit("__INSERT__"), F.col("id"))
)
This keeps the original distribution of the data based on the primary key, while also ensuring that the merge_key can never match the original pk.
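Plugged back into the staged-updates construction sketched earlier, only the expression for the duplicated rows changes; the merge condition stays the same, because the prefixed keys can never equal a real id (the operation column is, again, an assumed field):

df_inserts = df.filter(F.col("operation") == "UPDATE").withColumn(
    "merge_key", F.concat(F.lit("__INSERT__"), F.col("id"))
)

staged_updates = df_updates.unionByName(df_inserts)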
However, if the primary key is a number, we can’t use the method above. During the merge, the engine casts the merge_key to an integer, which sends it back to being null, since that’s the result of casting an alphanumeric string to a number. In this case, we need to make the merge_key a number as well, while still making sure it can never match the original pk. One way is to push it into the negatives (assuming the pk is always a positive integer). Here’s the example code:
# Negative keys can never collide with a positive pk, and subtracting the id
# keeps one distinct merge_key per record, preserving the distribution.
df_inserts = df.withColumn(
    "merge_key", -100_000_000 - F.col("id")
)
As in the string case, this keeps the same distribution of data and satisfies all the other requirements.
Applying the fix to the pipeline removed the skew entirely, as the data could now be properly distributed across all executors. The total run time was cut in half.
In the end, this is no different from salting join keys in a regular Spark pipeline. The source of confusion is the Delta API, which abstracts away the underlying Spark join and makes it a little less obvious where the problem hides.