A similar question has been asked here, but it does not address my problem properly. I have nearly 100 DataFrames, each with at least 200,000 rows, and I need to join them all with a full join on the column ID, producing a single DataFrame with the columns ID, Col1, Col2, Col3, Col4, Col5, ..., Col102.
For illustration, here is the structure of my DataFrames:
df1                         df2           df3           ... df100
+----+------+------+------+ +----+------+ +----+------+     +----+------+
|  ID|  Col1|  Col2|  Col3| |  ID|  Col4| |  ID|  Col5|     |  ID|Col102|
+----+------+------+------+ +----+------+ +----+------+     +----+------+
| 501|  25.1|  34.9| 436.9| | 501| 22.33| | 503| 22.33|     | 501|  78.1|
| 502|  12.2|3225.9|  46.2| | 502| 645.1| | 505| 645.1|     | 502|  54.9|
| 504| 754.5| 131.0| 667.3| | 504| 547.2| | 504| 547.2|     | 507|     0|
| 505|324.12| 48.93|  -1.3| | 506|     2| | 506|     2|     | 509| 71.57|
| 506| 27.51| 88.99|  67.7| | 507| 463.7| | 507| 463.7|     | 510|  82.1|
| ...|   ...|   ...|   ...| | ...|   ...| | ...|   ...|     | ...|   ...|
+----+------+------+------+ +----+------+ +----+------+     +----+------+
I started joining these DataFrames by doing a full join sequentially on all of them. Naturally, this is a computationally intensive procedure, and one must strive to reduce the number of shuffles across worker nodes. Therefore, I started by partitioning the DataFrame df1 on ID using repartition(), which hash-partitions the DataFrame by ID into 30 partitions:
df1 = df1.repartition(30,'ID')
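To make the idea of hash partitioning concrete, here is a small pure-Python model of it (this is not Spark code): every row goes to partition hash(ID) % num_partitions, so all rows with the same ID end up in the same partition. Python's built-in hash() is only a stand-in for Spark's own Murmur3-based hash, and the helper name hash_partition is hypothetical.

```python
from collections import defaultdict


def hash_partition(rows, key, num_partitions):
    """Group rows (dicts) by hash(row[key]) % num_partitions.

    Python's hash() is only a stand-in for the Murmur3-based hash Spark uses.
    """
    partitions = defaultdict(list)
    for row in rows:
        partitions[hash(row[key]) % num_partitions].append(row)
    return dict(partitions)


# A few rows taken from the illustration above.
df1_rows = [{"ID": 501, "Col1": 25.1}, {"ID": 502, "Col1": 12.2}, {"ID": 504, "Col1": 754.5}]
df2_rows = [{"ID": 501, "Col4": 22.33}, {"ID": 502, "Col4": 645.1}, {"ID": 504, "Col4": 547.2}]

p1 = hash_partition(df1_rows, "ID", 30)
p2 = hash_partition(df2_rows, "ID", 30)

# Same hash function and same partition count: a given ID gets the same partition index
# on both sides, so partition i of df1 only has to be matched with partition i of df2.
for idx in sorted(p1):
    print(idx, [r["ID"] for r in p1[idx]], [r["ID"] for r in p2.get(idx, [])])
```

This only saves a shuffle if the other side of the join is partitioned the same way, with the same number of partitions; in the question, df2 is not repartitioned at all.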
Now I do a full join between df1 and df2:
df = df1.join(df2,['ID'],how='full')
df.persist()
Since df1 was already hash-partitioned, I had expected this join to skip the shuffle and keep the partitioner of df1, but I noticed that a shuffle did take place and that it increased the number of partitions of df to 200.
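A rough way to see why a 30-way layout cannot simply be reused by a join that runs with 200 partitions: under hash partitioning, the partition a key lands in depends on the partition count, so most keys placed by a 30-way split sit in the wrong partition for a 200-way split and would have to move. The sketch below is a pure-Python model with hash() standing in for Spark's hash function; 200 is the default value of spark.sql.shuffle.partitions, which Spark SQL uses by default as the partition count when it shuffles for a join. The helper names are hypothetical.

```python
def partition_of(key, num_partitions):
    """Partition index of a key under hash partitioning (hash() stands in for Spark's hash)."""
    return hash(key) % num_partitions


def fraction_moved(keys, old_n, new_n):
    """Fraction of keys whose partition index differs between old_n and new_n partitions."""
    moved = sum(1 for k in keys if partition_of(k, old_n) != partition_of(k, new_n))
    return moved / len(keys)


ids = range(501, 200501)  # 200,000 illustrative IDs, roughly the size of one table
print(f"{fraction_moved(ids, 30, 30):.1%} of IDs move when the partition count stays at 30")
print(f"{fraction_moved(ids, 30, 200):.1%} of IDs move when going from 30 to 200 partitions")
```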
Now, if I keep joining the subsequent DataFrames by calling a function like the one below, I get the error java.io.IOException: No space left on device:
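def rev(df, num):
    # filename is the common path prefix of the input files; the CSVs are assumed to have a header row
    df_temp = spark.read.load(filename + str(num) + '.csv', format='csv', header=True)
    df_temp.persist()
    df = df.join(df_temp, ['ID'], how='full')
    df_temp.unpersist()
    return df

for num in range(3, 101):  # i.e. df = rev(df, 3), df = rev(df, 4), ..., df = rev(df, 100)
    df = rev(df, num)

# I get the ERROR here, when I call the first action, count():
print("Total number of rows: " + str(df.count()))
df.unpersist()  # Never reached this stage.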
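For reference, this is what the chain of full joins above computes, modelled in plain Python on tiny in-memory tables (dicts mapping ID to a dict of columns, with a few values from the illustration). Every ID that appears in any table appears exactly once in the result, and columns from tables that lack that ID are None, like the nulls a full join produces. The helper name full_join is hypothetical, and this is a model of the semantics, not of how Spark executes it.

```python
from functools import reduce


def full_join(left, right):
    """Full outer join of two {ID: {column: value}} tables; missing columns become None."""
    left_cols = {c for cols in left.values() for c in cols}
    right_cols = {c for cols in right.values() for c in cols}
    result = {}
    for id_ in left.keys() | right.keys():
        row = dict.fromkeys(left_cols | right_cols)  # every column starts as None
        row.update(left.get(id_, {}))
        row.update(right.get(id_, {}))
        result[id_] = row
    return result


df1 = {501: {"Col1": 25.1}, 502: {"Col1": 12.2}, 504: {"Col1": 754.5}}
df2 = {501: {"Col4": 22.33}, 502: {"Col4": 645.1}, 506: {"Col4": 2}}
df3 = {503: {"Col5": 22.33}, 504: {"Col5": 547.2}}

# Equivalent of df1.join(df2, ['ID'], how='full').join(df3, ['ID'], how='full')
joined = reduce(full_join, [df1, df2, df3])
for id_ in sorted(joined):
    print(id_, joined[id_])
```

The accumulated table gets wider with every join (one more column per input table in this model), so in Spark each join in the chain that needs a shuffle moves more data than the one before it.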
Update: the error message:
Py4JJavaError: An error occurred while calling o3487.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 42 in stage 255.0 failed 1 times, most recent failure: Lost task 42.0 in stage 255.0 (TID 8755, localhost, executor driver): java.io.IOException: No space left on device
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
Questions:
1. Why was the partitioner of df1 not maintained when I did the first join?
2. How can I join these multiple tables efficiently and also avoid the No space left on device issue? User @silvio here suggests using .bucketBy(), but he also implied that the partitioner would be maintained, which did not happen. So I am not sure what an efficient way to join these multiple DataFrames would be.
Any suggestions or hints would be much appreciated.
First, persist your big df every N iterations of a for loop (which you probably already have).
Second, control the number of shuffle partitions by setting sqlContext.sql("set spark.sql.shuffle.partitions=100") (in Spark 2.x, spark.conf.set("spark.sql.shuffle.partitions", "100") does the same) instead of the default of 200.
Your code should look like:
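persist_every = 10  # persist the accumulated result every N joins
# same file naming as in the question: filename + '1.csv', ..., filename + '100.csv'
big_df = spark.read.load(filename + '1.csv', format='csv', header=True)
for i in range(2, 101):
    df = spark.read.load(filename + str(i) + '.csv', format='csv', header=True)
    big_df = big_df.join(df, ['ID'], how='full')
    if i % persist_every == 0:
        big_df = big_df.persist()
        big_df.count()  # persist() is lazy; this action actually materializes the cache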
Here I call persist every 10 iterations; you can of course adjust that number according to the behavior of your job.
EDIT: In your case you are persisting the local df_temp inside the rev function, but not the whole DataFrame that contains all the previous joins (df in your case). This has no effect on the final execution plan, since it is a local persist (and df_temp is unpersisted again before any action runs). As for my suggestion, assume that you need 100 joins in total: with the code above you iterate through a loop [1..100] and persist the accumulated result every 10 iterations. Once the big DataFrame has been persisted and materialized, the DAG contains fewer steps to recompute, because the intermediate results are stored and Spark can restore them from storage instead of recalculating everything from scratch.
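The effect described in the EDIT can be modelled in a few lines of plain Python (a toy model, not Spark's API): each join adds a node on top of the previous plan, and a node that has been persisted and materialized acts as a stopping point, so replaying the lineage only goes back to the most recent stored result. PlanNode and longest_chain are hypothetical names, and the model ignores memory limits and cache eviction.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class PlanNode:
    """Hypothetical stand-in for one step of a lazy query plan (not a Spark class)."""
    parent: Optional["PlanNode"] = None  # None marks the first table (a plain scan)
    stored: bool = False  # True once the result has been persisted and materialized

    def steps_to_recompute(self) -> int:
        """Joins that must run again, walking back to a stored node or the first table."""
        steps, node = 0, self
        while node is not None and not node.stored:
            if node.parent is not None:  # every node except the first table is a join
                steps += 1
            node = node.parent
        return steps


def longest_chain(num_tables: int, persist_every: Optional[int]) -> int:
    """Join tables 2..num_tables onto table 1, storing the result every persist_every tables.

    Returns the longest stretch of joins that had to be computed from the last stored result.
    """
    plan = PlanNode()
    longest = 0
    for i in range(2, num_tables + 1):
        plan = PlanNode(parent=plan)
        longest = max(longest, plan.steps_to_recompute())
        if persist_every and i % persist_every == 0:
            plan.stored = True  # models persist() followed by an action such as count()
    return longest


print(longest_chain(100, None))  # prints 99: no persisting, the whole chain of joins
print(longest_chain(100, 10))    # prints 10: persisting every 10 tables
```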
I've had a similar problem in the past, except that I didn't have that many RDDs. The most efficient solution I could find was to use the low-level RDD API. First, store all the RDDs so that they are (hash) partitioned with the same partitioner and sorted within partitions by the join column(s): https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/rdd/OrderedRDDFunctions.html#repartitionAndSortWithinPartitions-org.apache.spark.Partitioner-
After this, the join can be implemented with zipPartitions, without shuffling and without using much memory, because partition i of every RDD then holds the same keys in the same order: https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/rdd/RDD.html#zipPartitions-org.apache.spark.rdd.RDD-boolean-scala.Function2-scala.reflect.ClassTag-scala.reflect.ClassTag-
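The idea can be sketched in plain Python, with lists standing in for RDD partitions (this models the technique, it does not call Spark): both inputs are hash-partitioned with the same function and the same number of partitions and sorted by key inside each partition, so partition i of one input only has to be merged with partition i of the other, in a single streaming pass. Python's hash() stands in for Spark's partitioner, the helper names are hypothetical, and keys are assumed to be unique within each input (as ID is here); duplicate keys would need the cross product of the matching runs.

```python
from typing import List, Tuple

Row = Tuple[int, object]  # (key, value)


def partition_and_sort(rows: List[Row], num_partitions: int) -> List[List[Row]]:
    """Model of repartitionAndSortWithinPartitions: hash-partition by key, then sort each partition."""
    parts: List[List[Row]] = [[] for _ in range(num_partitions)]
    for key, value in rows:
        parts[hash(key) % num_partitions].append((key, value))
    return [sorted(p, key=lambda kv: kv[0]) for p in parts]


def merge_full_outer(left: List[Row], right: List[Row]):
    """Sort-merge full outer join of two sorted partitions with unique keys; yields (key, lv, rv)."""
    i = j = 0
    while i < len(left) or j < len(right):
        if j == len(right) or (i < len(left) and left[i][0] < right[j][0]):
            yield left[i][0], left[i][1], None  # key only on the left
            i += 1
        elif i == len(left) or right[j][0] < left[i][0]:
            yield right[j][0], None, right[j][1]  # key only on the right
            j += 1
        else:  # same key on both sides
            yield left[i][0], left[i][1], right[j][1]
            i += 1
            j += 1


def zip_partitions_join(left_parts, right_parts):
    """Model of zipPartitions: pair partition i with partition i; no rows cross partitions."""
    assert len(left_parts) == len(right_parts), "both sides need the same number of partitions"
    return [list(merge_full_outer(lp, rp)) for lp, rp in zip(left_parts, right_parts)]


# Col1 of df1 and Col4 of df2 from the question's illustration.
df1 = [(501, 25.1), (502, 12.2), (504, 754.5), (505, 324.12), (506, 27.51)]
df2 = [(501, 22.33), (502, 645.1), (504, 547.2), (506, 2), (507, 463.7)]
n = 4
joined = zip_partitions_join(partition_and_sort(df1, n), partition_and_sort(df2, n))
for part in joined:
    print(part)
```

With more inputs, the same pairing extends to zipping several co-partitioned RDDs at once; the key requirement is that every input uses the same partitioner and partition count.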