 

Efficient way to read specific columns from parquet file in spark

What is the most efficient way to read only a subset of columns in Spark from a Parquet file that has many columns? Is using spark.read.format("parquet").load(<parquet>).select(...col1, col2) the best way to do that? I would also prefer to use a typesafe Dataset with case classes to pre-define my schema, but I'm not sure.

asked Jan 24 '18 by horatio1701d




4 Answers

val df = spark.read.parquet("fs://path/file.parquet").select(...)

This will only read the corresponding columns. Indeed, Parquet is a columnar storage format and it is designed exactly for this type of use case. Try running df.explain and Spark will tell you that only the corresponding columns are read (it prints the execution plan). explain will also tell you which filters are pushed down to the physical plan in case you also use a where condition. Finally, use the following code to convert the DataFrame (a Dataset of Rows) to a Dataset of your case class.

case class MyData(col1: Long, col2: Double) // example fields; names and types must match the selected columns
val ds = df.as[MyData]                      // needs import spark.implicits._ in scope for the encoder
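
A quick way to verify the pruning (a minimal sketch; the exact plan text varies by Spark version, and the column types shown are only illustrative) is to look at the ReadSchema in the physical plan:

df.explain()
// == Physical Plan ==
// ... FileScan parquet ... ReadSchema: struct<col1:bigint,col2:double>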
answered Oct 04 '22 by Oli


Spark supports column pruning and predicate pushdown with Parquet, so

load(<parquet>).select(...col1, col2)

is fine.

I would also prefer to use a typesafe Dataset with case classes to pre-define my schema, but I'm not sure.

This could be an issue, as it looks like some optimizations don't work in this context; see Spark 2.0 Dataset vs DataFrame.
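
For example (a hedged sketch with a hypothetical MyData case class; exact behavior varies across Spark versions), untyped Column expressions stay visible to Catalyst while typed lambdas do not, which affects pushdown:

import spark.implicits._
case class MyData(col1: Long, col2: Double)   // hypothetical schema, for illustration only
val ds = spark.read.parquet("<parquet>").select("col1", "col2").as[MyData]

ds.filter($"col2" > 0)   // Column expression: the predicate can be pushed down to the Parquet scan
ds.filter(_.col2 > 0)    // typed lambda: rows are deserialized first, so no pushdown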

answered Oct 04 '22 by Alper t. Turker


At least in some cases, getting a DataFrame with all columns and then selecting a subset won't work. For example, the following will fail if the Parquet file contains at least one field with a type that is not supported by Spark:

spark.read.format("parquet").load("<path_to_file>").select("col1", "col2")

One solution is to provide a schema that contains only the requested columns to load:

spark.read.format("parquet").load("<path_to_file>",
                                   schema="col1 bigint, col2 float")

Using this, you will be able to load a subset of the Spark-supported Parquet columns even if loading the full file is not possible. I'm using PySpark here, but I would expect the Scala version to have something similar.
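
In Scala, a roughly comparable sketch (not from the answer above; the DDL-string overload of schema requires Spark 2.3+, and the path is a placeholder) would pass the reduced schema to the reader before loading:

val df = spark.read
  .schema("col1 BIGINT, col2 FLOAT")   // only these columns are materialized from the file
  .parquet("<path_to_file>")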

answered Oct 04 '22 by Alexander Pivovarov


Parquet is a columnar file format. It is designed exactly for this kind of use case.

val df = spark.read.parquet("<PATH_TO_FILE>").select(...)

should do the job for you.

answered Oct 04 '22 by moriarty007