I have a DataFrame generated as follows:
df.groupBy($"Hour", $"Category")
  .agg(sum($"value").alias("TotalValue"))
  .sort($"Hour".asc, $"TotalValue".desc)
The results look like:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 0| cat26| 30.9|
| 0| cat13| 22.1|
| 0| cat95| 19.6|
| 0| cat105| 1.3|
| 1| cat67| 28.5|
| 1| cat4| 26.8|
| 1| cat13| 12.6|
| 1| cat23| 5.3|
| 2| cat56| 39.6|
| 2| cat40| 29.7|
| 2| cat187| 27.9|
| 2| cat68| 9.8|
| 3| cat8| 35.6|
| ...| ....| ....|
+----+--------+----------+
I would like to make new DataFrames based on every unique value of col("Hour"), i.e. one DataFrame per hour. So the desired output would be:
df0 as:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 0| cat26| 30.9|
| 0| cat13| 22.1|
| 0| cat95| 19.6|
| 0| cat105| 1.3|
+----+--------+----------+
df1 as:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 1| cat67| 28.5|
| 1| cat4| 26.8|
| 1| cat13| 12.6|
| 1| cat23| 5.3|
+----+--------+----------+
and similarly,
df2 as:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 2| cat56| 39.6|
| 2| cat40| 29.7|
| 2| cat187| 27.9|
| 2| cat68| 9.8|
+----+--------+----------+
Any help is highly appreciated.
EDIT 1:
What I have tried:
df.foreach(row => splitHour(row))

def splitHour(row: Row) = {
  val hour = row.getAs[Long]("Hour")
  // single-row DataFrame holding just this hour, used as a join key
  val hourDF = sparkSession.createDataFrame(List((s"$hour", 1)))
  val hdf = hourDF.withColumnRenamed("_1", "Hour_unique").drop("_2")
  // join back against the full DataFrame to isolate this hour's rows
  val mydf: DataFrame = df.join(hdf, df("Hour") === hdf("Hour_unique"))
  mydf.write.mode("overwrite").parquet(s"/home/dev/shaishave/etc/myparquet/$hour/")
}
PROBLEM WITH THIS STRATEGY:
It took 8 hours when run on a DataFrame df with over 1 million rows, with the Spark job given around 10 GB of RAM on a single node. So the join is turning out to be highly inefficient.
Caveat: I have to write each DataFrame mydf as Parquet, with a nested schema that must be maintained (not flattened).
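For reference, a minimal sketch of a join-free variant of the same idea: collect the distinct hours on the driver (a small list) and write one filtered DataFrame per hour. It reuses the column and path names from above; I have not verified whether it is actually faster here.

import sparkSession.implicits._   // for the $"..." syntax and the Long encoder

// Distinct hours fit comfortably on the driver (at most 24 values).
val hours = df.select($"Hour").distinct().as[Long].collect()

hours.foreach { hour =>
  // A plain filter keeps the (nested) schema intact; no join needed.
  df.filter($"Hour" === hour)
    .write
    .mode("overwrite")
    .parquet(s"/home/dev/shaishave/etc/myparquet/$hour/")
}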
As noted in my comments, one potentially easy approach to this problem would be to use:
df.write.partitionBy("hour").saveAsTable("myparquet")
As noted, the folder structure would be myparquet/hour=1, myparquet/hour=2, ..., myparquet/hour=24, as opposed to myparquet/1, myparquet/2, ..., myparquet/24.
To change the folder structure, you could either use hcat.dynamic.partitioning.custom.pattern within an explicit HiveContext (more information at HCatalog DynamicPartitions), or rename the folders after the df.write.partitionBy.saveAsTable(...) command with something like for f in *; do mv $f ${f/${f:0:5}/} ; done, which would remove the hour= text from the folder names. It is important to note that by changing the naming pattern for the folders, when you run spark.read.parquet(...) on that folder, Spark will not automatically understand the dynamic partitions, since it is missing the partition key (i.e. Hour) information.
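For example, if you do strip the hour= prefix, one workaround (just a sketch, assuming a myparquet/0, myparquet/1, ... layout) is to read each folder on its own and add the partition column back manually:

import org.apache.spark.sql.functions.lit

// Partition discovery no longer applies after renaming, so re-attach Hour per folder.
val hour = 2
val hourDF = spark.read.parquet(s"myparquet/$hour").withColumn("Hour", lit(hour))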
Another possible solution:
df.write.mode("overwrite").partitionBy("hour").parquet("address/to/parquet/location")
This is similar to the first answer, except it writes with parquet and uses mode("overwrite").
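With either form of output, the per-hour DataFrames from the question are then just filtered reads. A sketch (the path is the placeholder used above); filtering on the partition column lets Spark prune down to the single hour=... folder:

import spark.implicits._   // for the $"..." syntax

val all = spark.read.parquet("address/to/parquet/location")

// Equivalent of df2 in the question; only the hour=2 directory is scanned.
val df2 = all.where($"hour" === 2)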