I'm attempting to compare one row in a dataframe with the next to see the difference in their timestamps. Currently the data looks like this:
itemid | eventid | timestamp
-------|---------|--------------------
   134 |      30 | 2016-07-02 12:01:40
   134 |      32 | 2016-07-02 12:21:23
   125 |      30 | 2016-07-02 13:22:56
   125 |      32 | 2016-07-02 13:27:07
I've tried mapping a function over the dataframe to compare each row with the next, like this (note: I'm trying to find rows whose timestamps differ by more than 4 hours):
items = df.limit(10)\
    .orderBy('itemid', desc('stamp'))\
    .map(lambda x,y: (x.stamp - y.stamp) > 14400)\
    .collect()
But I'm getting the following error:
Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.collectAndServe
I believe this is because I'm using the map function incorrectly. Help with using map correctly, or a different solution entirely, would be appreciated.
UPDATE: @zero323's answer was informative about my improper use of map; however, the system I'm using is running a Spark version before 2.0.2, and I'm working with data in Cassandra.
I managed to solve it with mapPartitions. See my answer below.
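My answer isn't reproduced here, but a minimal sketch of a mapPartitions-based approach might look like the following. This is an illustration of the idea, not my exact code, and it assumes the timestamp column comes back as Python datetime objects:

# Key each row by (itemid, timestamp) so repartitionAndSortWithinPartitions
# puts all rows for one itemid in the same partition, sorted by timestamp.
# mapPartitions can then compare consecutive rows within each partition.
def flag_gaps(rows):
    prev = None
    for (itemid, ts), _ in rows:
        if prev is not None and prev[0] == itemid:
            # True when this event is more than 4 hours after the previous one
            yield (itemid, (ts - prev[1]).total_seconds() > 14400)
        prev = (itemid, ts)

flags = (df.rdd
    .map(lambda r: ((r.itemid, r.timestamp), None))
    .repartitionAndSortWithinPartitions(partitionFunc=lambda k: hash(k[0]))
    .mapPartitions(flag_gaps))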
UPDATE(2017/03/27): Since originally marking the answer on this post my understanding of Spark has improved significantly. I've updated my answer below to show my current solution.
Yes, you're using the map function in the wrong way. map operates on a single element at a time, so it never receives a pair of rows to compare. You can try to use window functions instead, like this:
from pyspark.sql.functions import col, lag
from pyspark.sql.window import Window
# Recreate the sample data; cast the string column to a proper timestamp type
df = (
    sc.parallelize([
        (134, 30, "2016-07-02 12:01:40"), (134, 32, "2016-07-02 12:21:23"),
        (125, 30, "2016-07-02 13:22:56"), (125, 32, "2016-07-02 13:27:07"),
    ]).toDF(["itemid", "eventid", "timestamp"])
    .withColumn("timestamp", col("timestamp").cast("timestamp"))
)

# One window per itemid, ordered by timestamp, so lag() returns the previous
# event for the same item
w = Window.partitionBy("itemid").orderBy("timestamp")

# Difference in seconds between each row and the previous row in its window
# (null for the first row of each itemid)
diff = col("timestamp").cast("long") - lag("timestamp", 1).over(w).cast("long")

df.withColumn("diff", diff)
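Since you're after rows more than 4 hours apart, you can then filter on the computed column (14400 seconds = 4 hours). Note that the first row of each itemid has no previous row, so its diff is null and is dropped by the filter:

df.withColumn("diff", diff).where(col("diff") > 14400).show()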