PySpark - Are Spark DataFrame Arrays Different Than Python Lists?

If I have a Spark DataFrame containing arrays, can I use Python list methods on these arrays through a UDF? How can I take a Spark DataFrame array<double> column and turn it into a Python list?

Below is an example with a few UDFs. I am not sure why taking the len works, but taking the max does not. Ultimately, I want to make a new column with values sampled from the original array column. That one also raises an error about expecting two arguments; bonus points if you can help with that too!

I have the following Spark DataFrame:

from pyspark.sql.functions import udf
from pyspark.sql import Row
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import ArrayType
import random

df = sc.parallelize([Row(name='Joe', scores=[1.0, 2.0, 3.0]),
                     Row(name='Mary', scores=[3.0]),
                     Row(name='Mary', scores=[4.0, 7.1])]).toDF()
>>> df.show()
+----+---------------+
|name|         scores|
+----+---------------+
| Joe|[1.0, 2.0, 3.0]|
|Mary|          [3.0]|
|Mary|     [4.0, 7.1]|
+----+---------------+
>>> df
DataFrame[name: string, scores: array<double>]
def sampleWithReplacement(listIn, samples):
    tempList = array()
    count = 0
    while (count < samples):
        tempList.append(random.sample(listIn, 1)[0])
        count = count + 1
    return tempList

def maxArray(listIn):
    return max(listIn)

def lenArray(listIn):
    return len(listIn)

sampUDF = udf(sampleWithReplacement, ArrayType())
maxUDF = udf(maxArray, IntegerType())
lenUDF = udf(lenArray, IntegerType())

>>> df.withColumn("maxCol",maxUDF(df.scores)).show()
+----+---------------+------+
|name|         scores|maxCol|
+----+---------------+------+
| Joe|[1.0, 2.0, 3.0]|  null|
|Mary|          [3.0]|  null|
|Mary|     [4.0, 7.1]|  null|
+----+---------------+------+

>>> df.withColumn("maxCol",lenUDF(df.scores)).show()
+----+---------------+------+
|name|         scores|maxCol|
+----+---------------+------+
| Joe|[1.0, 2.0, 3.0]|     3|
|Mary|          [3.0]|     1|
|Mary|     [4.0, 7.1]|     2|
+----+---------------+------+
Asked by Max

1 Answer

TL;DR When you have a choice, always prefer built-in functions over a udf. To compute the length of an array, use the size function:

from pyspark.sql.functions import size

df.withColumn("len", size("scores"))

For small arrays you can try

from pyspark.sql.functions import sort_array

df.withColumn("max", sort_array("scores", False)[0])

but of course it is not a good choice for large collections.
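If you are on Spark 2.4 or later, there is also a built-in array_max that avoids sorting altogether (a minimal sketch, assuming that version is available):

from pyspark.sql.functions import array_max

df.withColumn("max", array_max("scores"))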

Are Spark DataFrame Arrays Different Than Python Lists?

Internally they are different, because they are Scala objects. When accessed in a udf they are plain Python lists. So what is going wrong?
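You can check this directly. A minimal sketch, reusing the df from the question (smallestUDF is just an illustrative name):

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Inside the udf, xs arrives as a plain Python list,
# so list operations like sorted() and indexing work.
smallestUDF = udf(lambda xs: sorted(xs)[0], DoubleType())
df.withColumn("smallest", smallestUDF(df.scores)).show()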

Let's look at the types. The scores column is array<double>. When converted to Python types, this results in a List[float], so when you call max on it you get a float back.

Yet you declare the return type as IntegerType. Because a float cannot be converted to an integer without loss of precision, the result is undefined and you get NULL. The correct choice of return type is either DoubleType or FloatType:

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType, FloatType

maxf = udf(lambda xs: max(xs), FloatType())
maxd = udf(lambda xs: max(xs), DoubleType())

(sc
    .parallelize([("Joe", [1.0, 2.0, 3.0])])
    .toDF(["name", "scores"])
    .select("*", maxf("scores"), maxd("scores"))
    .show())

with result:

+----+---------------+----------------+----------------+
|name|         scores|<lambda>(scores)|<lambda>(scores)|
+----+---------------+----------------+----------------+
| Joe|[1.0, 2.0, 3.0]|             3.0|             3.0|
+----+---------------+----------------+----------------+

and schema:

root
 |-- name: string (nullable = true)
 |-- scores: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- <lambda>(scores): float (nullable = true)
 |-- <lambda>(scores): double (nullable = true)
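As for the bonus sampling question: ArrayType requires an element type (that is what the error about expecting two arguments refers to), tempList should be a plain list, and a literal argument to a udf has to be wrapped in lit. A minimal sketch, assuming random.choice sampling is acceptable:

import random
from pyspark.sql.functions import lit, udf
from pyspark.sql.types import ArrayType, DoubleType

def sampleWithReplacement(listIn, samples):
    # Draw `samples` values with replacement from the input list
    return [random.choice(listIn) for _ in range(samples)]

sampUDF = udf(sampleWithReplacement, ArrayType(DoubleType()))
df.withColumn("sampled", sampUDF(df.scores, lit(3))).show()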
Answered by zero323