Sometimes a simple join operation on two small DataFrames can take forever. Here is an example:
    scala> df1.count
    res0: Long = 607
    scala> df2.count
    res1: Long = 24
    scala> val df3 = df1.join(df2)
    scala> df3.count
I have df1 and df2 as two DataFrames defined in earlier steps. Both of them are tiny. Logically, a join operation has n*m complexity and is basically two nested loops. For two such small datasets, the join should take no more than a couple of seconds. However, in my real example it took more than a minute to give me the result. What happened?
When I ran the count action, I noticed that each stage had a lot of tasks. Since the number of tasks is roughly proportional to the total number of partitions the operation needs to handle, let's take a look at how many partitions each input has:
    scala> df1.rdd.partitions.size
    res4: Int = 3
    scala> df2.rdd.partitions.size
    res5: Int = 65
Not very surprisingly, although the data is small, the number of partitions is inherited from the upstream DataFrame, so df2 has 65 partitions. Since df1.join(df2) has no join condition, it is a Cartesian join, and the join operation will have n*m tasks for each stage (n is the number of partitions of df1, and m is the number of partitions of df2). Here that is 3 * 65 = 195 tasks for a few hundred input rows. The overhead of scheduling so many tiny tasks killed the performance.
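To make the n*m arithmetic concrete, here is a toy model in plain Scala. It does not use Spark at all; it only assumes, as described above, that every partition of one input is paired with every partition of the other and that each pair becomes one task. The object and method names are made up for illustration.

```scala
// Toy model (no Spark): one task per (left partition, right partition) pair.
object CartesianTaskCount {
  // Number of tasks when every left partition meets every right partition.
  def taskCount(leftPartitions: Int, rightPartitions: Int): Int =
    leftPartitions * rightPartitions

  // Enumerate the partition index pairs explicitly, one entry per task.
  def partitionPairs(leftPartitions: Int, rightPartitions: Int): Seq[(Int, Int)] =
    for {
      l <- 0 until leftPartitions
      r <- 0 until rightPartitions
    } yield (l, r)

  def main(args: Array[String]): Unit = {
    println(taskCount(3, 65))           // 195: the partition counts seen above
    println(partitionPairs(3, 65).size) // 195: same number, counted pair by pair
    println(taskCount(1, 1))            // 1: both inputs in a single partition
  }
}
```

With 607 * 24 = 14568 output rows spread over 195 tasks, each task produces only about 75 rows on average, so most of the time goes into task overhead rather than actual work.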
To address this, we can use the repartition method of DataFrame before running the join operation:
    scala> val df1p1 = df1.repartition(1)
    scala> val df2p1 = df2.repartition(1)
    scala> val df3 = df1p1.join(df2p1)
    scala> df3.count
Now the execution time gets back to normal.
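For anyone who wants to try this outside the shell, the following is a minimal standalone sketch, not the original job. The two DataFrames are stand-ins built with spark.range and forced to 3 and 65 partitions to mimic the situation above. The helper name joinWithFewPartitions is hypothetical, and the sketch uses crossJoin (available since Spark 2.1) on the assumption that an explicit Cartesian join matches the intent of df1.join(df2) with no condition. It needs the spark-sql dependency on the classpath, and the partition counts it prints depend on the physical plan Spark picks, so no expected values are given for them.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object SmallJoinSketch {
  // Hypothetical helper: shrink both inputs to `n` partitions, then cross-join.
  def joinWithFewPartitions(left: DataFrame, right: DataFrame, n: Int = 1): DataFrame =
    left.repartition(n).crossJoin(right.repartition(n))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("small-join-sketch")
      .master("local[*]")
      .getOrCreate()

    // Stand-ins for df1 and df2: tiny data, deliberately over-partitioned.
    val df1 = spark.range(607).toDF("a").repartition(3)
    val df2 = spark.range(24).toDF("b").repartition(65)
    println(s"df1 partitions: ${df1.rdd.getNumPartitions}")
    println(s"df2 partitions: ${df2.rdd.getNumPartitions}")

    // Repartition first, then join, as in the shell session above.
    val df3 = joinWithFewPartitions(df1, df2)
    println(s"joined partitions: ${df3.rdd.getNumPartitions}")
    println(s"joined rows: ${df3.count()}") // 607 * 24 = 14568

    spark.stop()
  }
}
```

Keeping the target partition count as a parameter leaves room to use a value larger than 1 when the inputs are small but not tiny.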
Takeaway from this study:
Always consider repartitioning small DataFrames down to a small number of partitions before joining them.