
Write Spark dataframe as CSV with partitions

I'm trying to write a dataframe in Spark to an HDFS location, and I expect that if I add the partitionBy notation, Spark will create partition folders (similar to writing in Parquet format) of the form

partition_column_name=partition_value

(i.e. partition_date=2016-05-03). To do so, I ran the following command:

(df.write
    .partitionBy('partition_date')
    .mode('overwrite')
    .format("com.databricks.spark.csv")
    .save('/tmp/af_organic'))

but the partition folders were not created. Any idea what I should do so that Spark automatically creates those folders?

Thanks,

Lior Baber asked May 29 '16


1 Answer

Spark 2.0.0+:

The built-in csv format supports partitioning out of the box, so you should be able to simply use:

df.write.partitionBy('partition_date').mode(mode).format("csv").save(path)

without including any additional packages.
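
For illustration only (this assumes a SparkSession named spark; the path mirrors the one in the question and the exact file names will differ), the write produces one subdirectory per distinct partition value, and reading the root path back lets Spark rediscover the partition column:

# Sketch of the resulting layout (illustrative, part file names will vary):
#   /tmp/af_organic/partition_date=2016-05-03/part-*.csv
#   /tmp/af_organic/partition_date=2016-05-04/part-*.csv

# Reading the root path back recovers partition_date as a regular column.
df_back = spark.read.csv("/tmp/af_organic")
df_back.printSchema()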

Spark < 2.0.0:

At this moment (v1.4.0) spark-csv doesn't support partitionBy (see databricks/spark-csv#123), but you can adjust the built-in sources to achieve what you want.

You can try two different approaches. Assuming your data is relatively simple (no complex strings or need for character escaping) and looks more or less like this:

df = sc.parallelize([
    ("foo", 1, 2.0, 4.0), ("bar", -1, 3.5, -0.1)
]).toDF(["k", "x1", "x2", "x3"])

You can manually prepare values for writing:

from pyspark.sql.functions import col, concat_ws

key = col("k")
values = concat_ws(",", *[col(x) for x in df.columns[1:]])

kvs = df.select(key, values)

and write using the text source:

kvs.write.partitionBy("k").text("/tmp/foo")

Reading one of the resulting partitions back with spark-csv confirms the layout:

df_foo = (sqlContext.read.format("com.databricks.spark.csv")
    .options(inferSchema="true")
    .load("/tmp/foo/k=foo"))

df_foo.printSchema()
## root
## |-- C0: integer (nullable = true)
## |-- C1: double (nullable = true)
## |-- C2: double (nullable = true)
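
Since the round trip through the text source drops the original column names (hence C0, C1, C2 above), you can reattach them after loading; this renaming step is my own addition, not part of the original answer:

df_foo = df_foo.toDF("x1", "x2", "x3")  # rename the generated C0/C1/C2 columns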

In more complex cases you can try to use a proper CSV parser to preprocess values in a similar way, either by using a UDF or by mapping over an RDD, but it will be significantly more expensive.
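
As a rough sketch of that idea (my own illustration, written for Python 3, not taken from the answer; the output path is made up), a UDF wrapping Python's csv module can take care of quoting and escaping before writing with the text source:

import csv
import io

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

def to_csv_line(*fields):
    # Serialize one row into a properly quoted and escaped CSV line.
    buf = io.StringIO()
    csv.writer(buf).writerow(fields)
    return buf.getvalue().rstrip("\r\n")

csv_line = udf(to_csv_line, StringType())

kvs = df.select(col("k"), csv_line(*[col(c) for c in df.columns[1:]]).alias("value"))
kvs.write.partitionBy("k").text("/tmp/foo_escaped")  # illustrative output path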

If CSV format is not a hard requirement, you can also use the JSON writer, which supports partitionBy out of the box:

df.write.partitionBy("k").json("/tmp/bar")

as well as partition discovery on read.
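
As a small follow-up of my own (not part of the original answer), reading the root path back shows the partition column being recovered:

df_bar = sqlContext.read.json("/tmp/bar")
df_bar.printSchema()  # x1, x2, x3 plus k, recovered from the k=... directory names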

zero323 answered Oct 07 '22