How to ensure partitioning induced by Spark DataFrame join?

I am re-writing a Spark application to use more DataFrame operations for efficiency and robustness. However, there is one part of the application that cannot be done with DataFrames and I have to drop to RDD. Stripped to its essentials, the code would look like this:

C = A.join(B, join_key) # join_key is a string naming a column
D = C.rdd.mapPartitions(do_something)

For correct operation, do_something requires that C.rdd be partitioned by join_key. I think this will be the case, because equijoins work by partitioning the data by the key, then forming pairs whose key values are the same. In Spark RDD joins, the pairs are formed implicitly by an iterator over the partition data, and there would be no way for the pairs to leave the partition in which they were defined unless I told Spark to "materialize" the iterator into a list of pairs and then repartition the result, which I am not doing here. I expect that the same is true of DataFrame joins.

All that said, the discussion above does not prove the desired partitioning is ensured. I'm relying on details of the Spark implementation that are not guaranteed through the API, and I'm not sure that is 100% safe. There's no guarantee the Catalyst optimizer won't toss an extra partition boundary into a group of pairs sharing the same key, breaking it up and making my algorithm incorrect.
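
I can at least inspect the physical plan Spark picks for a given run, though that only tells me what happened that time, not what is guaranteed:

C.explain()  # a shuffle-based join (e.g. SortMergeJoin) on join_key implies a
             # shuffle by that key; a BroadcastHashJoin does not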

To ensure the desired partitioning, I could explicitly repartition C.rdd by join_key before applying my do_something function (something like the sketch below), but I'm worried that this could trigger a lot of unnecessary serialization, shuffling, or other overhead.
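
Something like this untested sketch is what I have in mind; since partitionBy only works on pair RDDs, I would have to key the rows first:

# Hypothetical sketch: key each Row by join_key, hash-partition on that key,
# then drop the keys again before running do_something
keyed = C.rdd.keyBy(lambda row: row['join_key'])
partitioned = keyed.partitionBy(C.rdd.getNumPartitions())
D = partitioned.values().mapPartitions(do_something)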

It appears that I could also use DISTRIBUTE BY from HiveQL, according to this blog post, but again, I do not know what overheads this might trigger.
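
If I understand the post correctly, it would look roughly like this (untested sketch, with A and B registered as temporary tables):

A.registerTempTable("A")
B.registerTempTable("B")
C = sqlContext.sql("""
    SELECT * FROM A JOIN B ON A.join_key = B.join_key
    DISTRIBUTE BY A.join_key
""")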

My question is: is it safe to rely on the implicit partitioning induced by the join, or should I ensure it explicitly? If so, what is the most efficient way to ensure it? I'm working with PySpark 1.6.2.

asked Jul 13 '16 by Paul
1 Answer

Generally speaking, the specific join mechanism is not part of the contract, and you can relatively easily construct a synthetic example where assumptions about partitioning fail. For example, under certain conditions a join can be expressed as a BroadcastHashJoin, which won't trigger repartitioning:

from pyspark.sql.functions import broadcast

# Just so we can easily inspect the results
sqlContext.setConf("spark.sql.shuffle.partitions", "4")

a = (sc
    .parallelize([(1, "a"), (2, "b"), (3, "a"), (4, "b")],  2)
    .toDF(["id", "join_key"]))

# Let's hint the optimizer that b can be broadcast
b = broadcast(
    sc.parallelize([("a", "foo"), ("b", "bar")]).toDF(["join_key", "foobar"])
)

c = a.join(b, "join_key")
c.rdd.glom().collect()

## [[Row(join_key='a', id=1, foobar='foo'),
##  Row(join_key='b', id=2, foobar='bar')],
##  [Row(join_key='a', id=3, foobar='foo'),
##  Row(join_key='b', id=4, foobar='bar')]]

There are some other conditions under which a broadcast join can be used without an explicit hint (see for example the Databricks Guide - SQL, DataFrames & Datasets / BroadcastHashJoin), and there is no guarantee that additional join mechanisms won't be added in the future.
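
For example, when size statistics are available, the planner can choose a broadcast join on its own for any relation it estimates to be small enough, with no hint at all:

# No broadcast() hint needed if one side is estimated to be below
# spark.sql.autoBroadcastJoinThreshold (in bytes, 10MB by default)
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", str(10 * 1024 * 1024))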

If you want to be sure about the result, you should repartition explicitly:

c.repartition("join_key").rdd.glom().collect()

## [[],
##  [Row(join_key='b', id=2, foobar='bar'),
##  Row(join_key='b', id=4, foobar='bar')],
##  [Row(join_key='a', id=1, foobar='foo'),
##   Row(join_key='a', id=3, foobar='foo')],
##  []]

Another problem here is using DataFrames for efficiency and robustness. If your logic depends heavily on accessing data directly in Python (as opposed to SQL expressions), switching to DataFrames and then passing the data back to Python is pretty much an anti-pattern. You can check my answer to Spark functions vs UDF performance?, which covers a similar issue. So be sure to benchmark before you commit to this approach, because in many cases the cost of moving data between the JVM and Python can easily consume all the benefits of the SQL optimizations.
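
As a trivial illustration (reusing c from the example above), the first projection is evaluated entirely by Catalyst inside the JVM, while the second one ships every row to a Python worker and back:

from pyspark.sql.functions import length, udf
from pyspark.sql.types import IntegerType

c.select(length("foobar"))                     # native expression, no Python involved

py_len = udf(lambda s: len(s), IntegerType())  # Python UDF: rows are serialized to Python
c.select(py_len("foobar"))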

answered Oct 03 '22 by zero323