
Using partitionBy on a DataFrameWriter writes a directory layout with column names, not just values

I am using Spark 2.0.

I have a DataFrame. My code looks something like the following:

df.write.partitionBy("year", "month", "day").format("csv").option("header", "true").save(s"s3://bucket/")

And when the program executes, it writes files in the following format:

s3://bucket/year=2016/month=11/day=15/file.csv

How do I configure the format to be like this:

s3://bucket/2016/11/15/file.csv

I would also like to know if it is possible to configure the filename.

Here is the relevant documentation, which seems pretty sparse:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter

partitionBy(colNames: String*): DataFrameWriter[T]
Partitions the output by the given columns on the file system. If specified, the output is laid out on the file system similar to Hive's partitioning scheme. As an example, when we partition a dataset by year and then month, the directory layout would look like:

year=2016/month=01/
year=2016/month=02/
Partitioning is one of the most widely used techniques to optimize physical data layout. It provides a coarse-grained index for skipping unnecessary data reads when queries have predicates on the partitioned columns. In order for partitioning to work well, the number of distinct values in each column should typically be less than tens of thousands.

This was initially applicable for Parquet but in 1.5+ covers JSON, text, ORC and avro as well.
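
To see what this layout buys you, here is a minimal Scala sketch of reading the partitioned output back (an illustration only: it assumes a SparkSession named spark, and the paths and columns follow the question):

import spark.implicits._

// Spark discovers year/month/day from the directory names and exposes them
// as columns of the DataFrame, even though the CSV files themselves do not
// contain them.
val logs = spark.read
  .option("header", "true")
  .csv("s3://bucket/")

// Because the predicate is on partition columns, Spark prunes the scan to
// s3://bucket/year=2016/month=11/day=15/ instead of reading everything.
logs.filter($"year" === 2016 && $"month" === 11 && $"day" === 15).show()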
asked Nov 15 '16 by satoukum

People also ask

How does partitionBy work in Spark?

PySpark's partitionBy() partitions the output by column values when writing a DataFrame to disk or another file system. When you write a DataFrame with partitionBy(), PySpark splits the records on the partition column values and stores each partition's data in its own sub-directory.

How do I partition data in PySpark?

PySpark supports partitioning in two ways: in memory (the DataFrame) and on disk (the file system). To partition in memory, you can repartition the DataFrame by calling the repartition() or coalesce() transformations; a sketch of both kinds follows below.
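
Here is a minimal Scala sketch of both, matching the question's code (df and the bucket path are placeholders):

// In-memory partitioning: controls parallelism and the number of output
// files written per partition directory; coalesce(n) merges partitions
// without a full shuffle.
val reshaped = df.repartition(4)

// On-disk partitioning: controls the directory layout of the output.
// Each distinct (year, month, day) combination gets its own sub-directory.
reshaped.write
  .partitionBy("year", "month", "day")
  .format("csv")
  .option("header", "true")
  .save("s3://bucket/")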


2 Answers

This is expected and desired behavior. Spark uses the directory structure for partition discovery and pruning, and the correct structure, including the column names, is necessary for it to work.

You also have to remember that partitioning drops the partitioning columns from the data files themselves; their values are encoded only in the directory names.

If you need a specific directory structure, you should use a downstream process to rename the directories.
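
As one possible downstream step, here is a sketch in Scala using the Hadoop FileSystem API (an illustration under assumptions: it presumes a SparkSession named spark and a Hadoop-compatible file system, and stripColumnPrefix, basePath, and colName are hypothetical names). The second answer below does the same thing as a shell script.

import org.apache.hadoop.fs.Path

// Strip the "col=" prefix from each partition directory,
// e.g. year=2016 -> 2016.
def stripColumnPrefix(basePath: String, colName: String): Unit = {
  val base = new Path(basePath)
  val fs = base.getFileSystem(spark.sparkContext.hadoopConfiguration)
  fs.listStatus(base)
    .filter(s => s.isDirectory && s.getPath.getName.startsWith(colName + "="))
    .foreach { s =>
      val target = new Path(s.getPath.getParent, s.getPath.getName.stripPrefix(colName + "="))
      fs.rename(s.getPath, target)
    }
}

// Flatten one level of the layout: year=2016 -> 2016.
stripColumnPrefix("s3://bucket", "year")

Keep in mind that on S3 a rename is implemented as copy-and-delete, so this can be slow for large outputs, and that for the nested month and day levels you would call the helper again on each sub-directory.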

answered by user7723061


You can use the following script to rename the directories by stripping the column-name prefix:

#!/usr/bin/env bash

# Rename partition directories by deleting the "COLUMN=" prefix,
# e.g. DATE=20170708 becomes 20170708.
# Usage: rename_partitions.sh <hdfs-path> <column-name>

path=$1
col=$2
for f in $(hdfs dfs -ls "$path" | awk '{print $NF}' | grep "$col="); do
    a="$(echo "$f" | sed "s/$col=//")"
    hdfs dfs -mv "$f" "$a"
done
answered by Duong Nguyen