I have a Spark DataFrame with a small number of fields. Some of the fields are huge binary blobs. The size of a single row is approximately 50 MB.
I am saving the DataFrame in Parquet format, and I am controlling the size of the row group with the parquet.block.size parameter.
Spark generates a Parquet file, but I always get at least 100 rows per row group. This is a problem for me, since chunk sizes can grow to gigabytes, which does not work well with my application.
parquet.block.size works as expected as long as it is big enough to accommodate more than 100 rows.
I modified InternalParquetRecordWriter.java to set MINIMUM_RECORD_COUNT_FOR_CHECK = 2, which fixed the issue; however, I cannot find any configuration value that would let me tune this hard-coded constant.
Is there a different/better way to get row groups smaller than 100 rows?
This is a snippet of my code:
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import StructType, StructField, BinaryType
import numpy as np

def fake_row(x):
    # ~1.5 MB of random bytes per field (note the integer division for the size)
    result = bytearray(np.random.randint(0, 127, 3 * 1024 * 1024 // 2, dtype=np.uint8).tobytes())
    return Row(result, result)

spark_session = SparkSession \
    .builder \
    .appName("bbox2d_dataset_extraction") \
    .config("spark.driver.memory", "12g") \
    .config("spark.executor.memory", "4g") \
    .master('local[5]')
spark = spark_session.getOrCreate()
sc = spark.sparkContext
sc._jsc.hadoopConfiguration().setInt("parquet.block.size", 8 * 1024 * 1024)

index = sc.parallelize(range(50), 5)
huge_rows = index.map(fake_row)
schema = StructType([StructField('f1', BinaryType(), False), StructField('f2', BinaryType(), False)])
bbox2d_dataframe = spark.createDataFrame(huge_rows, schema).coalesce(1)
bbox2d_dataframe. \
    write.option("compression", "none"). \
    mode('overwrite'). \
    parquet('/tmp/huge/')
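For reference, this is a minimal sketch of how I inspect the row-group layout of the output (it assumes pyarrow is installed and that Spark's part files under /tmp/huge/ match the part-*.parquet pattern):

# Sketch: inspect the row groups of one of the part files written above
# (assumes pyarrow is available; the glob pattern is an assumption about
# Spark's part-file naming).
import glob
import pyarrow.parquet as pq

part_file = glob.glob('/tmp/huge/part-*.parquet')[0]
metadata = pq.ParquetFile(part_file).metadata
print("row groups:", metadata.num_row_groups)
for i in range(metadata.num_row_groups):
    rg = metadata.row_group(i)
    print("row group", i, "rows:", rg.num_rows, "bytes:", rg.total_byte_size)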
Unfortunately I haven't found a way to do so. I reported this issue (PARQUET-409) to remove the hard-coded values and make them configurable, and I have a patch for it if you're interested.
While PARQUET-409 is not yet fixed, there are a couple of workarounds to make your application work with that hard-coded minimum of 100 records per row group.
First issue and workaround:
You mentioned your rows could be as large as 50 MB.
That gives a row group size of approximately 5 GB (100 rows × 50 MB).
At the same time, your Spark executors only have 4 GB (spark.executor.memory).
Make the executor memory significantly bigger than the maximum row group size.
For row groups this large, I recommend setting spark.executor.memory to 12-20 GB. Experiment and see what works for your datasets.
Most of our production jobs run with executor memory in this range.
For this to work with such large row groups, you may also want to tune spark.executor.cores down to 1, so that each executor process handles only one of these large row groups at a time (at the expense of losing some Spark efficiency). Alternatively, try spark.executor.cores set to 2; this may require increasing spark.executor.memory to the 20-31 GB range. (Try to stay under 32 GB, since the JVM switches to uncompressed ordinary object pointers, which can add as much as 50% memory overhead.)
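As a rough illustration, these settings could look like the sketch below when building the session in PySpark (the 16g value is just a starting point within the range above, not a recommendation for your exact workload):

# Sketch of the suggested executor settings (the exact values are
# assumptions to tune for your dataset, not fixed recommendations).
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("bbox2d_dataset_extraction") \
    .config("spark.executor.memory", "16g") \
    .config("spark.executor.cores", "1") \
    .getOrCreate()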
Second issue and workaround: Such large row groups of 5 GB are most likely spread across many HDFS blocks, since default HDFS blocks are in the 128-256 MB range. (I assume you store those Parquet files on HDFS, since you used the "hadoop" tag.) Parquet best practice is for a row group to reside completely in one HDFS block:
Row group size: Larger row groups allow for larger column chunks which makes it possible to do larger sequential IO. Larger groups also require more buffering in the write path (or a two pass write). We recommend large row groups (512MB - 1GB). Since an entire row group might need to be read, we want it to completely fit on one HDFS block. Therefore, HDFS block sizes should also be set to be larger. An optimized read setup would be: 1GB row groups, 1GB HDFS block size, 1 HDFS block per HDFS file.
Here's an example of how to change the HDFS block size (set it before you create such Parquet files):
sc._jsc.hadoopConfiguration().set("dfs.block.size", "5g")
or in Spark Scala:
sc.hadoopConfiguration.set("dfs.block.size", "5g")
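If you want to follow the quoted guidance of keeping the row group and HDFS block sizes aligned, here is a minimal sketch (the 1 GB figures are the example from the quote, not a recommendation for your specific dataset; scale both up together for your ~5 GB row groups):

# Sketch: keep parquet.block.size and dfs.block.size in step before writing,
# so a row group fits in a single HDFS block (1 GB values are illustrative).
one_gb = 1024 * 1024 * 1024
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.setInt("parquet.block.size", one_gb)  # target row group size
hadoop_conf.set("dfs.block.size", "1g")           # HDFS block size for newly written files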
I hope this will be fixed at the Parquet level at some point, but these two workarounds should allow you to operate Parquet with such large row groups.