Apr 21, 2016

Repartition Spark Dataframes

When using spark dataframes, we may have one common question:

Do we need to repartition the data frame by some field(s) commonly used in later joins or aggregations?

It is really not a simple question for we have to calculate carefully on the communication cost, partition shuffle cost, etc. My experience is that, when joining large data, YOU SHOULD!

HOW?
Repartitioning the spark data frame by some fields is not that straightforward, because the data frame object does not come with .partitionBy() method. However, dataframe is essentially a RDD with structured type mapping, so we can repartition the underlying RDD and create a new data frame out of that. Here is the code in pyspark:

sqlsc = SQLContext(sc)
df_new = sqlsc.createDataFrame(
    df_old.rdd.map(lambda x: (x[0], x)).partitionBy(5).values(),
    df_old.schema)

In the above code, df_new is created via partitioning df_old into 5 partitions by the first field as the key.


I still have to emphasize that weigh the benefit and cost carefully before you do the repartition for repartition cost is also high. Other than joining large data frames, I don't do the repartition in my practice. Another way to speed up the join when data is not that large is to try dataframe.coalesce(numPartitions) function (since v1.4) to reduce the number of partitions. Shuffling the data is typically not involved in the coalesce operation.

No comments:

Post a Comment