 

Spark not ignoring empty partitions

I am trying to read a subset of a dataset by using a pushdown predicate. My input dataset consists of 1.2 TB spread across 43,436 Parquet files stored on S3. With the pushdown predicate I am supposed to read only 1/4 of the data.
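For reference, here is a minimal sketch of the kind of read I am doing (the bucket path and the filter column event_date are illustrative, not the real ones):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("pushdown-read").getOrCreate()
    import spark.implicits._

    // Read the Parquet dataset from S3 and apply a filter that Spark can
    // push down to the Parquet readers (path and column are illustrative).
    val df = spark.read
      .parquet("s3://my-bucket/my-dataset/")
      .filter($"event_date" >= "2020-01-01")

    df.count()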

Looking at the Spark UI, I see that the job actually reads 1/4 of the data (300 GB), but there are still 43,436 partitions in the first stage of the job. Only 1/4 of these partitions contain data; the other 3/4 are empty (check the median input size in the attached screenshots).

I was expecting Spark to create partitions only for the non-empty splits. I am seeing a 20% performance overhead when reading the whole dataset with the pushdown predicate compared to another job that reads the pre-filtered dataset (1/4 of the data) directly. I suspect that this overhead is due to the huge number of empty partitions/tasks in my first stage, so I have two questions:

  1. Is there any workaround to avoid these empty partitions?
  2. Can you think of any other reason for the overhead? Maybe executing the pushdown filter is naturally a little bit slow?

Thank you in advance

[Screenshot: Spark UI showing the data read]
[Screenshot: Spark UI stage metrics]

asked Jun 25 '20 by Wassim Maaoui


People also ask

How do I remove a blank partition in Spark?

There isn't an easy way to simply delete the empty partitions from an RDD. coalesce doesn't guarantee that the empty partitions will be removed: if you have an RDD with 40 blank partitions and 10 partitions with data, there can still be empty partitions after rdd.coalesce(45).
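For example, a quick sketch like the following (illustrative values, assuming a spark-shell session where spark is already defined) shows that empty partitions can survive a coalesce:

    // Build an RDD with 10 data partitions plus 40 empty ones, then coalesce.
    val rdd = spark.sparkContext.parallelize(1 to 10, 10)
      .union(spark.sparkContext.parallelize(Seq.empty[Int], 40))

    val coalesced = rdd.coalesce(45)

    // Count how many of the resulting partitions actually hold records.
    val nonEmpty = coalesced
      .mapPartitions(it => Iterator.single(if (it.hasNext) 1 else 0))
      .sum()

    println(s"${coalesced.getNumPartitions} partitions, $nonEmpty non-empty")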

How do I reduce the number of partitions in Spark?

Spark RDD coalesce() is used only to reduce the number of partitions. It is an optimized version of repartition(), where the movement of data across partitions is lower.
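For example (a small sketch, assuming a spark-shell session where sc is already defined):

    // coalesce() merges existing partitions without a full shuffle,
    // while repartition() shuffles the data and can also increase the count.
    val rdd = sc.parallelize(1 to 1000, 100)

    val fewer  = rdd.coalesce(10)       // narrow dependency, no shuffle
    val redist = rdd.repartition(200)   // full shuffle

    println(fewer.getNumPartitions)     // 10
    println(redist.getNumPartitions)    // 200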

Can we decide no of partitions created in Spark?

The number of partitions in Spark should be decided thoughtfully, based on the cluster configuration and the requirements of the application. Increasing the number of partitions will make each partition have less data, or no data at all.

What is the use of empty RDD in Spark?

Using Spark's sc.parallelize() we can create an empty RDD that still has partitions; writing such a partitioned RDD to a file results in the creation of multiple part files.
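For instance (a sketch, assuming a spark-shell session; the output path is illustrative):

    // An empty RDD that still has 5 partitions; saving it produces 5 empty part files.
    val emptyRdd = sc.parallelize(Seq.empty[String], 5)
    println(emptyRdd.getNumPartitions)               // 5
    emptyRdd.saveAsTextFile("/tmp/empty-rdd-demo")   // part-00000 .. part-00004, all empty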


3 Answers

Using S3 Select, you can retrieve only a subset of data.

With Amazon EMR release version 5.17.0 and later, you can use S3 Select with Spark on Amazon EMR. S3 Select allows applications to retrieve only a subset of data from an object.
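A hedged sketch of what that can look like, using the data source name from the Amazon EMR documentation (s3selectCSV, which targets CSV/JSON objects); the path and filter column here are illustrative:

    import org.apache.spark.sql.functions.col

    // Ask S3 Select to return only the matching rows instead of whole objects.
    val selected = spark.read
      .format("s3selectCSV")                // EMR-specific data source
      .load("s3://my-bucket/csv-data/")
      .where(col("category") === "books")   // filter handed to S3 Select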

Otherwise, S3 acts as an object store, in which case an entire object has to be read. In your case you would have to read all content from all files and filter it on the client side.

There is actually a very similar question, where by testing you can see that:

The input size was always the same as the Spark job that processed all of the data

You can also see this question about optimizing data read from s3 of parquet files.

answered Oct 18 '22 by Yosi Dahari


It seems like your files are rather small: 1.2 TB / 43,436 ≈ 30 MB each. So you may want to look at increasing spark.sql.files.maxPartitionBytes to see if it reduces the total number of partitions. I don't have much experience with S3, so I'm not sure whether it's going to help, given this note in the option's description:

The maximum number of bytes to pack into a single partition when reading files. This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.
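A sketch of what trying that could look like (the 512 MB value and the path are just illustrative):

    // Raise the per-partition packing target so Spark groups more of the
    // ~30 MB files into each partition (default is 128 MB).
    spark.conf.set("spark.sql.files.maxPartitionBytes", 512L * 1024 * 1024)

    val df = spark.read.parquet("s3://my-bucket/my-dataset/")
    println(df.rdd.getNumPartitions)   // should drop compared to the default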

answered Oct 18 '22 by mazaneicha


Empty partitions: It seems that Spark (2.4.5) tries to build partitions of size ≈ spark.sql.files.maxPartitionBytes (default 128 MB) by packing many files into one partition (source code here). However, it does this before running the job, so it cannot know that 3/4 of the files will produce no output once the pushed-down predicate is applied. For the partitions that were packed only with files whose rows are all filtered out, I end up with empty partitions. This also explains why my max partition size is 44 MB and not 128 MB: by chance, no partition contained only files that pass the pushdown filter.
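For context, this is roughly how the packing size is derived before the scan runs (a simplified paraphrase of the Spark 2.4 file-partition sizing logic; the parallelism value below is illustrative):

    val defaultMaxSplitBytes = 128L * 1024 * 1024           // spark.sql.files.maxPartitionBytes
    val openCostInBytes      = 4L * 1024 * 1024              // spark.sql.files.openCostInBytes
    val defaultParallelism   = 400L                          // depends on the cluster
    val filesCount           = 43436L
    val dataBytes            = 1200L * 1024 * 1024 * 1024    // ~1.2 TB of Parquet

    val totalBytes    = dataBytes + filesCount * openCostInBytes
    val bytesPerCore  = totalBytes / defaultParallelism
    val maxSplitBytes = math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
    // Files are then packed greedily into partitions of up to maxSplitBytes,
    // before the pushed-down predicate has filtered anything out.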

20% Overhead: Finally, this is not due to the empty partitions. I managed to get far fewer empty partitions by setting spark.sql.files.maxPartitionBytes to 1 GB, but it did not improve the read. I think the overhead is due to opening many files and reading their metadata. Spark estimates that opening a file is equivalent to reading 4 MB (spark.sql.files.openCostInBytes), so opening many files, even ones that will barely be read thanks to the filter, is not negligible.
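As a back-of-the-envelope sketch of that estimated cost (assuming the default 4 MB open cost):

    // "Virtual" bytes Spark's planner assigns just to opening the 43,436 files.
    val openCostInBytes = 4L * 1024 * 1024
    val filesCount      = 43436L
    val virtualOpenCost = filesCount * openCostInBytes
    println(f"${virtualOpenCost / math.pow(1024, 3)}%.0f GB equivalent")   // ≈ 170 GB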

answered Oct 18 '22 by Wassim Maaoui