 

Spark not leveraging HDFS partitioning with Parquet

I am writing a Parquet file to HDFS using the following command:

df.write.mode(SaveMode.Append).partitionBy("id").parquet(path)

Afterwards I am reading and filtering the file like this:

val file = sqlContext.read.parquet(folder)
val data = file.map(r => Row(r.getInt(4).toString, r.getString(0), r.getInt(1),
    r.getLong(2), r.getString(3)))

// the id (read as Int at position 4) is now the String at position 0
val filteredData = data.filter(x => x.getString(0).equals("1"))
filteredData.collect()

I would expect Spark to leverage the partitioning of the file and read only the partition with thingId = 1. In fact, Spark reads all partitions, not just the one matching the filter (the partition with thingId = 1). Looking at the logs, I can see that it reads everything:

16/03/21 01:32:33 INFO ParquetRelation: Reading Parquet file(s) from hdfs://sandbox.hortonworks.com/path/id=1/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet
16/03/21 01:32:33 INFO ParquetRelation: Reading Parquet file(s) from hdfs://sandbox.hortonworks.com/path/id=42/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet
16/03/21 01:32:33 INFO ParquetRelation: Reading Parquet file(s) from hdfs://sandbox.hortonworks.com/path/id=17/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet
16/03/21 01:32:33 INFO ParquetRelation: Reading Parquet file(s) from hdfs://sandbox.hortonworks.com/path/id=33/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet
16/03/21 01:32:33 INFO ParquetRelation: Reading Parquet file(s) from hdfs://sandbox.hortonworks.com/path/id=26/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet
16/03/21 01:32:33 INFO ParquetRelation: Reading Parquet file(s) from hdfs://sandbox.hortonworks.com/path/id=12/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet

Is there anything I am missing? From my reading of the docs, Spark should know, based on the filter, that it only needs to read the partition with thingId = 1. Does anyone have an idea what the problem is?

Asked Mar 21 '16 by AlexL



1 Answer

A few issues may prevent Spark from successfully "pushing down" predicates (i.e. applying the filter at the input-format level):

  1. Filter pushdown is OFF: depending on the Spark version you are using, the predicate pushdown option (spark.sql.parquet.filterPushdown) might be turned off. It is ON by default as of Spark 1.5.0, so check your version and your configuration; a quick check is sketched below.
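    A minimal sketch of checking and enabling it, assuming the Spark 1.x sqlContext from the question:

    // read the current value, falling back to "true" (the default since Spark 1.5.0)
    println(sqlContext.getConf("spark.sql.parquet.filterPushdown", "true"))
    
    // turn predicate pushdown on explicitly, in case your configuration disabled it
    sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")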

  2. The filter is "opaque": this seems to be the case here: you're loading the parquet file, mapping each row to another Row (reordering the columns?), and then using the filter overload that accepts a function. Spark can't "read" the function's code and realize that it compares against the partitioning column; to Spark, this is just a Row => Boolean function that could perform all sorts of checks...

    For filter pushdown to work, you need to apply it before mapping the records into something that is "detached" from the original structure, and to use one of the filter overloads that takes an expression Spark can parse, for example:

    // assuming the relevant column name is "id" in the parquet structure
    import org.apache.spark.sql.functions.col
    
    val filtered = file.filter("id = 1") 
    
    // or:
    val filtered = file.filter(col("id") === 1) 
    
    // and only then:
    val data = filtered.map(r => Row(...))
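
    To verify, you can print the physical plan and check that the filter on "id" shows up there (a quick sanity check; the exact plan output differs between Spark versions):

    // with pushdown / partition pruning working, the filter on "id" appears
    // in the plan instead of being applied after a full scan
    filtered.explain()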
    
Answered Oct 03 '22 by Tzach Zohar