 

Passing a list of tuples as a parameter to a spark udf in scala

I am trying to pass a list of tuples to a UDF in Scala, but I am not sure exactly how to define the data type for this parameter. I tried passing it as a whole row, but Spark can't resolve it. I need to sort the list by the first element of each tuple and then return the first n elements. I have tried the following definitions for the UDF:

def udfFilterPath = udf((id: Long, idList: Array[structType[Long, String]] )

def udfFilterPath = udf((id: Long, idList: Array[Tuple2[Long, String]] )

def udfFilterPath = udf((id: Long, idList: Row)

This is what the idList looks like:

[[1234,"Tony"], [2345, "Angela"]]
[[1234,"Tony"], [234545, "Ruby"], [353445, "Ria"]]

The DataFrame has 100 rows like the ones above. I call the UDF as follows:

testSet.select("id", "idList").withColumn("result", udfFilterPath($"id", $"idList")).show

When I print the schema of the DataFrame, idList shows up as an array of structs. The idList itself is generated by a collect_list over a column of tuples, grouped by a key, and stored in the DataFrame. Any ideas on what I am doing wrong? Thanks!
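To see why the UDF never receives Scala tuples, here is a minimal sketch that rebuilds an idList column the way the question describes. It assumes a hypothetical flat input with columns key, pathId and name (none of these names come from the question); collecting struct(pathId, name) per key yields an array of structs, matching the printed schema.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list, struct}

object IdListSketch {
  // Builds a DataFrame shaped like the question's: one row per key with an idList column.
  // The flat input and the column names key, pathId and name are hypothetical.
  def build(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val flat = Seq(
      (1L, 1234L, "Tony"),
      (1L, 2345L, "Angela"),
      (2L, 1234L, "Tony"),
      (2L, 234545L, "Ruby"),
      (2L, 353445L, "Ria")
    ).toDF("key", "pathId", "name")

    // collect_list over a struct column produces array<struct<pathId:bigint,name:string>>
    flat.groupBy("key")
      .agg(collect_list(struct(col("pathId"), col("name"))).as("idList"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("idList-sketch").getOrCreate()
    build(spark).printSchema() // idList is reported as an array whose elements are structs
    spark.stop()
  }
}
```

Because the elements are structs, any UDF reading this column has to accept them as Row objects rather than tuples.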

asked Jan 09 '17 15:01 by Roshini



1 Answer

When defining a UDF, you should use plain Scala types (e.g. tuples, primitives) rather than Spark SQL types (e.g. StructType) as the output types.
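The sorting step itself needs nothing from Spark. A minimal sketch using only plain Scala types (FilterPathLogic and firstN are hypothetical names, and n stands for the "n elements" from the question):

```scala
object FilterPathLogic {
  // Sorts (id, name) pairs by their first element and keeps the first n.
  // Only plain Scala types are involved; if a UDF returns Seq[(Long, String)],
  // Spark maps it to array<struct<_1:bigint,_2:string>>.
  def firstN(pairs: Seq[(Long, String)], n: Int): Seq[(Long, String)] =
    pairs.sortBy(_._1).take(n)
}
```

For example, FilterPathLogic.firstN(Seq((2345L, "Angela"), (1234L, "Tony")), 1) gives Seq((1234L, "Tony")). Keeping this logic separate also makes it easy to unit-test without a SparkSession.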

As for the input types, this is where it gets tricky (and not well documented): an array of structs (which is what an array of tuples becomes inside a DataFrame) is passed to the UDF as a mutable.WrappedArray[Row], not as an array of tuples. So you'll have to convert each Row into a tuple first; then you can do the sorting and return the result.
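The Row-to-tuple conversion can be isolated as a small helper. This sketch declares the parameter as Seq[Row] instead of mutable.WrappedArray[Row]; since WrappedArray is a Seq, this is a widely used alternative, and (as far as we know) it also avoids depending on WrappedArray, which is deprecated in Scala 2.13. RowConversion and toPairs are hypothetical names, and the field positions 0 and 1 assume the struct layout shown in the question.

```scala
import org.apache.spark.sql.Row

object RowConversion {
  // Converts each struct element (delivered to the UDF as a Row) into a (Long, String) tuple.
  // Assumes field 0 is the numeric id and field 1 is the name.
  def toPairs(rows: Seq[Row]): Seq[(Long, String)] =
    rows.map(r => (r.getLong(0), r.getString(1)))
}
```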

Lastly, going by your description, the id column isn't used at all, so I removed it from the UDF definition, but it can easily be added back.

import scala.collection.mutable
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

val udfFilterPath = udf { idList: mutable.WrappedArray[Row] =>
  // converts the array items into tuples, sorts by the first item and returns the first two tuples:
  idList.map(r => (r.getAs[Long](0), r.getAs[String](1))).sortBy(_._1).take(2)
}

df.withColumn("result", udfFilterPath($"idList")).show(false)

+------+-------------------------------------------+----------------------------+
|id    |idList                                     |result                      |
+------+-------------------------------------------+----------------------------+
|1234  |[[1234,Tony], [2345,Angela]]               |[[1234,Tony], [2345,Angela]]|
|234545|[[1234,Tony], [2345454,Ruby], [353445,Ria]]|[[1234,Tony], [353445,Ria]] |
+------+-------------------------------------------+----------------------------+
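Since the question asks for n elements rather than a fixed two, one option is to capture n in a closure and build the UDF on demand. This is a minimal sketch under that assumption: FilterPathUdf, filterPath and run are hypothetical names, the input is declared as Seq[Row], and the sample data mirrors the table above.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, udf}

object FilterPathUdf {
  // Builds a UDF that keeps the n pairs with the smallest first element.
  // n is captured in the closure, so each distinct n needs its own UDF instance.
  def filterPath(n: Int): UserDefinedFunction = udf { idList: Seq[Row] =>
    idList.map(r => (r.getLong(0), r.getString(1))).sortBy(_._1).take(n)
  }

  // Applies filterPath(2) to sample data shaped like the answer's table.
  def run(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val df = Seq(
      (1234L, Seq((1234L, "Tony"), (2345L, "Angela"))),
      (234545L, Seq((1234L, "Tony"), (2345454L, "Ruby"), (353445L, "Ria")))
    ).toDF("id", "idList")
    df.withColumn("result", filterPath(2)(col("idList")))
  }
}
```

Calling FilterPathUdf.run(spark).show(false) should reproduce the result column above; passing a different value to filterPath changes how many pairs are kept.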
answered Sep 21 '22 01:09 by Tzach Zohar
