How can I implement a custom explode function using UDFs, so that I can attach extra information to the items? For example, along with the items, I want their indices.
The part I do not know how to do is how to make a UDF return multiple values and then place those values in separate rows.
pyspark.sql.functions.explode(col): Returns a new row for each element in the given array or map. Uses the default column name col for elements in the array and key and value for elements in the map, unless specified otherwise.
To split array column data into rows, PySpark provides a function called explode(): for each element in the array, we get a new row.
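As a quick illustration of the built-in behavior, here is a minimal sketch (the DataFrame demo_df and its column name letters are made up for this illustration):
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode

spark = SparkSession.builder.getOrCreate()

# Hypothetical example data; any array column behaves the same way.
demo_df = spark.createDataFrame([(['a', 'b'],)], ['letters'])

# Each array element becomes its own row, in a column named 'col' by default.
demo_df.select(explode('letters')).show()
#+---+
#|col|
#+---+
#|  a|
#|  b|
#+---+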
If you need a custom explode function, you can write a UDF that takes an array and returns an array (of structs, if you want extra fields per item). For example, for this DataFrame:
df = spark.createDataFrame([(['a', 'b', 'c'], ), (['d', 'e'],)], ['array'])
df.show()
+---------+
| array|
+---------+
|[a, b, c]|
| [d, e]|
+---------+
A UDF that pairs each element with its index, combined with explode, can look like this:
from pyspark.sql.functions import explode, udf
from pyspark.sql.types import ArrayType, IntegerType, StringType, StructField, StructType

# Each output element is a struct holding the element's index and its value.
value_with_index = StructType([
    StructField('index', IntegerType()),
    StructField('letter', StringType())
])

add_indices = udf(lambda arr: list(zip(range(len(arr)), arr)), ArrayType(value_with_index))

df.select(explode(add_indices('array'))).select('col.index', 'col.letter').show()
+-----+------+
|index|letter|
+-----+------+
| 0| a|
| 1| b|
| 2| c|
| 0| d|
| 1| e|
+-----+------+
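The same pattern covers any extra per-item information, not just indices. Here is a minimal sketch under the same schema assumptions (the field name total and the UDF name with_size are hypothetical) that attaches the length of the source array to every element:
from pyspark.sql.functions import explode, udf
from pyspark.sql.types import ArrayType, IntegerType, StringType, StructField, StructType

item_with_size = StructType([
    StructField('letter', StringType()),
    StructField('total', IntegerType())  # hypothetical extra field
])

# Pair each element with the length of the array it came from.
with_size = udf(lambda arr: [(x, len(arr)) for x in arr], ArrayType(item_with_size))

df.select(explode(with_size('array'))).select('col.letter', 'col.total').show()
#+------+-----+
#|letter|total|
#+------+-----+
#|     a|    3|
#|     b|    3|
#|     c|    3|
#|     d|    2|
#|     e|    2|
#+------+-----+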
In Spark 2.1+, there is pyspark.sql.functions.posexplode(), which explodes the array and also returns each element's position:
Using the same example as @Mariusz:
import pyspark.sql.functions as f

df.show()
#+---------+
#| array|
#+---------+
#|[a, b, c]|
#| [d, e]|
#+---------+
df.select(f.posexplode('array')).show()
#+---+---+
#|pos|col|
#+---+---+
#| 0| a|
#| 1| b|
#| 2| c|
#| 0| d|
#| 1| e|
#+---+---+
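As a small usage note, alias() accepts one name per generated column, so you can match the column names from the UDF answer (the names index and letter are simply carried over from that example):
df.select(f.posexplode('array').alias('index', 'letter')).show()
#+-----+------+
#|index|letter|
#+-----+------+
#|    0|     a|
#|    1|     b|
#|    2|     c|
#|    0|     d|
#|    1|     e|
#+-----+------+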