
Creating Parquet files in Spark with row groups of fewer than 100 rows

I have a Spark DataFrame with a small number of fields. Some of the fields are huge binary blobs, so the size of an entire row is approximately 50 MB.

I am saving the DataFrame in Parquet format and controlling the row-group size with the parquet.block.size parameter.

Spark generates a Parquet file, however I always get at least 100 rows per row group. This is a problem for me, since chunk sizes can grow to gigabytes, which does not work well with my application.

parquet.block.size works as expected as long as the size is big enough to accommodate more than 100 rows.

I modified InternalParquetRecordWriter.java so that MINIMUM_RECORD_COUNT_FOR_CHECK = 2, which fixed the issue. However, there is no configuration value I could find that would support tuning this hardcoded constant.

Is there a different/better way to get row-group sizes smaller than 100 rows?

This is a snippet of my code:

from pyspark.sql import Row, SparkSession
from pyspark.sql.types import StructType, StructField, BinaryType
import numpy as np


def fake_row(x):
    # ~1.5 MB of random bytes, reused for both binary columns of the row
    result = bytearray(np.random.randint(0, 127, 3 * 1024 * 1024 // 2, dtype=np.uint8).tobytes())
    return Row(result, result)


spark = SparkSession \
    .builder \
    .master('local[5]') \
    .appName("bbox2d_dataset_extraction") \
    .config("spark.driver.memory", "12g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

sc = spark.sparkContext
# Request 8 MB row groups via parquet.block.size
sc._jsc.hadoopConfiguration().setInt("parquet.block.size", 8 * 1024 * 1024)

index = sc.parallelize(range(50), 5)
huge_rows = index.map(fake_row)
schema = StructType([StructField('f1', BinaryType(), False), StructField('f2', BinaryType(), False)])

bbox2d_dataframe = spark.createDataFrame(huge_rows, schema).coalesce(1)
bbox2d_dataframe.write \
    .option("compression", "none") \
    .mode('overwrite') \
    .parquet('/tmp/huge/')
asked Jan 09 '18 by Yevgeni Litvin


2 Answers

Unfortunately I haven't found a way to do so. I reported this issue to remove the hard-coded values and make them configurable. I have a patch for it if you're interested.
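If that patch eventually lands and exposes a configuration property, one would presumably set it through the Hadoop configuration the same way parquet.block.size is set in the question. The property name in the sketch below is purely hypothetical, included only to illustrate the mechanism:

# Hypothetical property name - no released parquet-mr version is known to
# support it; this only illustrates how such a knob would be passed in.
sc._jsc.hadoopConfiguration().setInt("parquet.min.record.count.for.check", 2)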

answered Sep 17 '22 by Pradeep Gollakota


While PARQUET-409 is not yet fixed, there are a couple of workarounds to make the application work with the hard-coded minimum of 100 records per row group.

First issue and workaround: You mentioned your rows could be as large as 50 MB each. With at least 100 rows per row group, that gives a row-group size of approximately 5 GB, while your Spark executors have only 4 GB (spark.executor.memory). Make executor memory significantly larger than the maximum row-group size.
For row groups this large I recommend setting spark.executor.memory to 12-20 GB. Experiment to see what works best for your datasets; most of our production jobs run with executor memory in that range. For this to work with such large row groups, you may also want to tune spark.executor.cores down to 1 so that each executor process handles only one such large row group at a time (at the expense of losing some Spark efficiency). Alternatively, try spark.executor.cores set to 2 - this may require increasing spark.executor.memory to the 20-31 GB range. (Try to stay under 32 GB, as the JVM then switches to non-compressed OOPs, which can add as much as 50% memory overhead.)
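As a concrete illustration, here is a minimal sketch of how those settings could be applied to the question's session builder. The 16g / single-core values are just one pick from the ranges above and would need tuning per dataset; note also that in the question's local[5] mode these executor settings have little effect - they matter when running on a real cluster.

from pyspark.sql import SparkSession

# Sketch: size each executor well above the ~5 GB row groups and give it a
# single core so it buffers only one large row group at a time.
spark = SparkSession \
    .builder \
    .appName("bbox2d_dataset_extraction") \
    .config("spark.driver.memory", "12g") \
    .config("spark.executor.memory", "16g") \
    .config("spark.executor.cores", "1") \
    .getOrCreate()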

Second issue and workaround: Row chunks as large as 5 GB will most likely be spread across many HDFS blocks, since default HDFS block sizes are in the 128-256 MB range. (I assume you store those Parquet files on HDFS, since the question is tagged "hadoop".) Parquet best practice is for a row group to reside completely within one HDFS block:

Row group size: Larger row groups allow for larger column chunks which makes it possible to do larger sequential IO. Larger groups also require more buffering in the write path (or a two pass write). We recommend large row groups (512MB - 1GB). Since an entire row group might need to be read, we want it to completely fit on one HDFS block. Therefore, HDFS block sizes should also be set to be larger. An optimized read setup would be: 1GB row groups, 1GB HDFS block size, 1 HDFS block per HDFS file.

Here's an example of how to change the HDFS block size (set this before creating such Parquet files):

sc._jsc.hadoopConfiguration().set("dfs.block.size", "5g")

or in Spark Scala:

sc.hadoopConfiguration.set("dfs.block.size", "5g")
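Tying this back to the question's snippet, the ordering would look roughly like the sketch below: set the block size on the SparkContext's Hadoop configuration first, then perform the write (reusing the sc and bbox2d_dataframe names from the question):

# Set the HDFS block size before writing, so that each ~5 GB row group
# can live entirely inside a single HDFS block.
sc._jsc.hadoopConfiguration().set("dfs.block.size", "5g")

bbox2d_dataframe.write \
    .option("compression", "none") \
    .mode('overwrite') \
    .parquet('/tmp/huge/')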

I hope this will eventually be fixed at the Parquet level, but these two workarounds should allow you to operate Parquet with such large row groups.

answered Sep 18 '22 by Tagar