I am trying to call partitionBy on a nested field like below:
val rawJson = sqlContext.read.json(filename)
rawJson.write.partitionBy("data.dataDetails.name").parquet(filenameParquet)
I get the below error when I run it. I do see 'name' listed as a field in the schema below. Is there a different format for specifying a nested column name?
java.lang.RuntimeException: Partition column data.dataDetails.name not found in schema StructType(StructField(name,StringType,true), StructField(time,StringType,true), StructField(data,StructType(StructField(dataDetails,StructType(StructField(name,StringType,true), StructField(id,StringType,true)),true)),true))
This is my json file:
{
  "name": "AssetName",
  "time": "2016-06-20T11:57:19.4941368-04:00",
  "data": {
    "type": "EventData",
    "dataDetails": {
      "name": "EventName",
      "id": "1234"
    }
  }
}
partitionBy() is used to partition by column values while writing a DataFrame to disk or a file system. When you write a DataFrame with partitionBy(), Spark splits the records on the partition column and stores each partition's data in its own sub-directory.
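For example, a minimal sketch using the question's rawJson and an assumed output path of /tmp/events:

// Each distinct value of "name" becomes its own sub-directory, e.g.
// /tmp/events/name=AssetName/part-....parquet
rawJson.write.partitionBy("name").parquet("/tmp/events")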
If you want to change the number of partitions of your DataFrame, use the repartition() function. It returns a new DataFrame partitioned by the given partitioning expressions; the resulting DataFrame is hash partitioned.
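A short sketch of both forms, assuming a DataFrame df and the col function from org.apache.spark.sql.functions:

// Repartition to an explicit number of partitions.
val byCount = df.repartition(10)
// Repartition by an expression; rows with equal "name" values hash to the same partition.
val byExpr = df.repartition(col("name"))
println(byExpr.rdd.getNumPartitions)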
A common rule of thumb for the lower bound on the number of Spark partitions is 2 x the number of cores in the cluster available to the application. For the upper bound, each task should take at least 100 ms to execute; if tasks finish faster than that, scheduling overhead starts to outweigh the work they do.
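As a sketch of the lower-bound heuristic, assuming an active SparkSession named spark (defaultParallelism reflects the cores available to the application):

// Hypothetical sizing based on the 2 x cores rule of thumb above.
val targetPartitions = spark.sparkContext.defaultParallelism * 2
val sized = df.repartition(targetPartitions)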
Differences between coalesce and repartition: repartition does a full shuffle of the data and creates equally sized partitions; coalesce merges existing partitions to avoid a full shuffle.
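A minimal sketch of the difference, again assuming a DataFrame df:

// coalesce merges existing partitions (no full shuffle), so it can only
// decrease the partition count.
val merged = df.coalesce(4)
// repartition performs a full shuffle and produces evenly sized partitions;
// it can either increase or decrease the count.
val reshuffled = df.repartition(4)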
This appears to be a known issue listed here: https://issues.apache.org/jira/browse/SPARK-18084
I had this issue as well, and to work around it I was able to un-nest the columns in my dataset. My dataset was a little different from yours, but here is the strategy...
Original Json:
{
  "name": "AssetName",
  "time": "2016-06-20T11:57:19.4941368-04:00",
  "data": {
    "type": "EventData",
    "dataDetails": {
      "name": "EventName",
      "id": "1234"
    }
  }
}
Modified Json:
{
  "name": "AssetName",
  "time": "2016-06-20T11:57:19.4941368-04:00",
  "data_type": "EventData",
  "data_dataDetails_name": "EventName",
  "data_dataDetails_id": "1234"
}
Code to get to Modified Json:
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{StructField, StructType}

def main(args: Array[String]): Unit = {
  ...
  // Select the un-nested children of "data" alongside the top-level columns.
  val data = df.select(children("data", df) ++ Seq(col("name"), col("time")): _*)
  data.printSchema()
  data.write.partitionBy("data_dataDetails_name").format("csv").save(...)
}

// Expands a struct column into one aliased column per child field,
// e.g. "data.type" becomes "data_type".
def children(colname: String, df: DataFrame): Array[Column] = {
  val parent = df.schema.fields.filter(_.name == colname).head
  val fields = parent.dataType match {
    case x: StructType => x.fields
    case _ => Array.empty[StructField]
  }
  fields.map(x => col(s"$colname.${x.name}").alias(s"${colname}_${x.name}"))
}
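Applied to the JSON in the question, the helper flattens only one level at a time, so reaching data_dataDetails_name would hypothetically take two passes (the column names below just follow the helper's colname_field convention):

// First pass: "data" becomes data_type and data_dataDetails (still a struct).
val flatOnce = df.select(children("data", df) ++ Seq(col("name"), col("time")): _*)
// Second pass: the remaining struct becomes data_dataDetails_name / data_dataDetails_id.
val flat = flatOnce.select(
  children("data_dataDetails", flatOnce) ++ Seq(col("name"), col("time"), col("data_type")): _*)
flat.write.partitionBy("data_dataDetails_name").parquet(filenameParquet)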