I am using Spark 2.0.
I have a DataFrame. My code looks something like the following:
df.write.partitionBy("year", "month", "day").format("csv").option("header", "true").save(s"s3://bucket/")
And when the program executes, it writes files in the following format:
s3://bucket/year=2016/month=11/day=15/file.csv
How do I configure the format to be like this:
s3://bucket/2016/11/15/file.csv
I would also like to know if it is possible to configure the filename.
Here is the relevant documentation, which seems pretty sparse:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter
partitionBy(colNames: String*): DataFrameWriter[T]
Partitions the output by the given columns on the file system. If specified, the output is laid out on the file system similar to Hive's partitioning scheme. As an example, when we partition a dataset by year and then month, the directory layout would look like:
year=2016/month=01/
year=2016/month=02/
Partitioning is one of the most widely used techniques to optimize physical data layout. It provides a coarse-grained index for skipping unnecessary data reads when queries have predicates on the partitioned columns. In order for partitioning to work well, the number of distinct values in each column should typically be less than tens of thousands.
This initially applied only to Parquet, but in 1.5+ it covers JSON, text, ORC and Avro as well.
PySpark's partitionBy() partitions the output by column values when writing a DataFrame to the file system. When you write a DataFrame to disk with partitionBy(), PySpark splits the records on the partition columns and stores each partition's data in its own sub-directory.
PySpark supports partitioning in two ways: in memory (the DataFrame) and on disk (the file system). To partition in memory, call the repartition() or coalesce() transformations.
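The two combine naturally: clustering the rows in memory first means each output directory is written by fewer tasks, which reduces the number of small files. A minimal sketch in the Scala API used in the question (the column names and bucket path are taken from the question):

import org.apache.spark.sql.functions.col

// In-memory partitioning: cluster the rows by the partition columns first,
// so each output directory receives data from fewer tasks (fewer small files).
val clustered = df.repartition(col("year"), col("month"), col("day"))

// On-disk partitioning: one sub-directory per distinct (year, month, day).
clustered.write
  .partitionBy("year", "month", "day")
  .format("csv")
  .option("header", "true")
  .save("s3://bucket/")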
This is expected and desired behavior. Spark uses directory structure for partition discovery and pruning and the correct structure, including column names, is necessary for it to work.
You also have to remember that partitioning drops the partition columns from the data files themselves; their values live only in the directory names and are restored when the data is read back.
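To see why the key=value layout matters, here is a minimal read-back sketch (assuming a SparkSession named spark and the same bucket path as the question): partition discovery re-attaches the dropped columns, and filters on them prune whole directories.

import org.apache.spark.sql.functions.col

// Partition discovery parses year=/month=/day= out of the directory names
// and re-attaches them as columns of the DataFrame.
val readBack = spark.read
  .option("header", "true")
  .csv("s3://bucket/")

// A predicate on partition columns only scans the matching directories,
// here everything under s3://bucket/year=2016/month=11/.
readBack.filter(col("year") === 2016 && col("month") === 11).show()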
If you need a specific directory structure, you should use a downstream process to rename the directories.
You can use the following script to strip the column prefix from each directory name:
#!/usr/bin/env bash
# Rename partition directories: strip the "COLUMN=" prefix, e.g. DATE=20170708 becomes 20170708.
path=$1
col=$2
for f in $(hdfs dfs -ls "$path" | awk '{print $NF}' | grep "$col="); do
  a="$(echo "$f" | sed "s/$col=//")"
  hdfs dfs -mv "$f" "$a"
done
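Alternatively, if you control the write side, you can skip partitionBy entirely and write each date slice to a path you build yourself. This is only a sketch, assuming year, month and day are integer columns of modest cardinality (it runs one write job per date):

import org.apache.spark.sql.functions.col

// Collect the distinct (year, month, day) triples on the driver.
val dates = df.select("year", "month", "day").distinct().collect()

for (row <- dates) {
  val (y, m, d) = (row.getInt(0), row.getInt(1), row.getInt(2))
  // Write one slice per date directly to the desired layout, e.g. s3://bucket/2016/11/15/.
  // Dropping the columns mirrors what partitionBy does: the values live in the path.
  df.filter(col("year") === y && col("month") === m && col("day") === d)
    .drop("year", "month", "day")
    .write
    .format("csv")
    .option("header", "true")
    .save(s"s3://bucket/$y/$m/$d/")
}

Note that directories written this way are no longer discoverable as partitions, so the pruning described above is lost.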