
PySpark: compare two rows in a DataFrame

I'm attempting to compare each row in a DataFrame with the next one to find the difference between their timestamps. Currently the data looks like this:

 itemid | eventid | timestamp
 ----------------------------
 134    | 30      | 2016-07-02 12:01:40
 134    | 32      | 2016-07-02 12:21:23
 125    | 30      | 2016-07-02 13:22:56
 125    | 32      | 2016-07-02 13:27:07

I've tried mapping a function onto the DataFrame to compare rows like this (note: I'm trying to get rows with a difference greater than 4 hours, i.e. 14400 seconds):

items = df.limit(10)\
          .orderBy('itemid', desc('stamp'))\
          .map(lambda x,y: (x.stamp - y.stamp) > 14400).collect()

But I'm getting the following error:

Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.collectAndServe

I believe this is because I'm using the map function incorrectly. Help with using map, or a different solution, would be appreciated.
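For intuition, the same mistake can be reproduced with Python's built-in `map`, which, like Spark's `map`, calls the function with one element at a time. This is a plain-Python analogy only (no Spark involved); `stamps` holds the two sample timestamps for item 134 from the table above, and the variable names are illustrative.

```python
from datetime import datetime

# Sample timestamps for item 134 from the question's table, already sorted.
stamps = [
    datetime(2016, 7, 2, 12, 1, 40),
    datetime(2016, 7, 2, 12, 21, 23),
]

# map() passes ONE element per call, so a two-argument lambda cannot work.
error = None
try:
    list(map(lambda x, y: (y - x).total_seconds() > 14400, stamps))
except TypeError as exc:
    error = exc  # missing required positional argument 'y'

# Pairing each element with its successor makes "previous vs. next" explicit.
pairs = list(zip(stamps, stamps[1:]))
gaps = [(later - earlier).total_seconds() for earlier, later in pairs]
print(gaps)  # [1183.0]
```

In Spark, the equivalent of "pair each row with its predecessor" is a window function such as `lag`, rather than `map`.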

UPDATE: @zero323's answer was informative about my improper use of map; however, the system I'm using runs a Spark version before 2.02, and I'm working with data in Cassandra.

I managed to solve it with mapPartitions. See my answer below.
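The asker's `mapPartitions` code is not reproduced on this page. As a hedged sketch of the general idea only: the function passed to `RDD.mapPartitions` receives an iterator over one partition's records and returns an iterable, so it can remember the previous record while walking the partition. This assumes all rows for an item sit in the same partition and are already sorted by item and timestamp; the helper name `flag_long_gaps`, the tuple layout, and the extra row for item 134 are hypothetical.

```python
from datetime import datetime

THRESHOLD_SECONDS = 4 * 60 * 60  # 4 hours = 14400 seconds

def flag_long_gaps(rows):
    """Hypothetical helper for rdd.mapPartitions(flag_long_gaps).

    `rows` is an iterator of (itemid, eventid, timestamp) tuples, assumed to be
    grouped by itemid and sorted by timestamp within the partition. Yields
    (itemid, eventid, gap_seconds) where the gap to the previous row of the
    same item exceeds THRESHOLD_SECONDS.
    """
    prev_item, prev_ts = None, None
    for itemid, eventid, ts in rows:
        if itemid == prev_item:
            gap = (ts - prev_ts).total_seconds()
            if gap > THRESHOLD_SECONDS:
                yield (itemid, eventid, gap)
        prev_item, prev_ts = itemid, ts

# Plain-Python check on one simulated, pre-sorted partition.
partition = [
    (125, 30, datetime(2016, 7, 2, 13, 22, 56)),
    (125, 32, datetime(2016, 7, 2, 13, 27, 7)),
    (134, 30, datetime(2016, 7, 2, 12, 1, 40)),
    (134, 32, datetime(2016, 7, 2, 12, 21, 23)),
    (134, 33, datetime(2016, 7, 2, 17, 0, 0)),  # hypothetical extra row
]
print(list(flag_long_gaps(partition)))  # [(134, 33, 16717.0)]
```

The design relies entirely on the partitioning and sort order; if one item's rows were split across partitions, the first row of each piece would be compared with nothing and a gap could be missed.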

UPDATE (2017/03/27): Since originally marking the answer on this post, my understanding of Spark has improved significantly. I've updated my answer below to show my current solution.

asked Jul 06 '16 17:07 by ivywit


1 Answer

Yes, you're using the map function the wrong way. map operates on a single element at a time, so it cannot compare one row with the next. You can use window functions instead, like this:

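from pyspark.sql.functions import col, lag
from pyspark.sql.window import Window

# `sc` is the SparkContext provided by the PySpark shell; toDF() also needs an
# active SQLContext/SparkSession. On Spark 1.x, window functions need a HiveContext.
df = (
    sc.parallelize([
        (134, 30, "2016-07-02 12:01:40"), (134, 32, "2016-07-02 12:21:23"),
        (125, 30, "2016-07-02 13:22:56"), (125, 32, "2016-07-02 13:27:07"),
    ]).toDF(["itemid", "eventid", "timestamp"])
    # Parse the strings into proper timestamps.
    .withColumn("timestamp", col("timestamp").cast("timestamp"))
)

# One window per item, rows ordered by time.
w = Window.partitionBy("itemid").orderBy("timestamp")

# Seconds since the previous event of the same item (null for the first row).
diff = col("timestamp").cast("long") - lag("timestamp", 1).over(w).cast("long")

df.withColumn("diff", diff).show()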
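To make the window semantics concrete: `partitionBy("itemid")` groups rows per item, `orderBy("timestamp")` sorts within each group, and `lag("timestamp", 1)` returns the previous row's value, or null for the first row of a group. The plain-Python sketch below emulates that on the question's sample data (it is an illustration, not Spark code) and then applies the question's 4-hour cutoff. In Spark the cutoff would be a filter such as `.where(col("diff") > 14400)`, which also drops the null first rows. The function name `with_diff` is hypothetical.

```python
from datetime import datetime
from itertools import groupby
from operator import itemgetter

rows = [
    (134, 30, "2016-07-02 12:01:40"), (134, 32, "2016-07-02 12:21:23"),
    (125, 30, "2016-07-02 13:22:56"), (125, 32, "2016-07-02 13:27:07"),
]

def with_diff(rows):
    """Emulate lag() over partitionBy(itemid).orderBy(timestamp): append the
    gap in seconds to the previous row of the same item, or None if first."""
    parsed = [(i, e, datetime.strptime(t, "%Y-%m-%d %H:%M:%S")) for i, e, t in rows]
    parsed.sort(key=itemgetter(0, 2))  # group by item, then order by time
    out = []
    for _, group in groupby(parsed, key=itemgetter(0)):
        prev = None  # lag() yields null for the first row of each partition
        for itemid, eventid, ts in group:
            diff = None if prev is None else int((ts - prev).total_seconds())
            out.append((itemid, eventid, diff))
            prev = ts
    return out

result = with_diff(rows)
print(result)
# [(125, 30, None), (125, 32, 251), (134, 30, None), (134, 32, 1183)]

# The question's cutoff: keep only gaps longer than 4 hours (14400 s).
long_gaps = [r for r in result if r[2] is not None and r[2] > 14400]
print(long_gaps)  # []
```

None of the sample gaps exceed 4 hours, so the filtered result is empty for this data.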

answered Sep 19 '22 16:09 by zero323