How do we save a huge PySpark DataFrame?

I have a big PySpark DataFrame that I want to save to a file (.tsv) for further use. To do that, I wrote the following code:

import csv

with open(myfile, "a") as csv_file:
    writer = csv.writer(csv_file, delimiter='\t')
    writer.writerow(["vertex", "id_source", "id_target", "similarity"])

    # collect the DataFrame partition by partition and write each row to the TSV
    for part_id in range(joinDesrdd_df.rdd.getNumPartitions()):
        part_rdd = joinDesrdd_df.rdd.mapPartitionsWithIndex(make_part_filter(part_id), True)
        data_from_part_rdd = part_rdd.collect()
        vertex_list = set()

        for row in data_from_part_rdd:
            writer.writerow([....])

    csv_file.flush()

My code cannot get past this step; it raises an exception:

1. In the worker logs:

19/07/22 08:58:57 INFO Worker: Executor app-20190722085320-0000/2 finished with state KILLED exitStatus 143
14: 19/07/22 08:58:57 INFO ExternalShuffleBlockResolver: Application app-20190722085320-0000 removed, cleanupLocalDirs = true
14: 19/07/22 08:58:57 INFO Worker: Cleaning up local directories for application app-20190722085320-0000
 5: 19/07/22 08:58:57 INFO Worker: Executor app-20190722085320-0000/1 finished with state KILLED exitStatus 143
 7: 19/07/22 08:58:57 INFO Worker: Executor app-20190722085320-0000/14 finished with state KILLED exitStatus 143
...

2. In the job execution log:

Traceback (most recent call last):
  File "/project/6008168/tamouze/RWLastVersion2207/module1.py", line 306, in <module>
    for part_id in range(joinDesrdd_df.rdd.getNumPartitions()):
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 88, in rdd
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o528.javaToPython.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(id_source#263, id_target#292, similarity#258, 1024)
+- *(11) HashAggregate(keys=[id_source#263, id_target#292, similarity#258], functions=[], output=[id_source#263, id_target#292, similarity#258])

I don't know why this piece of code raises the exception. Note that the execution is fine on small data, but it fails on big data.

Also, what is the best way to save a PySpark DataFrame for further use?

Update: I tried to replace the loop above with the following:

joinDesrdd_df.withColumn("par_id", col('id_source') % 50) \
    .repartition(50, 'par_id') \
    .write.format('parquet') \
    .partitionBy("par_id") \
    .save("/project/6008168/bib/RWLastVersion2207/randomWalkParquet/candidate.parquet")

I get a similar exception:

19/07/22 21:10:18 INFO TaskSetManager: Finished task 653.0 in stage 11.0 (TID 2257) in 216940 ms on 172.16.140.237 (executor 14) (1017/1024)
19/07/22 21:11:32 ERROR FileFormatWriter: Aborting job null.
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(par_id#328, 50)
+- *(12) HashAggregate(keys=[id_source#263, id_target#292, similarity#258], functions=[], output=[id_source#263, id_target#292, similarity#258, par_id#328])
   +- Exchange hashpartitioning(id_source#263, id_target#292, similarity#258, 1024)
      +- *(11) HashAggregate(keys=[id_source#263, id_target#292, similarity#258], functions=[], output=[id_source#263, id_target#292, similarity#258])
         +- *(11) Project [id_source#263, id_target#292, similarity#258]
            +- *(11) BroadcastHashJoin [instance_target#65], [instance#291], Inner, BuildRight
asked Jul 23 '19 by bib

1 Answer

I'd suggest using Spark's native write functionality:

joinDesrdd_df.write.format('csv').option("header", "true").save("path/to/the/output/csv/folder")
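
Since the original goal was a .tsv file, the same writer can emit tab-separated output via the sep option; a minimal sketch (the output path here is just a placeholder):

# write tab-separated files instead of comma-separated ones
joinDesrdd_df.write \
    .format('csv') \
    .option("header", "true") \
    .option("sep", "\t") \
    .save("path/to/the/output/tsv/folder")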

Spark will save each partition of the DataFrame as a separate CSV file under the specified path. You can control the number of files with the repartition method, which gives you control over how much data each file contains.
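
For example, something like the following should produce roughly ten output files (ten is an arbitrary choice here, not a value from the question):

# repartition first so the writer produces ~10 part files
joinDesrdd_df.repartition(10) \
    .write.format('csv') \
    .option("header", "true") \
    .save("path/to/the/output/csv/folder")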

For big datasets I'd also suggest the ORC or Parquet format, since both are better suited to storing large amounts of data.

For example, with Parquet:

from pyspark.sql.functions import col

joinDesrdd_df.withColumn("par_id", col('id_source') % 50) \
    .repartition(50, 'par_id') \
    .write.format('parquet') \
    .save("/project/6008168/bib/RWLastVersion2207/randomWalkParquet/candidate.parquet")

To read it back into a DataFrame:

df = spark.read. \
 parquet("/project/6008168/bib/RWLastVersion2207/randomWalkParquet/candidate.parquet")
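
As a usage sketch, once the data is loaded back it can be queried like any other DataFrame; the filter value below is an arbitrary illustration, not taken from the question:

from pyspark.sql.functions import col

df = spark.read.parquet("/project/6008168/bib/RWLastVersion2207/randomWalkParquet/candidate.parquet")

# keep only one bucket of the derived par_id column (the value 7 is arbitrary)
subset = df.filter(col("par_id") == 7)
subset.select("id_source", "id_target", "similarity").show(5)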
answered Oct 24 '22 by Richard Nemeth