If I have a Spark DataFrame containing arrays, can I use Python list methods on these arrays through a UDF? How can I take a Spark DataFrame array<double> and turn it into a Python list?
Below is an example with a few UDFs. I am not sure why taking len works, but taking the max does not. Ultimately, I want to make a new column with sampled values from the original array column. That also gets an error about expecting two arguments; bonus points if you can help with that too!
I have the following Spark DataFrame:
from pyspark.sql.functions import udf
from pyspark.sql import Row
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import ArrayType
import random
df = sc.parallelize([Row(name='Joe', scores=[1.0, 2.0, 3.0]),
                     Row(name='Mary', scores=[3.0]),
                     Row(name='Mary', scores=[4.0, 7.1])]).toDF()
>>> df.show()
+----+---------------+
|name| scores|
+----+---------------+
| Joe|[1.0, 2.0, 3.0]|
|Mary| [3.0]|
|Mary| [4.0, 7.1]|
+----+---------------+
>>> df
DataFrame[name: string, scores: array<double>]
def sampleWithReplacement(listIn, samples):
    tempList = array()
    count = 0
    while (count < samples):
        tempList.append(random.sample(listIn, 1)[0])
        count = count + 1
    return tempList

def maxArray(listIn):
    return max(listIn)

def lenArray(listIn):
    return len(listIn)
sampUDF = udf(sampleWithReplacement, ArrayType())
maxUDF = udf(maxArray, IntegerType())
lenUDF = udf(lenArray, IntegerType())
>>> df.withColumn("maxCol",maxUDF(df.scores)).show()
+----+---------------+------+
|name| scores|maxCol|
+----+---------------+------+
| Joe|[1.0, 2.0, 3.0]| null|
|Mary| [3.0]| null|
|Mary| [4.0, 7.1]| null|
+----+---------------+------+
>>> df.withColumn("maxCol",lenUDF(df.scores)).show()
+----+---------------+------+
|name| scores|maxCol|
+----+---------------+------+
| Joe|[1.0, 2.0, 3.0]| 3|
|Mary| [3.0]| 1|
|Mary| [4.0, 7.1]| 2|
+----+---------------+------+
TL;DR When you have a choice, always prefer built-in functions over udf. To compute the length use the size function:

from pyspark.sql.functions import size

df.withColumn("len", size("scores"))
For small arrays you can get the maximum by sorting in descending order and taking the first element:

from pyspark.sql.functions import sort_array

df.withColumn("max", sort_array("scores", False)[0])

but of course it is not a good choice for large collections.
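As a side note, if you are on Spark 2.4 or later, the built-in array_max (and its counterpart array_min) does this directly, without sorting:

from pyspark.sql.functions import array_max

df.withColumn("max", array_max("scores"))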
Are Spark DataFrame Arrays Different Than Python Lists?
Internally they are different, because they are Scala objects. But when accessed inside a udf they are plain Python lists. So what is going wrong?
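If you want to convince yourself of that, here is a quick throwaway check against the df defined above (just a sketch; typeUDF is an illustrative name):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Each array<double> value arrives inside the udf as a plain Python list.
typeUDF = udf(lambda xs: type(xs).__name__, StringType())
df.select(typeUDF("scores")).show()  # shows "list" for every row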
Let's look at the types. The scores column is array<double>. When converted to Python types this results in a List[float], so when you call max on it you get a float back.
Yet you declare the return type as IntegerType. Because a float cannot be converted to an integer without loss of precision, the result is undefined and you get NULL. The correct choice of return type is either DoubleType or FloatType:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType, FloatType

maxf = udf(lambda xs: max(xs), FloatType())
maxd = udf(lambda xs: max(xs), DoubleType())

(sc
    .parallelize([("Joe", [1.0, 2.0, 3.0])])
    .toDF(["name", "scores"])
    .select("*", maxf("scores"), maxd("scores")))
with result:
+----+---------------+----------------+----------------+
|name| scores|<lambda>(scores)|<lambda>(scores)|
+----+---------------+----------------+----------------+
| Joe|[1.0, 2.0, 3.0]| 3.0| 3.0|
+----+---------------+----------------+----------------+
and schema:
root
 |-- name: string (nullable = true)
 |-- scores: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- <lambda>(scores): float (nullable = true)
 |-- <lambda>(scores): double (nullable = true)
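Finally, for the bonus question: the "expecting two arguments" error most likely comes from ArrayType itself, which must be constructed with an element type, ArrayType(DoubleType()) for an array<double> (and tempList = array() would raise a NameError anyway; a plain Python list works). A minimal sketch of the sampler with the types declared correctly; note that udf arguments must be columns, hence the lit wrapper:

from pyspark.sql.functions import lit, udf
from pyspark.sql.types import ArrayType, DoubleType
import random

# Draw n values with replacement from the input list.
sampUDF = udf(lambda xs, n: [random.choice(xs) for _ in range(n)],
              ArrayType(DoubleType()))

df.withColumn("sampled", sampUDF(df.scores, lit(3))).show()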