I have a job that loads a DataFrame and saves the data in Parquet format using the DataFrame partitionBy
method. I then publish the paths it creates so a subsequent job can consume the output. The output paths look like this:
/ptest/_SUCCESS
/ptest/id=0
/ptest/id=0/part-00000-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet
/ptest/id=0/part-00001-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet
/ptest/id=0/part-00002-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet
/ptest/id=1
/ptest/id=1/part-00003-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet
/ptest/id=1/part-00004-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet
/ptest/id=1/part-00005-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet
/ptest/id=3
/ptest/id=3/part-00006-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet
/ptest/id=3/part-00007-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet
When new data arrives it is appended to the dataset, and the paths are published so downstream jobs can process only the new data; a sketch of that append step follows the example below.
Here's a simplified example of the code:
>>> rdd = sc.parallelize([(0,1,"A"), (0,1,"B"), (0,2,"C"), (1,2,"D"), (1,10,"E"), (1,20,"F"), (3,18,"G"), (3,18,"H"), (3,18,"I")])
>>> df = sqlContext.createDataFrame(rdd, ["id", "score","letter"])
>>> df.show()
+---+-----+------+
| id|score|letter|
+---+-----+------+
| 0| 1| A|
| 0| 1| B|
| 0| 2| C|
| 1| 2| D|
| 1| 10| E|
| 1| 20| F|
| 3| 18| G|
| 3| 18| H|
| 3| 18| I|
+---+-----+------+
>>> df.write.partitionBy("id").format("parquet").save("hdfs://localhost:9000/ptest")
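For the incremental loads mentioned above, a minimal append sketch (assuming the same Spark session, schema, and HDFS location as the example; the new records are purely hypothetical) might look like this:

# Hypothetical follow-up batch with the same columns as df.
new_rdd = sc.parallelize([(0, 3, "J"), (1, 4, "K")])
new_df = sqlContext.createDataFrame(new_rdd, ["id", "score", "letter"])

# mode("append") adds new part files under the existing id=... directories
# instead of overwriting the dataset.
(new_df.write
    .mode("append")
    .partitionBy("id")
    .format("parquet")
    .save("hdfs://localhost:9000/ptest"))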
The problem arises when another job tries to read the data using the published paths:
>>> df2 = spark.read.format("parquet").load("hdfs://localhost:9000/ptest/id=0/")
>>> df2.show()
+-----+------+
|score|letter|
+-----+------+
| 1| A|
| 1| B|
| 2| C|
+-----+------+
As you can see, the partition key is missing from the loaded dataset. If I were to publish a schema that jobs could use, I could load the data with that schema. The data loads and the partition key column exists, but its values are null:
>>> df2 = spark.read.format("parquet").schema(df.schema).load("hdfs://localhost:9000/ptest/id=0/")
>>> df2.show()
+----+-----+------+
| id|score|letter|
+----+-----+------+
|null| 1| A|
|null| 1| B|
|null| 2| C|
+----+-----+------+
Is there a way to make sure the partition keys are stored within the Parquet data? I don't want to require other processes to parse the paths to get the keys.
There is no partitioning key once the data is loaded, but you can check queryExecution for the Partitioner. If you want to support efficient pushdowns on the key, use the partitionBy method of DataFrameWriter.
In a case like this you should provide the basePath option:

(spark.read
    .format("parquet")
    .option("basePath", "hdfs://localhost:9000/ptest/")
    .load("hdfs://localhost:9000/ptest/id=0/"))

which points to the root directory of your data. With basePath, DataFrameReader will be aware of the partitioning and adjust the schema accordingly.
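As a usage sketch against the example above (same paths as in the question; the variable name is just illustrative), the partition column comes back once basePath is set, so no separate schema needs to be published:

# basePath tells the reader where partition discovery starts, so the
# id=0 directory name is turned back into an id column.
df2 = (spark.read
    .format("parquet")
    .option("basePath", "hdfs://localhost:9000/ptest/")
    .load("hdfs://localhost:9000/ptest/id=0/"))

df2.printSchema()  # schema now includes id, inferred from the partition directory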