
Why is partition key column missing from DataFrame

I have a job which loads a DataFrame and then saves the data in Parquet format using the DataFrameWriter partitionBy method. Then I publish the paths that were created so a subsequent job can use the output. The paths in the output look like this:

/ptest/_SUCCESS
/ptest/id=0
/ptest/id=0/part-00000-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet
/ptest/id=0/part-00001-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet
/ptest/id=0/part-00002-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet
/ptest/id=1
/ptest/id=1/part-00003-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet
/ptest/id=1/part-00004-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet
/ptest/id=1/part-00005-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet
/ptest/id=3
/ptest/id=3/part-00006-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet
/ptest/id=3/part-00007-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet

When I receive new data it is appended to the dataset. The paths are published so jobs which depend on the data can just process the new data.
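
As a rough sketch (new_df here is a hypothetical stand-in for a DataFrame holding the newly arrived rows), the append step is just a partitioned write in append mode:

>>> # new_df: hypothetical DataFrame of newly arrived rows
>>> new_df.write.mode("append").partitionBy("id").format("parquet").save("hdfs://localhost:9000/ptest")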

Here's a simplified example of the code:

>>> rdd = sc.parallelize([(0,1,"A"), (0,1,"B"), (0,2,"C"), (1,2,"D"), (1,10,"E"), (1,20,"F"), (3,18,"G"), (3,18,"H"), (3,18,"I")])
>>> df = sqlContext.createDataFrame(rdd, ["id", "score","letter"])
>>> df.show()
+---+-----+------+
| id|score|letter|
+---+-----+------+
|  0|    1|     A|
|  0|    1|     B|
|  0|    2|     C|
|  1|    2|     D|
|  1|   10|     E|
|  1|   20|     F|
|  3|   18|     G|
|  3|   18|     H|
|  3|   18|     I|
+---+-----+------+
>>> df.write.partitionBy("id").format("parquet").save("hdfs://localhost:9000/ptest")

The problem is when another job tries to read the file using the published paths:

>>> df2 = spark.read.format("parquet").load("hdfs://localhost:9000/ptest/id=0/")
>>> df2.show()
+-----+------+
|score|letter|
+-----+------+
|    1|     A|
|    1|     B|
|    2|     C|
+-----+------+

As you can see, the partition key is missing from the loaded dataset. If I were to publish a schema that jobs could use, I could load the file with that schema. The file loads and the partition key column exists, but its values are null:

>>> df2 = spark.read.format("parquet").schema(df.schema).load("hdfs://localhost:9000/ptest/id=0/")
>>> df2.show()
+----+-----+------+
|  id|score|letter|
+----+-----+------+
|null|    1|     A|
|null|    1|     B|
|null|    2|     C|
+----+-----+------+

Is there a way to make sure the partition keys are stored within the parquet data? I don't want to require other processes to parse the paths to get the keys.

asked Apr 03 '17 by Mark J Miller

People also ask

What is a partition in a DataFrame?

partitionBy() is a method of the DataFrameWriter class which is used to partition the output based on one or multiple column values while writing a DataFrame to a disk/file system. When you write a Spark DataFrame to disk by calling partitionBy(), PySpark splits the records based on the partition column and stores each partition's data in a sub-directory.

What is a partition in a spark DataFrame?

Spark automatically partitions RDDs and distributes the partitions across different nodes. A partition in Spark is an atomic chunk of data (a logical division of the data) stored on a node in the cluster. Partitions are the basic units of parallelism in Apache Spark; RDDs in Apache Spark are collections of partitions.
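
For instance, a minimal way to inspect this from PySpark, using the standard getNumPartitions() and repartition() APIs (the count of 8 below is arbitrary, chosen only for illustration):

>>> df.rdd.getNumPartitions()                  # number of partitions currently backing df
>>> df.repartition(8).rdd.getNumPartitions()   # 8 after an explicit repartition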

How to check the partitioning key of a DataFrame?

There is no partitioning key once the data has been loaded, but you can check queryExecution for the Partitioner. If you want to support efficient pushdowns on the key, use the partitionBy method of DataFrameWriter.

What are the values in the table partitioning key columns?

The values in the table partitioning key columns are used to determine in which data partition each table row belongs. To define the table partitioning key on a table use the CREATE TABLE statement with the PARTITION BY clause.

What is the default number of partitions in spark dataframe?

These functions, when called on a DataFrame, shuffle data across machines (or, more commonly, across executors), which ultimately repartitions the data into 200 partitions by default. This default of 200 can be controlled using the spark.sql.shuffle.partitions configuration.
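
For example, the setting can be changed at runtime through the session configuration (the value 50 is arbitrary, for illustration only):

>>> spark.conf.set("spark.sql.shuffle.partitions", "50")   # default is 200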

Can generated columns be used as Table partitions?

Generated columns can be used as table partitioning keys. For example, a table could be created with twelve data partitions, one for each month: all rows for January of any year would be placed in the first data partition, rows for February in the second, and so on.


1 Answer

In a case like this you should provide the basePath option:

(spark.read
    .format("parquet")
    .option("basePath", "hdfs://localhost:9000/ptest/")
    .load("hdfs://localhost:9000/ptest/id=0/"))

which points to the root directory of your data.

With basePath set, the DataFrameReader will be aware of the partitioning and adjust the schema accordingly.
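
With the question's example data, the full read would look roughly like this (output sketched under the assumption that partition discovery infers id as an integer; note that Spark appends discovered partition columns at the end of the schema):

df2 = (spark.read
    .format("parquet")
    .option("basePath", "hdfs://localhost:9000/ptest/")
    .load("hdfs://localhost:9000/ptest/id=0/"))
df2.show()
+-----+------+---+
|score|letter| id|
+-----+------+---+
|    1|     A|  0|
|    1|     B|  0|
|    2|     C|  0|
+-----+------+---+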

answered Oct 18 '22 by zero323