 

Spark Dataframes - derive single row containing non-null values per key from multiple such rows

I am a newbie to spark-scala and need some help from the community.

This is an app log: each request is scattered across 5 to 6 lines, and the unique key in all the lines is reqID. Each line has some columns to collect, and I need to write one record per reqID to a table.

val jsondftemp = spark.read.json("path") // to read the JSON file

My input JSON file:

{"srchTrnsPhrs":"Psychiatric Care","Nm":"bh","Num":"746","reqPlsize":"11707","reqID":"a520a039-310b-485e-9be2-3bfe51d376a2"}
{"CoreFuncStrtTm":"2019-04-16 00:00:16.356614","reqID":"a520a039-310b-485e-9be2-3bfe51d376a2"}
{"CoreFuncEndTm":"2019-04-16 00:00:16.536903","execTm":"180","reqID":"a520a039-310b-485e-9be2-3bfe51d376a2"}

My schema:

|-- CoreFuncEndTm: string (nullable = true)
|-- CoreFuncStrtTm: string (nullable = true)
|-- Nm: string (nullable = true)
|-- Num: string (nullable = true)
|-- execTm: string (nullable = true)
|-- reqID: string (nullable = true)
|-- srchTrnsPhrs: string (nullable = true)
|-- reqPlsize: string (nullable = true)

The DataFrame has:

+--------------------+--------------------+-------+-----------+--------------------+--------------------+-------+---------+
|       CoreFuncEndTm|      CoreFuncStrtTm|Nm     |execTm     |               reqID|        srchTrnsPhrs|Num    |reqPlsize|
+--------------------+--------------------+-------+-----------+--------------------+--------------------+-------+---------+
|                null|                null|     bh|       null|a520a039-310b-485...|    Psychiatric Care|    746|    11707|
|                null|2019-04-16 00:00:...|   null|       null|a520a039-310b-485...|                null|   null|     null|
|2019-04-16 00:00:...|                null|   null|        180|a520a039-310b-485...|                null|   null|     null|
+--------------------+--------------------+-------+-----------+--------------------+--------------------+-------+---------+

Expected output:

+--------------------+--------------------+-------+-----------+--------------------+--------------------+-------+---------+
|       CoreFuncEndTm|      CoreFuncStrtTm|Nm     |execTm     |               reqID|        srchTrnsPhrs|Num    |reqPlsize|
+--------------------+--------------------+-------+-----------+--------------------+--------------------+-------+---------+
|2019-04-16 00:00:...|2019-04-16 00:00:...|     bh|        180|a520a039-310b-485...|    Psychiatric Care|    746|    11707|
+--------------------+--------------------+-------+-----------+--------------------+--------------------+-------+---------+

Any help on this is really appreciated.
reqID is the key to join all the lines; I am confused between the reduceByKey and groupByKey operations.

asked Nov 26 '25 by raj kumar

1 Answer

A simple approach, working from the loaded DataFrame onwards only:

  1. Columns are named explicitly, but this could be made dynamic (see the sketch after the code below).
  2. The aggregated columns are assumed to share the same type.
  3. You need to assess how your null values are handled.
  4. In general, it handles any format of data you can throw at it.

Here are some goodies with a couple of techniques, without overloading the newbie, as it were:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

import spark.implicits._

val colAggList = List("a", "b", "c", "d") // columns to aggregate
val dropCols = Seq("a", "b", "c", "d")    // array columns to drop after conversion

// Collapse each collected array into a comma-separated string.
val convToString = udf((arr: Seq[String]) => arr.mkString(",")) // Could just get the 1st element via data.withColumn("newcolname", $"colname"(0))

val df = sc.parallelize(Seq(
   ("r1", Some(1), Some(1), None, Some("x")),
   ("r1", None, None, Some(3), None),
   ("r2", Some(6), Some(4), None, Some("y")),
   ("r3", None, Some(1), Some(5), Some("abc")),
   ("r3", Some(4), None, None, None),
   ("r4", Some(1), None, None, None),
   ("r4", None, Some(2), None, None),
   ("r4", None, None, Some(3), None),
   ("r4", None, None, None, Some("xyz")),
   ("r5", Some(1), Some(2), Some(7), Some("A"))
   )).toDF("ID", "a", "b", "c", "d")
df.show(false)
df.printSchema()

// Note: Nones / nulls are not collected by collect_list.
val df2 = df.groupBy("ID").agg(
  collect_list(colAggList(0)).as("a"),
  collect_list(colAggList(1)).as("b"),
  collect_list(colAggList(2)).as("c"),
  collect_list(colAggList(3)).as("d"))
df2.show(false)
df2.printSchema()

val df3 = df2
  .withColumn("aStr", convToString($"a"))
  .withColumn("bStr", convToString($"b"))
  .withColumn("cStr", convToString($"c"))
  .withColumn("dStr", convToString($"d"))
  .drop(dropCols: _*)
df3.show(false)
df3.printSchema()
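
As an aside on point 1 above, the aggregation can be built dynamically from the column list instead of being spelled out per column. A minimal sketch, reusing the df and colAggList defined above (on Spark 2.4+, the built-in array_join could also replace the convToString UDF):

// Build one collect_list aggregation per column name, then group once.
val aggExprs = colAggList.map(c => collect_list(c).as(c))
val df2dyn = df.groupBy("ID").agg(aggExprs.head, aggExprs.tail: _*)
df2dyn.show(false)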

Running the original pipeline returns the following, so you can see how it works - original and final output only shown:

+---+----+----+----+----+
|ID |a   |b   |c   |d   |
+---+----+----+----+----+
|r1 |1   |1   |null|x   |
|r1 |null|null|3   |null|
|r2 |6   |4   |null|y   |
|r3 |null|1   |5   |abc |
|r3 |4   |null|null|null|
|r4 |1   |null|null|null|
|r4 |null|2   |null|null|
|r4 |null|null|3   |null|
|r4 |null|null|null|xyz |
|r5 |1   |2   |7   |A   |
+---+----+----+----+----+


+---+----+----+----+----+
|ID |aStr|bStr|cStr|dStr|
+---+----+----+----+----+
|r1 |1   |1   |3   |x   |
|r5 |1   |2   |7   |A   |
|r2 |6   |4   |    |y   |
|r4 |1   |2   |3   |xyz |
|r3 |4   |1   |5   |abc |
+---+----+----+----+----+

Note the contrived missing value (r2 collected nothing for c), shown as a blank.
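
Since the question mentions reduceByKey and groupByKey: those are RDD operations; on a DataFrame the equivalent pattern is groupBy plus agg, as used above. If each non-key column carries at most one non-null value per reqID - as in the posted log sample - a simpler alternative is first with ignoreNulls = true, which picks that non-null value directly and avoids the array-to-string step. A minimal sketch against the question's own frame (column names taken from the posted JSON, untested against the real data):

import org.apache.spark.sql.functions.first

// One row per reqID: first(..., ignoreNulls = true) keeps the single
// non-null value each column has across that request's log lines.
val merged = jsondftemp.groupBy("reqID").agg(
  first("CoreFuncStrtTm", ignoreNulls = true).as("CoreFuncStrtTm"),
  first("CoreFuncEndTm", ignoreNulls = true).as("CoreFuncEndTm"),
  first("Nm", ignoreNulls = true).as("Nm"),
  first("Num", ignoreNulls = true).as("Num"),
  first("execTm", ignoreNulls = true).as("execTm"),
  first("srchTrnsPhrs", ignoreNulls = true).as("srchTrnsPhrs"),
  first("reqPlsize", ignoreNulls = true).as("reqPlsize"))
merged.show(false)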

answered Nov 27 '25 by thebluephantom


