Spark Structured Streaming writing to Parquet creates too many files

I used Structured Streaming to load messages from Kafka, do some aggregation, and then write the results to Parquet files. The problem is that a huge number of Parquet files get created (800 files) for only 100 messages from Kafka.

The aggregation part is:

return model
        // parse the incoming value into a proper timestamp type
        .withColumn("timeStamp", col("timeStamp").cast("timestamp"))
        // tolerate events arriving up to 30 seconds late
        .withWatermark("timeStamp", "30 seconds")
        // tumbling 5-minute windows on event time
        .groupBy(window(col("timeStamp"), "5 minutes"))
        .agg(count("*").alias("total"));

The query:

StreamingQuery query = result // .orderBy("window")
        .writeStream()
        .outputMode(OutputMode.Append())
        .format("parquet")
        .option("checkpointLocation", "c:\\bigdata\\checkpoints")
        .start("c:\\bigdata\\parquet");

When I load one of the Parquet files with Spark, it shows up empty:

+------+-----+
|window|total|
+------+-----+
+------+-----+

How can I save the dataset to only one Parquet file? Thanks.


1 Answer

My idea was to use Spark Structured Streaming to consume events from Azure Event Hubs and then store them in Parquet format on storage.

I finally figured out how to deal with the many small files being created. My Spark version is 2.4.0.

This is how my query looks:

from pyspark.sql.functions import col

(dfInput
  # hash all rows into a single partition so each micro-batch writes one file
  .repartition(1, col("column_name"))
  .select("*")
  .writeStream
  .format("parquet")
  .option("path", "adl://storage_name.azuredatalakestore.net/streaming")
  .option("checkpointLocation", "adl://storage_name.azuredatalakestore.net/streaming_checkpoint")
  # emit one micro-batch every 480 seconds
  .trigger(processingTime="480 seconds")
  .start())

As a result, I have one file created in the storage location every 480 seconds. To find the balance between file size and the number of files (and to avoid OOM errors), just play with two parameters: the number of partitions and processingTime, which is the micro-batch interval.
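
If you want to apply the same idea to the Java query from the question, a minimal sketch might look like the following (an illustration only: the single output partition and the 480-second interval are values to tune for your volume, and Trigger.ProcessingTime requires Spark 2.2+):

import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;

// "result" is the aggregated Dataset<Row> from the question.
StreamingQuery query = result
        // collapse each micro-batch into a single partition, so at most
        // one Parquet part file is written per trigger
        .repartition(1)
        .writeStream()
        .outputMode(OutputMode.Append())
        .format("parquet")
        .option("checkpointLocation", "c:\\bigdata\\checkpoints")
        // emit one micro-batch every 480 seconds instead of as fast as possible
        .trigger(Trigger.ProcessingTime("480 seconds"))
        .start("c:\\bigdata\\parquet");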

I hope you can adjust the solution to your use case.
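
One more note on the empty file from the question: with append mode plus a watermark, a window is only flushed to the sink after the watermark passes the end of that window, so many part files legitimately contain no rows. To verify the output, read the whole directory back in a batch job instead of opening a single part file (assuming "spark" is your SparkSession):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Reads every part file under the output path at once;
// empty part files simply contribute zero rows.
Dataset<Row> output = spark.read().parquet("c:\\bigdata\\parquet");
output.show();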
