Given a DataFrame with a column of integer arrays of different sizes:
scala> sampleDf.show()
+------------+
|      arrays|
+------------+
|[15, 16, 17]|
|[15, 16, 17]|
|        [14]|
|        [11]|
|        [11]|
+------------+
scala> sampleDf.printSchema()
root
 |-- arrays: array (nullable = true)
 |    |-- element: integer (containsNull = true)
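To reproduce this locally, the sample DataFrame can be built roughly as follows. This is a sketch assuming a local SparkSession (the master URL and app name are arbitrary choices). Because the element type is a Scala Int, the schema shows containsNull = false instead of true, which does not matter for this question.

```scala
import org.apache.spark.sql.SparkSession

// Local session for experimentation; in spark-shell, getOrCreate() returns the existing `spark`
val spark = SparkSession.builder().master("local[*]").appName("random-item").getOrCreate()
import spark.implicits._

// Same rows as sampleDf above; Seq[Int] elements give containsNull = false in the schema
val sampleDf = Seq(Seq(15, 16, 17), Seq(15, 16, 17), Seq(14), Seq(11), Seq(11)).toDF("arrays")

sampleDf.show()
sampleDf.printSchema()
```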
I would like to generate a new column with a randomly chosen item from each array.
I've tried two solutions:
1. Using a UDF:
import scala.util.Random
import org.apache.spark.sql.functions.udf

def getRandomElement(arr: Array[Int]): Int = {
  arr(Random.nextInt(arr.size))
}
val getRandomElementUdf = udf { arr: Array[Int] => getRandomElement(arr) }
sampleDf.withColumn("randomItem", getRandomElementUdf('arrays)).show
It crashes on the last line with a long error message (excerpts):
...
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<int>) => int)
...
Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [I
I also tried an alternative UDF definition:
val getRandomElementUdf = udf[Int, Array[Int]] (getRandomElement)
but I get the same error.
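The stack trace hints at the cause: Spark hands the array<int> value to the Scala function as a WrappedArray (a Seq), while a parameter typed Array[Int] compiles to a cast to the JVM int array type, printed as [I. The plain-Scala sketch below involves no Spark at all; wrapping boxed values in a Seq is an assumption standing in for what Spark actually passes.

```scala
import scala.util.Try

// Stand-in for the value Spark passes to the UDF: boxed integers wrapped in a Seq,
// not a primitive int[] (on Scala 2.12 this is a WrappedArray$ofRef)
val fromSpark: Any = Array[Any](15, 16, 17).toSeq

// Declaring the parameter as Array[Int] forces a cast to int[] ("[I"), which fails
val asArray = Try(fromSpark.asInstanceOf[Array[Int]])
println(asArray) // a Failure wrapping a ClassCastException

// Declaring it as Seq[Int] matches what is actually passed, so this cast succeeds
val asSeq = fromSpark.asInstanceOf[Seq[Int]]
println(asSeq.size)
```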
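2. Creating intermediate columns holding a random index within the range of the corresponding array:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import spark.implicits._ // enables the 'colName symbol syntax (already imported in spark-shell)

// Return a DataFrame with a column holding a random index into an array column whose arrays have different sizes.
// rand is uniform in [0, 1), so floor(rand * array_size) already lies in [0, array_size - 1].
def choice(df: DataFrame, colName: String): DataFrame = {
  df.withColumn("array_size", size(col(colName)))
    .withColumn("random_idx", least('array_size, floor(rand * 'array_size)))
}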
choice(sampleDf, "arrays").show
This outputs:
+------------+----------+----------+
|      arrays|array_size|random_idx|
+------------+----------+----------+
|[15, 16, 17]|         3|         2|
|[15, 16, 17]|         3|         1|
|        [14]|         1|         0|
|        [11]|         1|         0|
|        [11]|         1|         0|
+------------+----------+----------+
Ideally, I would like to use the random_idx column to pick an item from the arrays column, something like:
choice(sampleDf, "arrays").withColumn("chosen_item", 'arrays.getItem('random_idx))
Unfortunately, getItem cannot take a column as its argument.
Any suggestion is welcome.
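One UDF-free way to use random_idx is to do the lookup in a SQL expression, since SQL array indexing (arrays[i], 0-based) accepts another column as the index. This is a sketch assuming Spark 2.x or later and a local SparkSession; the chosen_item column name and the cast of the index to int are illustrative choices, not part of the question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, expr, floor, rand, size}

val spark = SparkSession.builder().master("local[*]").appName("random-item").getOrCreate()
import spark.implicits._

val sampleDf = Seq(Seq(15, 16, 17), Seq(15, 16, 17), Seq(14), Seq(11), Seq(11)).toDF("arrays")

// Same idea as choice(): rand() is uniform in [0, 1), so the index falls in [0, array_size - 1]
val withIdx = sampleDf
  .withColumn("array_size", size(col("arrays")))
  .withColumn("random_idx", floor(rand() * col("array_size")).cast("int"))

// SQL array indexing resolves random_idx per row (illustrative column name: chosen_item)
val result = withIdx.withColumn("chosen_item", expr("arrays[random_idx]"))
result.show()
```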
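You can use the UDF below to select a random element from each array:
import scala.util.Random
import org.apache.spark.sql.functions.udf

// Spark passes array<int> to Scala UDFs as a Seq (WrappedArray), not as Array[Int].
// Integer (boxed) lets the UDF return null for null or empty arrays instead of throwing.
val getRandomElement = udf((array: Seq[Integer]) => {
  if (array == null || array.isEmpty) null else array(Random.nextInt(array.size))
})

sampleDf.withColumn("c1", getRandomElement($"arrays"))
  .withColumn("c2", getRandomElement($"arrays"))
  .withColumn("c3", getRandomElement($"arrays"))
  .withColumn("c4", getRandomElement($"arrays"))
  .withColumn("c5", getRandomElement($"arrays"))
  .show(false)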
Each call picks its own random element, shown as a separate column:
+------------+---+---+---+---+---+
|arrays |c1 |c2 |c3 |c4 |c5 |
+------------+---+---+---+---+---+
|[15, 16, 17]|15 |16 |16 |17 |16 |
|[15, 16, 17]|16 |16 |17 |15 |15 |
|[14] |14 |14 |14 |14 |14 |
|[11] |11 |11 |11 |11 |11 |
|[11] |11 |11 |11 |11 |11 |
+------------+---+---+---+---+---+
If you want to stay UDF-free, here is one possibility.
First, add a key to the DataFrame returned by choice (assume it is named choiceDf):
val myDf = choiceDf.withColumn("key", monotonically_increasing_id())
Then create an intermediate DataFrame that explodes the arrays column and keeps the position of each value:
val tmp = myDf.select('key, posexplode('arrays))
Finally, join on key and random_idx:
myDf.join(tmp.withColumnRenamed("pos", "random_idx"), Seq("key", "random_idx"), "left")
The item you are looking for is stored in the column col:
+---+----------+------------+----------+---+
|key|random_idx|      arrays|array_size|col|
+---+----------+------------+----------+---+
|  0|         2|[15, 16, 17]|         3| 17|
|  1|         1|[15, 16, 17]|         3| 16|
|  2|         0|        [14]|         1| 14|
+---+----------+------------+----------+---+
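On Spark 2.4 or later, another UDF-free option that avoids the join is the built-in shuffle function, which returns a random permutation of an array; taking its first element picks a random item. This is a sketch assuming a local SparkSession; the chosen_item column name is illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, shuffle}

val spark = SparkSession.builder().master("local[*]").appName("random-item").getOrCreate()
import spark.implicits._

val sampleDf = Seq(Seq(15, 16, 17), Seq(15, 16, 17), Seq(14), Seq(11), Seq(11)).toDF("arrays")

// shuffle (Spark 2.4+) randomly permutes each array; element 0 of the permutation is a random item
val picked = sampleDf.withColumn("chosen_item", shuffle(col("arrays")).getItem(0))
picked.show()
```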