
Pyspark SQL Pandas UDF: Returning an array

I'm trying to write a pandas UDF that takes two columns of integer values and, based on the difference between them, returns an array of decimals whose length equals that difference.

Here's my attempt so far. I've tried a number of approaches, but this is the general idea:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
import pandas as pd

@pandas_udf(ArrayType(DecimalType()), PandasUDFType.SCALAR)
def zero_pad(x, y):
  buffer = []
  
  for i in range(0, (x - y)):
    buffer.append(0.0)
  
  return buffer

Here's how I use it:

df = df.withColumn("zero_list", zero_pad(df.x, df.y))

The end result would be df with a new column called "zero_list": an ArrayType(DecimalType()) column that looks like [0.0, 0.0, 0.0, ...], with length equal to (df.x - df.y).
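For concreteness, the intended per-row mapping can be sketched in plain Python (the helper name `expected_zero_list` is just for illustration; `x` and `y` are the column values from above):

```python
def expected_zero_list(x, y):
    # For a row with values x and y, produce a list of (x - y) zeros.
    return [0.0] * (x - y)

print(expected_zero_list(5, 2))  # [0.0, 0.0, 0.0]
```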

The error message is so general that it's almost not worth posting: simply "Job aborted due to stage failure". It only traces back to the part of my code where I call df.show():

Py4JJavaError                             Traceback (most recent call last)
<command-103561> in <module>()
---> 33 df.orderBy("z").show(n=1000)

/databricks/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
    350         """
    351         if isinstance(truncate, bool) and truncate:
--> 352             print(self._jdf.showString(n, 20, vertical))
    353         else:
    354             print(self._jdf.showString(n, int(truncate), vertical))

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

How can I create a pandas_udf that returns an array of variable length?

I'm doing all of this using Databricks with Spark 2.3.1.

asked Nov 07 '22 by UneagerAngryBeaver

1 Answer

This question is about a year old, but I ran into the same problem, and here is my solution with pandas_udf:

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *

@pandas_udf(ArrayType(IntegerType()), PandasUDFType.SCALAR)
def zero_pad(xs, ys):
    # A SCALAR pandas UDF receives whole pd.Series batches, so iterate
    # over the rows and build one zero-filled list per row.
    buffer = []
    for idx, x in enumerate(xs):
        buffer.append([0] * int(x - ys[idx]))

    # Return a pd.Series (not a plain list) of the same length as the input.
    return pd.Series(buffer)

df = df.withColumn("zero_list", zero_pad(df.x, df.y))
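The key point is that a SCALAR pandas UDF receives entire `pd.Series` batches and must return a `pd.Series` of the same length, not a plain Python list as in the original attempt. The per-batch logic can be checked with pandas alone, no Spark required; this is a sketch of the function body (the standalone name `zero_pad_batch` is hypothetical):

```python
import pandas as pd

def zero_pad_batch(xs: pd.Series, ys: pd.Series) -> pd.Series:
    # One zero-filled list per row; list lengths vary with x - y.
    return pd.Series([[0] * int(x - y) for x, y in zip(xs, ys)])

result = zero_pad_batch(pd.Series([5, 4]), pd.Series([2, 4]))
print(result.tolist())  # [[0, 0, 0], []]
```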
answered Nov 14 '22 by NaWeeD