I have a table of visit times stored as Unix timestamps, like this:
ID, time
1, 1493596800
1, 1493596900
1, 1493432800
2, 1493596800
2, 1493596850
2, 1493432800
I use Spark SQL and I need the longest sequence of consecutive dates for each ID, like this:
ID, longest_seq (days)
1, 2
2, 5
3, 1
I tried to adapt the answer from "Detect consecutive dates ranges using SQL" to my case, but I did not manage to get what I expect.
SELECT ID, MIN (d), MAX(d)
FROM (
SELECT ID, cast(from_utc_timestamp(cast(time as timestamp), 'CEST') as date) AS d,
ROW_NUMBER() OVER(
PARTITION BY ID ORDER BY cast(from_utc_timestamp(cast(time as timestamp), 'CEST')
as date)) rn
FROM purchase
where ID is not null
GROUP BY ID, cast(from_utc_timestamp(cast(time as timestamp), 'CEST') as date)
)
GROUP BY ID, rn
ORDER BY ID
If anyone has a clue how to fix this query, or can see what is wrong with it, I would appreciate the help. Thanks.
[EDIT] A more explicit input/output:
ID, time
1, 1
1, 2
1, 3
2, 1
2, 3
2, 4
2, 5
2, 10
2, 11
3, 1
3, 4
3, 9
3, 11
The result would be:
ID, MaxSeq (in days)
1,3
2,3
3,1
All the visits are stored as timestamps, but I need consecutive days, so several visits on the same day must be counted only once.
My answer below is adapted from https://dzone.com/articles/how-to-find-the-longest-consecutive-series-of-even for use in Spark SQL. You'll have to wrap the SQL queries with:
spark.sql("""
SQL_QUERY
""")
So, for the first query:
CREATE TABLE intermediate_1 AS
SELECT
id,
time,
ROW_NUMBER() OVER (PARTITION BY id ORDER BY time) AS rn,
time - ROW_NUMBER() OVER (PARTITION BY id ORDER BY time) AS grp
FROM purchase
This will give you:
id, time, rn, grp
1, 1, 1, 0
1, 2, 2, 0
1, 3, 3, 0
2, 1, 1, 0
2, 3, 2, 1
2, 4, 3, 1
2, 5, 4, 1
2, 10, 5, 5
2, 11, 6, 5
3, 1, 1, 0
3, 4, 2, 2
3, 9, 3, 6
3, 11, 4, 7
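To make the trick easier to follow outside Spark, here is a minimal plain-Python sketch that reproduces the rn and grp columns above. It is not Spark code: the helper name with_group_key is hypothetical, and it assumes time is already a day number with at most one row per (id, day).

```python
from itertools import groupby

# Sample (id, day) pairs from the question's edit; days are assumed distinct per id.
visits = [(1, 1), (1, 2), (1, 3),
          (2, 1), (2, 3), (2, 4), (2, 5), (2, 10), (2, 11),
          (3, 1), (3, 4), (3, 9), (3, 11)]

def with_group_key(rows):
    """Return (id, day, rn, grp) tuples, where grp = day - rn within each id."""
    out = []
    # Sorting by (id, day) plays the role of PARTITION BY id ORDER BY time.
    for id_, group in groupby(sorted(rows), key=lambda r: r[0]):
        for rn, (_, day) in enumerate(group, start=1):
            out.append((id_, day, rn, day - rn))
    return out

rows = with_group_key(visits)
for row in rows:
    print(row)
```

Within a run of consecutive days, both day and rn grow by 1 per row, so their difference stays constant; a gap makes day jump ahead of rn and starts a new grp value.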
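We can see that consecutive rows share the same grp value. Note that this relies on time being a day number and on each (id, time) pair appearing only once; if a day can contain several visits, deduplicate first. Next, we use GROUP BY and COUNT to get the length of each consecutive run.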
CREATE TABLE intermediate_2 AS
SELECT
id,
grp,
COUNT(*) AS num_consecutive
FROM intermediate_1
GROUP BY id, grp
This will return:
id, grp, num_consecutive
1, 0, 3
2, 0, 1
2, 1, 3
2, 5, 2
3, 0, 1
3, 2, 1
3, 6, 1
3, 7, 1
Now we just use MAX and GROUP BY to get the length of the longest consecutive run for each id.
CREATE TABLE final AS
SELECT
id,
MAX(num_consecutive) as max_consecutive
FROM intermediate_2
GROUP BY id
Which will give you:
id, max_consecutive
1, 3
2, 3
3, 1
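The queries above start from day numbers, while the original table holds Unix timestamps with possibly several visits per day. As a rough sketch of the whole pipeline in plain Python (not Spark), the following converts each timestamp to a day number, counts each day once, and then applies the same grouping. The function name longest_daily_streak and the sample data are hypothetical, and days are cut at UTC midnight, which is an assumption; the question uses a CEST conversion instead.

```python
from collections import defaultdict

SECONDS_PER_DAY = 86_400

def longest_daily_streak(visits):
    """Map each id to its longest run of consecutive UTC days with a visit."""
    days_by_id = defaultdict(set)
    for id_, ts in visits:
        # Day number since the epoch (UTC); the set counts each day once.
        days_by_id[id_].add(ts // SECONDS_PER_DAY)

    result = {}
    for id_, days in days_by_id.items():
        runs = defaultdict(int)
        # day - rn is constant inside a run of consecutive days.
        for rn, day in enumerate(sorted(days), start=1):
            runs[day - rn] += 1
        result[id_] = max(runs.values())
    return result

DAY = SECONDS_PER_DAY
# Hypothetical sample: id 1 visits days 0, 0, 1 and 3; id 2 visits days 5 and 7.
sample = [
    (1, 0 * DAY + 10), (1, 0 * DAY + 500), (1, 1 * DAY + 30), (1, 3 * DAY),
    (2, 5 * DAY), (2, 7 * DAY),
]
print(longest_daily_streak(sample))  # {1: 2, 2: 1}
```

In Spark SQL the equivalent would be to derive a date column and select distinct (id, date) pairs before applying ROW_NUMBER.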
Hope this helps!
That's a case for my beloved window aggregate functions!
I think the following example could help you out (at least to get started).
The following is the dataset I use. I translated your time (in longs) to numeric time to denote the day (and avoid messing around with timestamps in Spark SQL which could make the solution harder to comprehend...possibly).
In the visits dataset below, the time column represents the day number, so values that differ by 1 represent consecutive days.
scala> visits.show
+---+----+
| ID|time|
+---+----+
| 1| 1|
| 1| 1|
| 1| 2|
| 1| 3|
| 1| 3|
| 1| 3|
| 2| 1|
| 3| 1|
| 3| 2|
| 3| 2|
+---+----+
Let's define the window specification to group rows with the same id together.
import org.apache.spark.sql.expressions.Window
val idsSortedByTime = Window.
partitionBy("id").
orderBy("time")
With that you rank the rows and count the rows that share the same rank.
val answer = visits.
select($"id", $"time", rank over idsSortedByTime as "rank").
groupBy("id", "time", "rank").
agg(count("*") as "count")
scala> answer.show
+---+----+----+-----+
| id|time|rank|count|
+---+----+----+-----+
| 1| 1| 1| 2|
| 1| 2| 3| 1|
| 1| 3| 4| 3|
| 3| 1| 1| 1|
| 3| 2| 2| 2|
| 2| 1| 1| 1|
+---+----+----+-----+
That appears to be (very close to?) a solution. You seem done!
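One possible way to finish from here is to subtract a rank from time and group on the difference. The plain-Python sketch below (not Spark code, with the hypothetical helper longest_streak) suggests why the choice of ranking function matters when days repeat: a rank that skips numbers after ties, like RANK, splits a run, whereas a gap-free rank, like DENSE_RANK, keeps it together.

```python
from collections import defaultdict

# (id, day) rows from the visits dataset above, including repeated days.
visits = [(1, 1), (1, 1), (1, 2), (1, 3), (1, 3), (1, 3),
          (2, 1), (3, 1), (3, 2), (3, 2)]

def longest_streak(rows, dense):
    """Longest run of consecutive days per id, grouping on day - rank.

    dense=True mimics DENSE_RANK(); dense=False mimics RANK().
    """
    by_id = defaultdict(list)
    for id_, day in rows:
        by_id[id_].append(day)

    result = {}
    for id_, days in by_id.items():
        days.sort()
        distinct = sorted(set(days))
        groups = defaultdict(set)
        for day in distinct:
            if dense:
                rank = distinct.index(day) + 1  # ties do not leave gaps
            else:
                rank = days.index(day) + 1      # first position among all rows
            groups[day - rank].add(day)
        # Count distinct days per group so repeated visits count once.
        result[id_] = max(len(g) for g in groups.values())
    return result

print(longest_streak(visits, dense=False))  # {1: 2, 2: 1, 3: 2}
print(longest_streak(visits, dense=True))   # {1: 3, 2: 1, 3: 2}
```

For id 1 the days 1, 2, 3 are consecutive, so 3 is the expected answer; only the dense variant produces it.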