I used structured streaming to load messages from kafka, do some aggreation then write to parquet file. The problem is that there are so many parquet files created (800 files) for only 100 messages from kafka.
The aggregation part is:
return model
.withColumn("timeStamp", col("timeStamp").cast("timestamp"))
.withWatermark("timeStamp", "30 seconds")
.groupBy(window(col("timeStamp"), "5 minutes"))
.agg(
count("*").alias("total"));
The query:
StreamingQuery query = result //.orderBy("window")
.writeStream()
.outputMode(OutputMode.Append())
.format("parquet")
.option("checkpointLocation", "c:\\bigdata\\checkpoints")
.start("c:\\bigdata\\parquet");
When loading one of the parquet file using spark, it shows empty
+------+-----+
|window|total|
+------+-----+
+------+-----+
How can I save the dataset to only one parquet file? Thanks
My idea was to use Spark Structured Streaming to consume events from Azure Even Hub then store them on storage in a parquet format.
I finally figured out how to deal with many small files created. Spark version 2.4.0.
This how my query looks like
dfInput
.repartition(1, col('column_name'))
.select("*")
.writeStream
.format("parquet")
.option("path", "adl://storage_name.azuredatalakestore.net/streaming")
.option("checkpointLocation", "adl://storage_name.azuredatalakestore.net/streaming_checkpoint")
.trigger(processingTime='480 seconds')
.start()
As a result, I have one file created on a storage location every 480 seconds.
To figure out the balance between file size and number of files to avoid OOM error, just play with two parameters: number of partitions and processingTime
, which means the batch interval.
I hope you can adjust the solution to your use case.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With