I have a use case that seems relatively simple to solve with Spark, but I can't figure out a reliable way to do it. I have a dataset that contains time series data for various users. All I'm looking to do is:

- partition the dataset by user ID, so that each user's data ends up in its own CSV file, and
- sort each user's rows by timestamp.

I tried the following code snippet, but got surprising results: I do end up with one CSV file per user ID, and some users' time series data comes out sorted, but many other users' data does not.
# repr(ds) = DataFrame[userId: string, timestamp: string, c1: float, c2: float, c3: float, ...]
ds = load_dataset(user_dataset_path)
(ds.repartition("userId")
   .sortWithinPartitions("timestamp")
   .write
   .partitionBy("userId")
   .option("header", "true")
   .csv(output_path))
I'm unclear as to why this happens, and I'm not sure of the right way to accomplish it. I'm also wondering whether this might be a bug in Spark.
I'm using Spark 2.0.2 with Python 2.7.12. Any advice would be very much appreciated!
The following code works for me (shown here in Scala, but the Python version is very similar).
I get one file per username, with the rows in each output file sorted by the timestamp value.
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col
import spark.implicits._  // assuming `spark` is your SparkSession; enables the $"col" syntax

testDF
  .select( $"username", $"timestamp", $"activity" )
  .repartition(col("username"))
  .sortWithinPartitions(col("username"), col("timestamp")) // <-- both columns here
  .write
  .partitionBy("username")
  .mode(SaveMode.Overwrite)
  .option("header", "true")
  .option("delimiter", ",")
  .csv(folder + "/useractivity")
The important thing is to have both the username and timestamp columns as parameters to sortWithinPartitions. This is most likely because the partitioned writer re-sorts rows by the partition columns within each task; when username comes first in the sort, the data is already in that order, so the timestamp ordering survives the write.
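Since the question uses PySpark, here is a minimal sketch of the same fix in Python. It reuses the ds DataFrame and output_path from the question; mode("overwrite") is my addition, mirroring SaveMode.Overwrite above.

# Same approach as the Scala answer, sketched in PySpark.
(ds.repartition("userId")
   .sortWithinPartitions("userId", "timestamp")  # <-- both columns here too
   .write
   .partitionBy("userId")
   .mode("overwrite")
   .option("header", "true")
   .csv(output_path))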
Here is how one of the output files looks (I used a simple integer as my timestamp). Note that the username column does not appear in the file itself; partitionBy moves the partition column into the directory name (e.g. username=jsmith/):
timestamp,activity
345,login
402,upload
515,download
600,logout
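As a quick sanity check, you can read one user's output folder back and assert the ordering. This is just an illustrative sketch: the username=jsmith path is hypothetical, and it assumes one small file per user, so the read stays in a single partition and collect() returns rows in file order.

# Hypothetical sanity check for a single user's output folder.
checked = spark.read.option("header", "true").csv("/path/to/useractivity/username=jsmith")
timestamps = [int(r["timestamp"]) for r in checked.collect()]
assert timestamps == sorted(timestamps)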