Spark: can you include partition columns in output files?

I am using Spark to write out data into partitions. Given a dataset with two columns (foo, bar), if I do df.write.mode("overwrite").format("csv").partitionBy("foo").save("/tmp/output"), I get an output of

/tmp/output/foo=1/X.csv
/tmp/output/foo=2/Y.csv
...

However, the output CSV files only contain the value for bar, not foo. I know the value of foo is already captured in the directory name foo=N, but is it possible to also include the value of foo in the CSV file?

asked Jan 10 '18 by erwaman

People also ask

How does Spark partitioning work on files in HDFS?

Spark uses the partitioner property to determine which worker a particular RDD record should be stored on. When Spark reads a file from HDFS, it creates a single partition for each input split. The input split is set by the Hadoop InputFormat used to read the file.
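A quick way to observe this is to check the partition count right after a read. A minimal sketch; the input path and SparkSession setup are hypothetical:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("partition-demo").getOrCreate()

# Hypothetical input file; for a small file this typically yields one
# partition per input split.
df = spark.read.format("csv").option("header", "true").load("/tmp/input.csv")
print(df.rdd.getNumPartitions())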

How can we partition files in Spark?

This can be achieved by changing the Spark partition size and the number of partitions, which can be done with the repartition() method. repartition() shuffles the data and divides it into the requested number of partitions. A better way, though, is to partition at the data source itself and save the network traffic a shuffle incurs.
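For illustration, a minimal sketch of repartition(), reusing the df from the previous sketch; the target of 8 partitions is an arbitrary example:

# Full shuffle into 8 partitions (an arbitrary, illustrative number).
repartitioned = df.repartition(8)
print(repartitioned.rdd.getNumPartitions())  # 8

# repartition() also accepts columns, which co-locates equal key values.
by_key = df.repartition("foo")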

When should I use partitioning in Spark?

Spark/PySpark partitioning is a way to split the data into multiple partitions so that transformations can execute on those partitions in parallel, completing the job faster. You can also write partitioned data out to a file system (as multiple sub-directories) for faster reads by downstream systems.
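One concrete downstream benefit is partition pruning when reading the data back. A hedged sketch, assuming the /tmp/output directory layout from the question:

# Spark discovers the foo=N directories as a partition column and prunes
# everything but foo=1, so only those files are scanned.
pruned = spark.read.format("csv").load("/tmp/output").filter("foo = 1")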

What is a reasonable number of partitions for Spark?

Clusters will not be fully utilized unless you set the level of parallelism for each operation high enough. The general recommendation for Spark is to have about 4x as many partitions as there are cores available to the application; as an upper bound on partition count, each task should still take at least 100 ms to execute.
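A minimal sketch of that rule of thumb; the 4x multiplier is the heuristic quoted above, not an API:

# defaultParallelism roughly reflects the total cores available to the app.
cores = spark.sparkContext.defaultParallelism
df = df.repartition(4 * cores)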


1 Answer

Only if you make a copy of the column under a different name:

from pyspark.sql.functions import col

(df
    # Copy foo; partitionBy consumes foo_ into the directory names,
    # so the original foo column stays in the CSV file body.
    .withColumn("foo_", col("foo"))
    .write.mode("overwrite")
    .format("csv").partitionBy("foo_").save("/tmp/output"))
answered Oct 04 '22 by Alper t. Turker