Repartition with Apache Spark

The problem: I am trying to repartition a dataset so that all rows that have the same value in a specified integer column end up in the same partition.

What is working: with the 1.6 API (in Java) and RDDs, I use a hash partitioner and it works as expected. For example, if I print the modulo of this column's value for each row, I get the same modulo within a given partition (I check the partitions by manually reading the content saved with saveAsHadoopFile).

It is not working as expected with the latest API:

Now I am trying to use the 2.0.1 API (in Scala) and Datasets, which have a repartition method that takes a number of partitions and a column, and I save the Dataset as a Parquet file. The results are not the same: when I look inside the partitions, the rows are not hash-partitioned on this column.

asked Dec 29 '25 by Paul Trehiou

1 Answer

To save a partitioned Dataset you can use either:

  • DataFrameWriter.partitionBy - available since Spark 1.6

    df.write.partitionBy("someColumn").format(...).save()
    
  • DataFrameWriter.bucketBy - available since Spark 2.0. Note that it takes the number of buckets as its first argument and can only be used with saveAsTable, not save:

    df.write.bucketBy(n, "someColumn").format(...).saveAsTable(...)
    

Using df.repartition($"someColumn").write.format(...).save() should work as well, but the Dataset API doesn't use Object.hashCode. It uses a Murmur3 hash, so the results will differ from those of HashPartitioner in the RDD API, and trivial checks (like the one you described) won't work.

// imports needed for udf, lit, hash and the $-column syntax
import org.apache.spark.sql.functions.{hash, lit, udf}
import spark.implicits._

val oldHashCode = udf((x: Long) => x.hashCode)

// https://github.com/apache/spark/blob/v2.0.1/core/src/main/scala/org/apache/spark/util/Utils.scala#L1596-L1599
val nonNegativeMode = udf((x: Int, mod: Int) => {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
})

val df = spark.range(0, 10)

val oldPart = nonNegativeMode(oldHashCode($"id"), lit(3))
val newPart = nonNegativeMode(hash($"id"), lit(3))

df.select($"*", oldPart, newPart).show
+---+---------------+--------------------+
| id|UDF(UDF(id), 3)|UDF(hash(id, 42), 3)|
+---+---------------+--------------------+
|  0|              0|                   1|
|  1|              1|                   2|
|  2|              2|                   2|
|  3|              0|                   0|
|  4|              1|                   2|
|  5|              2|                   2|
|  6|              0|                   0|
|  7|              1|                   0|
|  8|              2|                   2|
|  9|              0|                   2|
+---+---------------+--------------------+

One possible gotcha is that the DataFrame writer can merge multiple small files to reduce cost, so data from different partitions can end up in a single file.
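If you want to see which partition each row actually lands in (rather than inferring it from file contents), the built-in spark_partition_id function can be attached as a column after the repartition. A minimal sketch, assuming a local SparkSession and a small range DataFrame; the partition ids will be consistent per key but, as explained above, will not equal id % 3:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.spark_partition_id

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("inspect-partitions")
  .getOrCreate()
import spark.implicits._

val df = spark.range(0, 10).toDF("id")

// Hash-repartition on the column, then record each row's partition id.
// Rows with the same id always share a partition, but the mapping
// comes from Murmur3, not from Object.hashCode modulo arithmetic.
df.repartition(3, $"id")
  .withColumn("partition", spark_partition_id())
  .orderBy($"id")
  .show()
```

Rows sharing a value in the column will always show the same partition id, which is usually what you actually need to verify.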

answered Dec 30 '25 by 9b428a28


