
Scala: split a single row into multiple rows based on a time column

I have a DataFrame in the following format:

|u_name|Date        |Hour |  Content_id|WatchTime(sec)   |
|user1 | 2019-07-28 |  21 |        100 |           10800 |
|user2 | 2019-07-28 |  20 |        101 |            3600 | 
|user3 | 2019-07-28 |  21 |        202 |            7000 | 

I need to convert this DataFrame to the format below. Basically, I need one entry per hour: if WatchTime(sec) is more than 3600 seconds, the excess should go into a new entry for the next hour.

|u_name|Date        |Hour |  Content_id|WatchTime(sec)   |
|user1 | 2019-07-28 |  21 |        100 |            3600 |
|user1 | 2019-07-28 |  22 |        100 |            3600 |
|user1 | 2019-07-28 |  23 |        100 |            3600 |
|user2 | 2019-07-28 |  20 |        101 |            3600 | 
|user3 | 2019-07-28 |  21 |        202 |            3600 | 
|user3 | 2019-07-28 |  22 |        202 |            3400 |

This can probably be achieved with SQL, but I am using Scala. What is an efficient way to do this?

asked Mar 05 '26 00:03 by toofrellik


1 Answer

You can achieve this in Spark 2.4+ with the following transformations:

  • Split WatchTime into an array of offsets spaced 3600 seconds apart with the sequence function
  • Explode the array to generate the new rows
  • Adjust the Hour and WatchTime values for each row
  • Remove all rows with a zero WatchTime
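import org.apache.spark.sql.functions._
import spark.implicits._  // enables the 'column syntax; assumes a SparkSession named spark

// Assumes df has an integer column named WatchTime (not "WatchTime(sec)")
val result = df
   // offsets 0, 3600, 7200, ... up to WatchTime
   .withColumn("stamps", sequence(lit(0), 'WatchTime, lit(3600)))
   // one row per offset
   .withColumn("offset", explode('stamps))
   .withColumn("Hour", 'Hour + ('offset/3600).cast("int"))
   // seconds remaining after this offset, capped at one hour
   .withColumn("WatchTime", 'WatchTime - 'offset)
   .withColumn("WatchTime", when('WatchTime <= 3600, 'WatchTime).otherwise(3600))
   // drop the empty trailing row
   .filter('WatchTime > 0)
   .drop("stamps","offset")

result.show()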
+------+-------------------+----+----------+---------+
|u_name|               Date|Hour|Content_id|WatchTime|
+------+-------------------+----+----------+---------+
| user1|2019-07-28 00:00:00|  21|       100|     3600|
| user1|2019-07-28 00:00:00|  22|       100|     3600|
| user1|2019-07-28 00:00:00|  23|       100|     3600|
| user2|2019-07-28 00:00:00|  20|       101|     3600|
| user3|2019-07-28 00:00:00|  21|       202|     3600|
| user3|2019-07-28 00:00:00|  22|       202|     3400|
+------+-------------------+----+----------+---------+
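
The arithmetic behind these steps can be checked without Spark. Below is a minimal plain-Scala sketch of the same offset, cap, and filter logic. The names HourlySplit, splitWatchTime, and chunkSize are hypothetical, introduced only for illustration, and are not part of the Spark API.

```scala
object HourlySplit {
  // Hypothetical helper mirroring the sequence + explode + cap + filter steps.
  // Returns (hour, secondsWatchedInThatHour) pairs; the hour is not wrapped at 24.
  def splitWatchTime(hour: Int, watchTime: Int, chunkSize: Int = 3600): Seq[(Int, Int)] =
    (0 to watchTime by chunkSize)                    // offsets 0, 3600, 7200, ...
      .map { offset =>
        val remaining = watchTime - offset           // seconds left after this offset
        (hour + offset / chunkSize, math.min(remaining, chunkSize))
      }
      .filter { case (_, seconds) => seconds > 0 }   // drop the empty trailing chunk

  def main(args: Array[String]): Unit = {
    // The three rows from the question
    println(splitWatchTime(21, 10800))
    println(splitWatchTime(20, 3600))
    println(splitWatchTime(21, 7000))
  }
}
```

For user3 (hour 21, 7000 seconds) this yields the pairs (21, 3600) and (22, 3400), matching the table above.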

This algorithm may generate hours higher than 23. If you need accurate Date and Hour information, I'd advise you to use a single Unix timestamp column combining the start date and hour, since it lets you do time arithmetic and convert properly back to a date and hour when needed.

It would look like this:

val result = df
   // start of the session in epoch seconds; assumes Date is a timestamp column
   .withColumn("StartDateTime", unix_timestamp('Date) + ('Hour * 3600))
   .withColumn("stamps", sequence(lit(0), 'WatchTime, lit(3600)))
   .withColumn("offset", explode('stamps))
   // shift the start by the offset and convert back to a readable timestamp
   .withColumn("StartDateTime", from_unixtime('StartDateTime + 'offset))
   // seconds remaining after this offset, capped at one hour
   .withColumn("WatchTime", when('WatchTime - 'offset > 3600, 3600).otherwise('WatchTime - 'offset))
   .filter('WatchTime > 0)
   .select('u_name, 'content_id, 'StartDateTime, 'WatchTime)

result.show()
+------+----------+-------------------+---------+
|u_name|content_id|      StartDateTime|WatchTime|
+------+----------+-------------------+---------+
| user1|       100|2019-07-28 21:00:00|     3600|
| user1|       100|2019-07-28 22:00:00|     3600|
| user1|       100|2019-07-28 23:00:00|     3600|
| user2|       101|2019-07-28 20:00:00|     3600|
| user3|       202|2019-07-28 21:00:00|     3600|
| user3|       202|2019-07-28 22:00:00|     3400|
+------+----------+-------------------+---------+
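
To see why a single timestamp avoids the hour overflow, here is a small plain-Scala sketch using java.time instead of Spark. TimestampSplit and splitSession are hypothetical names used only for illustration. The sketch uses LocalDateTime, so it ignores time zones and daylight saving, whereas Spark's unix_timestamp and from_unixtime use the session time zone.

```scala
import java.time.LocalDateTime

object TimestampSplit {
  // Hypothetical helper: split a session starting at `start` into per-hour rows.
  // Each row is (start of the chunk, seconds watched in that chunk).
  def splitSession(start: LocalDateTime, watchTime: Int): Seq[(LocalDateTime, Int)] =
    (0 until watchTime by 3600).map { offset =>
      (start.plusSeconds(offset.toLong), math.min(watchTime - offset, 3600))
    }

  def main(args: Array[String]): Unit =
    // A two-hour session starting at 23:00 crosses midnight.
    splitSession(LocalDateTime.of(2019, 7, 28, 23, 0), 7200).foreach(println)
}
```

With a 23:00 start and 7200 seconds, the second row falls on 2019-07-29 at 00:00 instead of producing an hour value of 24.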
answered Mar 07 '26 16:03 by rluta


