I'm trying to write a pandas UDF that takes two integer columns and returns an array of decimals whose length equals the difference between the two values.
Here's my attempt so far. I've tried a lot of different ways to get this to work, but this is the general idea:
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
import pandas as pd

@pandas_udf(ArrayType(DecimalType()), PandasUDFType.SCALAR)
def zero_pad(x, y):
    buffer = []
    for i in range(0, (x - y)):
        buffer.append(0.0)
    return buffer
Here's how I use it:
df = df.withColumn("zero_list", zero_pad(df.x, df.y))
The end result should be df with a new column called "zero_list", an ArrayType(DecimalType()) column that looks like [0.0, 0.0, 0.0, ...], the length of which is (df.x - df.y).
The error message is so general it's almost not worth posting: simply "Job aborted due to stage failure", and it only traces back to the part of my code where I do a df.show():
Py4JJavaError Traceback (most recent call last)
<command-103561> in <module>()
---> 33 df.orderBy("z").show(n=1000)
/databricks/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
350 """
351 if isinstance(truncate, bool) and truncate:
--> 352 print(self._jdf.showString(n, 20, vertical))
353 else:
354 print(self._jdf.showString(n, int(truncate), vertical))
/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
How do I create a pandas_udf that returns an array of variable length?
I'm doing all of this using Databricks with Spark 2.3.1.
This question is about a year old, but I ran into the same problem, and here is my solution with pandas_udf:
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *

@pandas_udf(ArrayType(IntegerType()), PandasUDFType.SCALAR)
def zero_pad(xs, ys):
    buffer = []
    for idx, x in enumerate(xs):
        buffer.append([0] * int(x - ys[idx]))
    return pd.Series(buffer)

df = df.withColumn("zero_list", zero_pad(df.x, df.y))
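The key point is that a scalar pandas UDF receives each column as a whole pandas.Series (one batch of rows) rather than a single value, and it has to return a Series with one entry per input row. That is why range(0, x - y) in the question cannot work. If it helps, the per-batch logic can be checked in plain pandas without a cluster. This is only a sketch: zero_pad_batch is a hypothetical helper name, and clamping negative differences to an empty list is my assumption, not something the question specifies.

```python
import pandas as pd


def zero_pad_batch(xs: pd.Series, ys: pd.Series) -> pd.Series:
    """Hypothetical helper: one list of zeros per row, of length x - y."""
    # Pair the values by position; a negative difference yields an empty
    # list (an assumption, the question does not say what should happen).
    rows = [[0] * max(int(x) - int(y), 0) for x, y in zip(xs, ys)]
    # One output element per input row, keeping the input's index.
    return pd.Series(rows, index=xs.index)


batch = zero_pad_batch(pd.Series([3, 5, 2]), pd.Series([1, 5, 4]))
print(batch.tolist())
```

The body of the function is what would go inside the decorated zero_pad above. Pairing with zip avoids looking values up by index label, which only matters if a batch ever arrives with a non-default index.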