
Joining a large and a ginormous spark dataframe

I have two dataframes, df1 has 6 million rows, df2 has 1 billion.

I have tried the standard df1.join(df2, df1("id") <=> df2("id2")), but I run out of memory.

df1 is too large to be put into a broadcast join.

I have even tried a bloom filter, but it was also too large to fit in a broadcast and still be useful.

The only thing I have tried that doesn't error out is to break df1 into 300,000-row chunks and join each chunk with df2 in a foreach loop. But this takes an order of magnitude longer than it probably should (likely because df1 is too large to persist, causing the split to be redone up to that point on each iteration). Recombining the results also takes a while.
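Roughly, that chunked workaround looks like this (a sketch rather than my exact code; the hash-based chunk assignment is illustrative, and the column names follow the note further down, where df1 holds the distinct ids as id2):

    import org.apache.spark.sql.functions._

    val numChunks = 20   // about 300,000 ids per chunk for 6 million rows
    val chunked = df1.withColumn("chunk",
      pmod(crc32(col("id2").cast("string")), lit(numChunks)))

    val pieces = (0 until numChunks).map { i =>
      val piece = chunked.filter(col("chunk") === i).drop("chunk")
      df2.join(piece, df2("id") <=> piece("id2"))
    }
    val combined = pieces.reduce(_ unionAll _)   // union on Spark 2.x; this recombination is slow too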

How have you solved this issue?

A few notes:

df1 is a subset of df2: df1 = df2.where("fin<1").selectExpr("id as id2").distinct(). I am interested in all rows in df2 whose id at some point had fin<1, which means I can't do it as one step.
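Put differently, the full computation I need is roughly the following (illustrative only; a single where("fin<1") would keep just the rows where fin<1, not every row for those ids):

    val df1 = df2.where("fin < 1").selectExpr("id as id2").distinct()   // about 6 million ids
    val result = df2.join(df1, df2("id") <=> df1("id2"))                // this is the join that blows up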

there are about 200 million unique ids in df2.

here are some relevant spark settings:

spark.cores.max=1000
spark.executor.memory=15G
spark.akka.frameSize=1024
spark.shuffle.consolidateFiles=false
spark.task.cpus=1
spark.driver.cores=1
spark.executor.cores=1
spark.memory.fraction=0.5
spark.memory.storageFraction=0.3
spark.sql.shuffle.partitions=10000
spark.default.parallelism=10000

The error I get is :

16/03/11 04:36:07 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerTaskEnd(11,1,ResultTask,FetchFailed(BlockManagerId(68dcb91c-1b45-437d-ac47-8e8c1e4bc386-S199, mapr, 46487),3,176,4750,org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException: /tmp/mesos/work/slaves/68dcb91c-1b45-437d-ac47-8e8c1e4bc386-S199/frameworks/c754216b-bf80-4d84-97f1-2e907030365e-2545/executors/16/runs/5a5a01c5-205e-4380-94d3-7fa0f6421b85/blockmgr-ea345692-05bb-4f42-9ba1-7b93311fb9d4/0e/shuffle_3_340_0.index (No such file or directory)

and

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 465 in stage 6.3 failed 4 times, most recent failure: Lost task 465.3 in stage 6.3 (TID 114448, mapr): java.lang.OutOfMemoryError: Direct buffer memory
asked Mar 11 '16 by MatthewH



1 Answer

As I see it, you have a problem of partitions that are too large (probably due to the size of your data). You can try a few approaches:

  1. Try setting spark.sql.shuffle.partitions to something like 2048 or even more (the default is 200). There will be a shuffle while joining your df-s. Try to play with this parameter so that the total volume of the bigger dataset divided by this parameter is approximately 64MB-100MB (it depends on the file format). In general, you should see in the Spark UI that each task (per partition) processes a "normal" amount of data (64MB-100MB max).

  2. If the first approach doesn't work, I suggest doing this join with the RDD API. Convert your dfs into RDDs, then partition both RDDs by a HashPartitioner(number of partitions), where the number of partitions should be computed as described above.

  3. Lately a new option was added by the Spark devs: you can bucket the ginormous table into N buckets (i.e. store it ready for the join). There are a few limitations, but it can eliminate shuffling the ginormous data completely. bucketBy is supported only with the saveAsTable API, not with save. After you've bucketed the data, on the next iteration you can load it as an external table while providing the bucketing spec (see https://docs.databricks.com/spark/latest/spark-sql/language-manual/create-table.html):

    CREATE TABLE ginormous
        -- ...here you must specify the schema
        USING PARQUET
        CLUSTERED BY (a, b, c) INTO N BUCKETS
        LOCATION 'hdfs://your-path'

Then, when you've loaded the ginormous table as a bucketed one, you can load the big table and repartition it to the same number of buckets and by the same columns (df.repartition(N, col("a"), col("b"), col("c"))).
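In DataFrame code the whole flow looks roughly like this (a sketch only, assuming Spark 2.x with a metastore available; the table name, the bucket count and the ginormousDf/bigDf names are placeholders, not your actual variables):

    import org.apache.spark.sql.functions.col

    // One-time step: write the ginormous table bucketed by the join columns.
    // bucketBy only works together with saveAsTable, not with save.
    ginormousDf.write
      .format("parquet")
      .bucketBy(1000, "a", "b", "c")
      .sortBy("a", "b", "c")
      .saveAsTable("ginormous_bucketed")

    // Later runs: read the bucketed table back and repartition the other table
    // the same way, so the ginormous side does not have to be shuffled again
    // for the join.
    val ginormous = spark.table("ginormous_bucketed")
    val big = bigDf.repartition(1000, col("a"), col("b"), col("c"))
    val joined = ginormous.join(big, Seq("a", "b", "c"))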

answered Sep 19 '22 by Igor Berman