Partition a spark dataframe based on column value?

I have a DataFrame loaded from a SQL source which looks like this:

User(id: Long, fname: String, lname: String, country: String)

[1, Fname1, Lname1, Belarus]
[2, Fname2, Lname2, Belgium]
[3, Fname3, Lname3, Austria]
[4, Fname4, Lname4, Australia]

I want to partition this data and write it to CSV files, where each partition is based on the initial letter of the country. Belarus and Belgium should end up in one output file, and Austria and Australia in another.

jdk2588 asked Jul 07 '17 07:07


2 Answers

Here is what you can do:

import org.apache.spark.sql.functions._
// Needed for toDF and the $"..." column syntax (already imported in spark-shell)
import spark.implicits._

// Create a DataFrame with demo data
val df = spark.sparkContext.parallelize(Seq(
  (1, "Fname1", "Lname1", "Belarus"),
  (2, "Fname2", "Lname2", "Belgium"),
  (3, "Fname3", "Lname3", "Austria"),
  (4, "Fname4", "Lname4", "Australia")
)).toDF("id", "fname", "lname", "country")

// Create a new column holding the first letter of the country column
val result = df.withColumn("countryFirst", split($"country", "")(0))

// Save the data, partitioned by the first letter of the country.
// On Spark 2.0+ the built-in "csv" format can be used in place of "com.databricks.spark.csv".
result.write.partitionBy("countryFirst").format("com.databricks.spark.csv").save("outputpath")

Edit: As Raphel suggested, you can also use substring, which can improve performance. From the documentation:

substring(Column str, int pos, int len): Substring starts at pos and is of length len when str is String type, or returns the slice of byte array that starts at pos in byte and is of length len when str is Binary type.

val result = df.withColumn("countryFirst", substring($"country", 1, 1))

Then use partitionBy("countryFirst") on the writer, exactly as in the first snippet.
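Putting the pieces of this answer together, here is a minimal end-to-end sketch. It assumes Spark 2.0 or later (for SparkSession and the built-in csv writer) and a local master purely for illustration. The object name PartitionByFirstLetter is made up, and the repartition call is an addition that is not part of the original answer: it places all rows with the same letter in one Spark partition, so each output directory should contain a single CSV file rather than several.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, substring}

// Hypothetical object name, used only for this sketch.
object PartitionByFirstLetter {
  def main(args: Array[String]): Unit = {
    // Local session for illustration; in spark-shell `spark` already exists.
    val spark = SparkSession.builder()
      .appName("partition-by-first-letter")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Same demo data as in the question.
    val df = Seq(
      (1L, "Fname1", "Lname1", "Belarus"),
      (2L, "Fname2", "Lname2", "Belgium"),
      (3L, "Fname3", "Lname3", "Austria"),
      (4L, "Fname4", "Lname4", "Australia")
    ).toDF("id", "fname", "lname", "country")

    // substring positions are 1-based, so (1, 1) selects the first character.
    val result = df.withColumn("countryFirst", substring(col("country"), 1, 1))

    result
      .repartition(col("countryFirst")) // assumption: one file per letter is wanted
      .write
      .partitionBy("countryFirst")      // one subdirectory per distinct letter
      .option("header", "true")
      .mode("overwrite")
      .csv("outputpath")

    spark.stop()
  }
}
```

The partition column is not written into the CSV files themselves. Its value is encoded in the directory name instead, for example outputpath/countryFirst=A and outputpath/countryFirst=B.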

Hope this solves your problem!

koiralo answered Sep 27 '22 18:09


One way to solve this is to first create a column containing only the first letter of each country. You can then use partitionBy when writing, which saves each distinct value of that column to its own subdirectory (for example column=B) under the output path.

dataFrame.write.partitionBy("column").format("com.databricks.spark.csv").save("/path/to/dir/")
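
To check what a partitionBy write like the one above leaves on disk, the output can be read back. The following is only a sketch: it assumes Spark 2.0 or later with a local master, writes to a temporary directory, and keeps the placeholder column name "column" from the line above. The object name ReadPartitionedCsv and the variable outDir are made up for illustration.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Hypothetical object name, used only for this sketch.
object ReadPartitionedCsv {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("read-partitioned-csv")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Temporary output location (an assumption for the demo).
    val outDir = Files.createTempDirectory("users-by-letter").toString

    // Small stand-in dataset with the first letter already extracted.
    Seq(("Belarus", "B"), ("Belgium", "B"), ("Austria", "A"), ("Australia", "A"))
      .toDF("country", "column")
      .write
      .partitionBy("column")
      .mode("overwrite")
      .csv(outDir)

    // Reading the root directory: partition discovery rebuilds "column"
    // from the subdirectory names (column=A, column=B).
    val all = spark.read.csv(outDir)
    all.printSchema()

    // Reading one subdirectory returns only the rows for that letter,
    // here Belarus and Belgium, without the partition column.
    val onlyB = spark.read.csv(s"$outDir/column=B")
    onlyB.show()

    spark.stop()
  }
}
```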

Shaido answered Sep 27 '22 16:09