Why is Spark saveAsTable with bucketBy creating thousands of files?


Context

Spark 2.0.1, spark-submit in cluster mode. I am reading a Parquet file from HDFS:

val spark = SparkSession.builder
  .appName("myApp")
  .config("hive.metastore.uris", "thrift://XXX.XXX.net:9083")
  .config("spark.sql.sources.bucketing.enabled", true)
  .enableHiveSupport()
  .getOrCreate()

val df = spark.read
  .format("parquet")
  .load("hdfs://XXX.XX.X.XX/myParquetFile")

I am saving the DataFrame to a Hive table with 50 buckets grouped by userid:

df.write
  .bucketBy(50, "userid")
  .saveAsTable("myHiveTable")

Now, when I look into the Hive warehouse on HDFS at /user/hive/warehouse, there is a folder named myHiveTable. Inside it is a bunch of part-*.parquet files. I would expect there to be 50 files, but no, there are 3201 files! There are 64 files per partition, why? The number of files per partition also differs between the tables I save this way. All the files are very small, just tens of kB each!

Let me add that the number of distinct userid values in myParquetFile is about 1,000,000.

Question

Why are there 3201 files in the folder instead of 50? What are they?

When I read this table back into DataFrame and print number of partitions:

val df2 = spark.sql("SELECT * FROM myHiveTable")
println(df2.rdd.getNumPartitions)

The number of partitions is correctly 50, and I confirmed that the data is correctly partitioned by userid.

For one of my large datasets (3 TB) I created a table with 1000 partitions, which produced literally about a million files! That exceeds the HDFS directory item limit of 1,048,576 and raises org.apache.hadoop.hdfs.protocol.FSLimitException$MaxDirectoryItemsExceededException.

Question

What does the number of files created depend on?

Question

Is there a way to limit the number of files created?

Question

Should I worry about these files? Does having all these files hurt the performance of df2? It is often said that we should not create too many partitions because it is problematic.
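Back-of-the-envelope, the small files do look bad. A rough cost sketch (the ~150 bytes of NameNode heap per file/block object and the 128 MB block size are generic Hadoop ballpark figures, not measurements from this cluster):

```python
def avg_file_size_mb(total_tb, num_files):
    """Average file size in MB when total_tb terabytes are split into num_files."""
    return total_tb * 1024 * 1024 / num_files

def namenode_heap_mb(num_files, bytes_per_object=150):
    """Very rough NameNode metadata footprint: ~150 bytes per file object."""
    return num_files * bytes_per_object / (1024 * 1024)

# The 3 TB table split into ~1,000,000 files:
print(round(avg_file_size_mb(3, 1_000_000), 1))  # 3.1 -> ~3 MB per file vs a 128 MB HDFS block
print(round(namenode_heap_mb(1_000_000)))        # 143 -> ~143 MB of NameNode heap just for metadata
```

So every file is far below one HDFS block, and every read has to open and schedule orders of magnitude more splits than necessary.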

Question

I found in these HIVE Dynamic Partitioning tips that the number of files might be related to the number of mappers. It is suggested to use DISTRIBUTE BY while inserting into a Hive table. How could I do that in Spark?

Question

If the problem is indeed as in the link above, then here, How to control the file numbers of hive table after inserting data on MapR-FS, they suggest using options such as hive.merge.mapfiles or hive.merge.mapredfiles to merge all the small files after the MapReduce job. Are there similar options in Spark?

asked Feb 02 '18 by astro_asz


2 Answers

Please use Spark SQL with Hive support (HiveContext) to write the data into the Hive table; it will then use the number of buckets configured in the table schema.

val spark = SparkSession.builder()
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .config("hive.execution.engine", "tez")
  .config("hive.exec.max.dynamic.partitions", "400")
  .config("hive.exec.max.dynamic.partitions.pernode", "400")
  .config("hive.enforce.bucketing", "true")
  .config("hive.optimize.sort.dynamic.partition", "true")
  .config("hive.vectorized.execution.enabled", "true")
  .config("hive.enforce.sorting", "true")
  .enableHiveSupport()
  .getOrCreate()

spark.sql("insert into hiveTableName partition (partition_column) select * from myParquetFile")

Spark's bucketing implementation does not honor the specified number of buckets as a file count. Each RDD partition writes its buckets into separate files, hence you end up with lots of files for each bucket.
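That explanation lines up with the asker's numbers: with up to one file per (write task, bucket) pair, 64 tasks times 50 buckets gives 3200 part files (the extra 1 making 3201 would plausibly be the _SUCCESS marker, though that is my guess). A minimal sketch of the arithmetic:

```python
def max_output_files(num_write_tasks, num_buckets, success_marker=True):
    """Worst-case file count when every write task holds rows for every bucket:
    each task emits one file per bucket, plus an optional _SUCCESS marker."""
    return num_write_tasks * num_buckets + (1 if success_marker else 0)

# The asker's table: 64 write tasks x 50 buckets (+ _SUCCESS)
print(max_output_files(64, 50))  # 3201

# The 3 TB table: e.g. ~1000 tasks x 1000 buckets is already around
# HDFS's default per-directory limit of 1,048,576 entries
print(max_output_files(1000, 1000, success_marker=False))  # 1000000
```

The 1000-task figure for the large table is an illustrative assumption; the actual task count depends on the input split sizing.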

Please refer to this deck: https://www.slideshare.net/databricks/hive-bucketing-in-apache-spark-with-tejas-patil

Hope this helps.

Ravi

answered Sep 18 '22 by Ravikumar


I was able to find a workaround (on Spark 2.1). It solves the number of files problem but might have some performance implications.

dataframe
  .withColumn("bucket", pmod(hash($"bucketColumn"), lit(numBuckets)))
  .repartition(numBuckets, $"bucket")
  .write
  .format(fmt)
  .bucketBy(numBuckets, "bucketColumn")
  .sortBy("bucketColumn")
  .option("path", "/path/to/your/table")
  .saveAsTable("table_name")

I think Spark's bucketing algorithm takes a positive mod of the MurmurHash3 of the bucket column value. This simply replicates that logic and repartitions the data so that each partition contains all the data for one bucket.
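The pmod step matters because Java/Scala's % operator can return a negative result for a negative hash, while a bucket id must land in [0, numBuckets). A small sketch of just that step, taking the 32-bit hash value as given (Spark computes it with Murmur3, which is not replicated here):

```python
def pmod(a, n):
    """Positive modulus, matching Spark SQL's pmod(): result is always in [0, n)."""
    return ((a % n) + n) % n

def bucket_id(hash_value, num_buckets):
    """Bucket assignment sketch: Spark computes pmod(murmur3(value), numBuckets).
    Here the (possibly negative) hash is passed in directly."""
    return pmod(hash_value, num_buckets)

# Java/Scala: -7 % 5 == -2, an invalid bucket id; pmod fixes that:
print(bucket_id(-7, 5))          # 3 -> always in [0, 5)
print(bucket_id(123456789, 50))  # 39
```

Note the bucket ids produced here will only match Spark's if you feed in the same Murmur3 hash Spark computes.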

You can do the same with partitioning + bucketing.

dataframe   .withColumn("bucket", pmod(hash($"bucketColumn"), lit(numBuckets)))   .repartition(numBuckets, $"partitionColumn", $"bucket")   .write   .format(fmt)   .partitionBy("partitionColumn")   .bucketBy(numBuckets, "bucketColumn")   .sortBy("bucketColumn")   .option("path", "/path/to/your/table")   .saveAsTable("table_name") 

Tested locally with 3 partitions and 5 buckets using the CSV format (both the partition and bucket columns are just numbers):

$ tree .
.
├── _SUCCESS
├── partitionColumn=0
│   ├── bucket=0
│   │   └── part-00004-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00000.csv
│   ├── bucket=1
│   │   └── part-00003-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00001.csv
│   ├── bucket=2
│   │   └── part-00002-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00002.csv
│   ├── bucket=3
│   │   └── part-00004-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00003.csv
│   └── bucket=4
│       └── part-00001-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00004.csv
├── partitionColumn=1
│   ├── bucket=0
│   │   └── part-00002-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00000.csv
│   ├── bucket=1
│   │   └── part-00004-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00001.csv
│   ├── bucket=2
│   │   └── part-00002-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00002.csv
│   ├── bucket=3
│   │   └── part-00001-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00003.csv
│   └── bucket=4
│       └── part-00003-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00004.csv
└── partitionColumn=2
    ├── bucket=0
    │   └── part-00000-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00000.csv
    ├── bucket=1
    │   └── part-00001-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00001.csv
    ├── bucket=2
    │   └── part-00001-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00002.csv
    ├── bucket=3
    │   └── part-00003-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00003.csv
    └── bucket=4
        └── part-00000-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00004.csv

Here's bucket=0 for all 3 partitions (you can see that they all contain the same values):

$ paste partitionColumn=0/bucket=0/part-00004-5f860e5c-f2c2-4d52-8035-aa00e4432770_00000.csv \
        partitionColumn=1/bucket=0/part-00002-5f860e5c-f2c2-4d52-8035-aa00e4432770_00000.csv \
        partitionColumn=2/bucket=0/part-00000-5f860e5c-f2c2-4d52-8035-aa00e4432770_00000.csv | head
0   0   0
4   4   4
6   6   6
16  16  16
18  18  18
20  20  20
26  26  26
27  27  27
29  29  29
32  32  32

I actually like the extra bucket index. But if you don't, you can drop the bucket column right before the write and you'll get numBuckets files per partition.

answered Sep 18 '22 by Bill Kuang