 

Lazy loading of partitioned parquet in Apache Spark

Tags:

apache-spark

As I understand it, Apache Spark uses lazy evaluation, so code like the following, which consists only of transformations, does no actual processing:

val transformed_df = df.filter("some_field = 10").select("some_other_field", "yet_another_field")

Only when we do an "action" will any processing actually occur:

transformed_df.show()

I had been under the impression that load operations are also lazy in Spark. (See How spark loads the data into memory.)

However, my experience with Spark has not borne this out. When I do something like the following,

val df = spark.read.parquet("/path/to/parquet/")

execution seems to depend greatly on the size of the data in the path. In other words, it's not strictly lazy. This is inconvenient if the data is partitioned and I only need to look at a fraction of the partitions.

For example:

df.filter("partitioned_field = 10").show()

If the data is partitioned in storage on "partitioned_field", I would have expected Spark to wait until show() is called and then read only the data under "/path/to/parquet/partitioned_field=10/". But again, this doesn't seem to be the case: Spark appears to perform at least some operations on all of the data as soon as read or load is called.

I could get around this by loading only /path/to/parquet/partitioned_field=10/ in the first place (as sketched below), but this is much less elegant than simply calling "read" and filtering on the partitioned field, and it's harder to generalize.
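
For reference, a minimal sketch of that workaround, using the example path from this question. Note that when a partition directory is read directly like this, the partition column itself typically does not appear in the resulting DataFrame, because partition values live in the directory names rather than in the data files.

// Read only the files under a single partition directory.
// "partitioned_field" will generally not be a column here, since its value
// is encoded in the directory name rather than stored in the Parquet files.
val df10 = spark.read.parquet("/path/to/parquet/partitioned_field=10/")

df10.show()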

Is there a more elegant preferred way to lazily load partitions of parquet data?

(To clarify, I am using Spark 2.4.3)

asked Oct 30 '19 by croncroncron


People also ask

Can Parquet files be partitioned?

Yes. An ORC or Parquet file contains data columns, and partition columns can be added at write time. The data files do not store values for partition columns; instead, when writing, the files are divided into groups (partitions) based on the values of those columns.
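
As a brief illustration (the DataFrame, path, and column name are placeholders borrowed from this question), such a partitioned Parquet layout is typically produced at write time with partitionBy:

// Writing with partitionBy creates one directory per distinct value,
// e.g. /path/to/parquet/partitioned_field=10/, and omits that column
// from the data files themselves.
df.write.partitionBy("partitioned_field").parquet("/path/to/parquet/")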

How many Apache Spark workers can work on a partition at a time?

Spark can run 1 concurrent task for every partition of an RDD (up to the number of cores in the cluster).

How many tasks can Spark run on an RDD's partition?

Apache Spark can run only a single concurrent task for every partition of an RDD, up to the number of cores in your cluster (and the commonly recommended partition count is 2-3x that).


1 Answer

I think I've stumbled on an answer to my question while learning about a key distinction that is often overlooked when talking about lazy evaluation in Spark.

Data is lazily evaluated, but schemas are not. Because Parquet is a structured format, Spark has to determine the schema of the files it's reading as soon as read() or load() is called, so calling read() on a large number of files takes longer than on a small number of files.
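
If the schema is already known, one possible mitigation (a sketch, not part of the original question; the field names and types below are assumptions) is to supply the schema explicitly so Spark does not have to infer it from the Parquet files at load time. The directory listing still happens, so this may not remove all of the up-front cost.

import org.apache.spark.sql.types._

// Hypothetical schema matching the example fields used in this question.
val knownSchema = StructType(Seq(
  StructField("some_field", IntegerType),
  StructField("some_other_field", StringType),
  StructField("yet_another_field", StringType),
  StructField("partitioned_field", IntegerType)
))

// With an explicit schema, Spark skips inferring the schema from the files.
val df = spark.read.schema(knownSchema).parquet("/path/to/parquet/")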

Given that partition columns are part of the schema, it's less surprising to me now that Spark has to look at all of the files in the path to determine the schema before it can filter on a partition field.

It would be convenient for my purposes if Spark were to wait until schema evaluation was strictly necessary and could filter on partition fields before determining the rest of the schema, but it sounds like this is not the case. I believe Dataset objects must always have a schema, so I'm not sure there's a way around this problem without significant changes to Spark.

In conclusion, it seems my only current option is to pass in a list of paths for the partitions that I need, rather than the base path, if I want to avoid evaluating the schema over the entire data repository.
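
Concretely, that might look like the sketch below (the second partition value, 11, is just a hypothetical example). In Spark 2.x the basePath option tells Spark where partition discovery starts, so partitioned_field is still available as a column even though only the selected partition directories are read:

// Read only the partitions of interest by passing their paths explicitly.
// Setting basePath preserves "partitioned_field" as a column in the result.
val df = spark.read
  .option("basePath", "/path/to/parquet/")
  .parquet(
    "/path/to/parquet/partitioned_field=10/",
    "/path/to/parquet/partitioned_field=11/"
  )

df.show()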

answered Oct 05 '22 by croncroncron