Sort an Array in a Spark DataFrame Column by Date

I have a DataFrame formatted as below:

+---+------------------------------------------------------+
|Id |DateInfos                                             |
+---+------------------------------------------------------+
|B  |[[3, 19/06/2012-02.42.01], [4, 17/06/2012-18.22.21]]  |
|A  |[[1, 15/06/2012-18.22.16], [2, 15/06/2012-09.22.35]]  |
|C  |[[5, 14/06/2012-05.20.01]]                            |
+---+------------------------------------------------------+

I would like to sort the elements of each DateInfos array by date, using the timestamp stored in the second field of each element, to get:

+---+------------------------------------------------------+
|Id |DateInfos                                             |
+---+------------------------------------------------------+
|B  |[[4, 17/06/2012-18.22.21], [3, 19/06/2012-02.42.01]]  |
|A  |[[2, 15/06/2012-09.22.35], [1, 15/06/2012-18.22.16]]  |
|C  |[[5, 14/06/2012-05.20.01]]                            |
+---+------------------------------------------------------+

The schema of my DataFrame is printed below:

root
 |-- C1: string (nullable = true)
 |-- C2: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: integer (nullable = false)
 |    |    |-- _2: string (nullable = false)

I assume I have to create a UDF that uses a function with the following signature:

def sort_by_date(mouvements : Array[Any]) : Array[Any]

Do you have any idea?

asked Nov 14 '16 11:11 by a.moussa

1 Answer

That's indeed a bit tricky: although the UDF's input and output types look identical, we can't define it that way. The input actually arrives as a mutable.WrappedArray[Row], and the output can't use Row, or else Spark will fail to decode it into a Row.

So we define a UDF that takes a mutable.WrappedArray[Row] and returns an Array[(Int, String)]:

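import scala.collection.mutable
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// Note: sortBy(_._2) compares the date strings lexicographically, not chronologically.
val sortDates = udf { arr: mutable.WrappedArray[Row] =>
  arr.map { case Row(i: Int, s: String) => (i, s) }.sortBy(_._2)
}

val result = input.select($"Id", sortDates($"DateInfos") as "DateInfos")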

result.show(truncate = false)
// +---+--------------------------------------------------+
// |Id |DateInfos                                         |
// +---+--------------------------------------------------+
// |B  |[[4,17/06/2012-18.22.21], [3,19/06/2012-02.42.01]]|
// |A  |[[2,15/06/2012-09.22.35], [1,15/06/2012-18.22.16]]|
// |C  |[[5,14/06/2012-05.20.01]]                         |
// +---+--------------------------------------------------+
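
One caveat about the sort key: `sortBy(_._2)` orders the timestamps as plain strings. With the day-first `dd/MM/yyyy-HH.mm.ss` layout, that matches chronological order only while month and year are the same, as they happen to be in the sample data. Below is a minimal sketch of a chronological sort in plain Scala, assuming every value follows that layout. The object and method names (`ChronologicalSort`, `sortChronologically`) are hypothetical and not part of the original answer.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

object ChronologicalSort {
  // Assumed layout of the sample values, e.g. "17/06/2012-18.22.21".
  private val fmt = DateTimeFormatter.ofPattern("dd/MM/yyyy-HH.mm.ss")

  // Hypothetical helper: orders (id, timestamp) pairs by the parsed timestamp.
  // UTC is used only to turn the local date-time into a sortable number.
  def sortChronologically(entries: Seq[(Int, String)]): Seq[(Int, String)] =
    entries.sortBy { case (_, ts) =>
      LocalDateTime.parse(ts, fmt).toEpochSecond(ZoneOffset.UTC)
    }
}
```

Inside the UDF, the call to `.sortBy(_._2)` could then be swapped for `ChronologicalSort.sortChronologically` applied to the mapped tuples (after a `.toSeq`, if the collection type requires it). Values that do not match the pattern make `LocalDateTime.parse` throw a `DateTimeParseException`, so malformed input would need separate handling.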
answered Oct 02 '22 12:10 by Tzach Zohar