 

PySpark DataFrame: Custom Explode Function

Tags:

pyspark

How can I implement a custom explode function using UDFs, so that each exploded item carries extra information? For example, along with each item, I want to have the item's index.

The part I do not know how to do is this: when a UDF returns multiple values, how do I place those values in separate rows?

ashim asked Sep 12 '17 19:09




2 Answers

If you need a custom explode function, write a UDF that takes an array and returns an array, then apply explode to the result. For example, for this DataFrame:

df = spark.createDataFrame([(['a', 'b', 'c'], ), (['d', 'e'],)], ['array'])
df.show()
+---------+
|    array|
+---------+
|[a, b, c]|
|   [d, e]|
+---------+

A function that adds the index to each element, followed by an explode of the result, can look like this:

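from pyspark.sql.functions import explode, udf
from pyspark.sql.types import ArrayType, IntegerType, StringType, StructField, StructType

# Each exploded element becomes a struct holding its index and its value.
value_with_index = StructType([
    StructField('index', IntegerType()),
    StructField('letter', StringType())
])
# The UDF turns ['a', 'b'] into [(0, 'a'), (1, 'b')]; explode() then emits one row per tuple.
add_indices = udf(lambda arr: list(zip(range(len(arr)), arr)), ArrayType(value_with_index))
df.select(explode(add_indices('array'))).select('col.index', 'col.letter').show()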
+-----+------+
|index|letter|
+-----+------+
|    0|     a|
|    1|     b|
|    2|     c|
|    0|     d|
|    1|     e|
+-----+------+
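
The same pattern extends to other per-item information. The following is a minimal sketch, not a definitive implementation: the names annotate_items and explode_with_info and the extra is_last field are hypothetical, chosen only for illustration. The per-item logic lives in a plain Python function so it can be checked without a Spark session.

```python
def annotate_items(arr):
    """Pair each element with extra information: (index, value, is_last)."""
    if arr is None:  # Spark passes None to the UDF for null arrays
        return []
    last = len(arr) - 1
    return [(i, value, i == last) for i, value in enumerate(arr)]


def explode_with_info(df, column):
    """Return one row per element of `column`, with the extra fields as columns."""
    # Imported here so annotate_items above can be tried without Spark installed.
    from pyspark.sql.functions import explode, udf
    from pyspark.sql.types import (ArrayType, BooleanType, IntegerType,
                                   StringType, StructField, StructType)

    # Hypothetical schema: one struct field per piece of extra information.
    item_schema = StructType([
        StructField('index', IntegerType()),
        StructField('letter', StringType()),
        StructField('is_last', BooleanType()),
    ])
    annotate = udf(annotate_items, ArrayType(item_schema))
    exploded = df.select(explode(annotate(column)).alias('item'))
    return exploded.select('item.index', 'item.letter', 'item.is_last')
```

With the DataFrame above, this would be called as explode_with_info(df, 'array').show(). On its own, annotate_items(['d', 'e']) returns [(0, 'd', False), (1, 'e', True)]. Adding another piece of information means adding one element to the tuple and one matching StructField.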

Mariusz answered Oct 01 '22 02:10


In Spark 2.1+, pyspark.sql.functions.posexplode() explodes the array and also returns each element's position, so no UDF is needed when the index is the only extra information required.

Using the same example as @Mariusz:

df.show()
#+---------+
#|    array|
#+---------+
#|[a, b, c]|
#|   [d, e]|
#+---------+

import pyspark.sql.functions as f

df.select(f.posexplode('array')).show()
#+---+---+
#|pos|col|
#+---+---+
#|  0|  a|
#|  1|  b|
#|  2|  c|
#|  0|  d|
#|  1|  e|
#+---+---+

pault answered Oct 01 '22 02:10