
pandas_udf error RuntimeError: Result vector from pandas_udf was not the required length: expected 12, got 35

I am getting an error from pandas_udf with the following code. The code is meant to create a new column holding the data type of another column. The same logic works fine with the normal, slower udf (commented out).

Basically, anything more sophisticated than "string"+data returns an error.

# from pyspark.sql.functions import udf
import pyspark.sql.types
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(returnType=pyspark.sql.types.StringType(), functionType=PandasUDFType.SCALAR)
def my_transform (data) -> bytes:
    return_val = str(type(data))
    return return_val

rawdata_df = process_fails.toDF()

# decode_df = rawdata_df.withColumn('new_col', udf_decode(udf_unzip(udf_b64decode(udf_bytes(rawdata_df.rawData)))))
decode_df = rawdata_df.withColumn('new_col', my_transform(rawdata_df.rawData))

decode_df.show()

I get the following error:

An error occurred while calling o887.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 23.0 failed 4 times, most recent failure: Lost task 0.3 in stage 23.0 (TID 70, ip-10-213-56-185.ap-southeast-2.compute.internal, executor 10): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/livy/appcache/application_1574912148721_0001/container_1574912148721_0001_01_000020/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/mnt/yarn/usercache/livy/appcache/application_1574912148721_0001/container_1574912148721_0001_01_000020/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt/yarn/usercache/livy/appcache/application_1574912148721_0001/container_1574912148721_0001_01_000020/pyspark.zip/pyspark/serializers.py", line 286, in dump_stream
    for series in iterator:
  File "<string>", line 1, in <lambda>
  File "/mnt/yarn/usercache/livy/appcache/application_1574912148721_0001/container_1574912148721_0001_01_000020/pyspark.zip/pyspark/worker.py", line 101, in <lambda>
    return lambda *a: (verify_result_length(*a), arrow_return_type)
  File "/mnt/yarn/usercache/livy/appcache/application_1574912148721_0001/container_1574912148721_0001_01_000020/pyspark.zip/pyspark/worker.py", line 98, in verify_result_length
    "expected %d, got %d" % (len(a[0]), len(result)))
RuntimeError: Result vector from pandas_udf was not the required length: expected 12, got 35


This also gives an error:

import pandas as pd
import numpy as np
from pyspark.sql.functions import pandas_udf, PandasUDFType, udf
df = pd.DataFrame({'x': ["1","2","3"], 'y':[1.0,2.0,3.0]})
sp_df = spark.createDataFrame(df)

@pandas_udf('long', PandasUDFType.SCALAR)
def pandas_plus_one(v):
    return len(v)

sp_df.withColumn('v2', pandas_plus_one(sp_df.x)).show()

The error message is:

TypeError: Return type of the user-defined function should be Pandas.Series, but is <class 'int'>
Kevin Tianyu Xu asked Mar 03 '23 03:03



1 Answer

pandas_udfs of type PandasUDFType.SCALAR take a pd.Series as input and must return a pd.Series of the same length. That is why the TypeError is raised: the function pandas_plus_one returns an int instead of a pd.Series. With the values from your second example, and assuming all three rows arrive in one batch, the input the UDF receives for the dataframe's column x is actually

v = pd.Series(["1", "2", "3"])
print(v)

# 0    1
# 1    2
# 2    3
# dtype: object

If you wanted the length of each item in the series, it would be easiest to map it. The function definition (with type-hints for clarity) should look closer to:

@pandas_udf('long', PandasUDFType.SCALAR)
def pandas_plus_one(v: pd.Series) -> pd.Series:
    return v.map(lambda x: len(x))
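As a sketch of a vectorized alternative to the map above, pandas' Series.str.len() also returns one value per element. The snippet uses plain pandas only, so it can be checked without a Spark session; the function name string_lengths and the sample values are made up for illustration.

```python
import pandas as pd


def string_lengths(v: pd.Series) -> pd.Series:
    # One length per element, so the result has as many rows as the input.
    return v.str.len()


# Hypothetical sample values standing in for one batch of column x.
v = pd.Series(["1", "22", "333"])
lengths = string_lengths(v)

# A scalar pandas_udf has to satisfy this: output length == input length.
assert len(lengths) == len(v)
print(lengths.tolist())  # [1, 2, 3]
```

Either form should work as the body of the UDF; the point is only that the return value is a pd.Series with one entry per input row.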

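You can apply the same concept to your original problem (use a map so that your pandas_udf returns a pd.Series of the same length as its input) and it should resolve your issue. There, str(type(data)) returns a single string for the whole batch rather than one value per row. The string "<class 'pandas.core.series.Series'>" is 35 characters long, which matches the "got 35" in the error, while 12 is the number of rows in the batch.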
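A minimal sketch of what that could look like for the function in the question, written as a plain pandas function so it can be tried locally first. The name element_types and the sample bytes values are assumptions for illustration, not taken from the asker's data.

```python
import pandas as pd


def element_types(data: pd.Series) -> pd.Series:
    # Map over the batch so every row gets its own type string,
    # instead of describing the type of the batch as a whole.
    return data.map(lambda x: str(type(x)))


# Stand-in for one batch of the rawData column (made-up values).
batch = pd.Series([b"abc", b"de", b"f"])
result = element_types(batch)

assert isinstance(result, pd.Series)
assert len(result) == len(batch)
print(result.tolist())

# To use it in Spark, wrap the function the same way as in the question:
# my_transform = pandas_udf(element_types,
#                           returnType=pyspark.sql.types.StringType(),
#                           functionType=PandasUDFType.SCALAR)
```

Checking the length of the result against the length of the input in plain pandas is a quick way to catch this class of error before running the job on the cluster.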

ayplam answered Mar 06 '23 12:03
