I have two files. functions.py has a function and creates a pyspark udf from that function. main.py attempts to import the udf. However, main.py seems to have trouble accessing the function in functions.py.
functions.py:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def do_something(x):
    return x + 'hello'
sample_udf = udf(lambda x: do_something(x), StringType())
main.py:
from functions import sample_udf, do_something
from pyspark.sql.functions import col

df = spark.read.load(file)
df.withColumn("sample", sample_udf(col("text")))
This results in an error:
17/10/03 19:35:29 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 6, ip-10-223-181-5.ec2.internal, executor 3): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/lib/spark/python/pyspark/worker.py", line 164, in main
func, profiler, deserializer, serializer = read_udfs(pickleSer, infile)
File "/usr/lib/spark/python/pyspark/worker.py", line 93, in read_udfs
arg_offsets, udf = read_single_udf(pickleSer, infile)
File "/usr/lib/spark/python/pyspark/worker.py", line 79, in read_single_udf
f, return_type = read_command(pickleSer, infile)
File "/usr/lib/spark/python/pyspark/worker.py", line 55, in read_command
command = serializer._read_with_length(file)
File "/usr/lib/spark/python/pyspark/serializers.py", line 169, in _read_with_length
return self.loads(obj)
File "/usr/lib/spark/python/pyspark/serializers.py", line 454, in loads
return pickle.loads(obj)
AttributeError: 'module' object has no attribute 'do_something'
If I bypass the do_something function and just put it inside the udf, e.g. udf(lambda x: x + ' hello', StringType()), the UDF imports fine - but my function is a little longer and it would be nice to have it encapsulated in a separate function. What's the correct way to achieve this?
Just adding this as an answer: add your .py file to the SparkContext in order to make it available to your executors.
sc.addPyFile("functions.py")
from functions import sample_udf
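For reference, a fuller main.py along these lines might look like the sketch below. This is only an illustration: it assumes an existing or newly created SparkSession, and file is the same placeholder path used in the question.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Ship functions.py to the executors so the pickled udf can find do_something there
sc.addPyFile("functions.py")

from functions import sample_udf

df = spark.read.load(file)  # `file` is the placeholder path from the question
df = df.withColumn("sample", sample_udf(col("text")))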
Here is my test notebook
https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/3669221609244155/3140647912908320/868274901052987/latest.html
Thanks, Charles.
I think a cleaner solution would be to use the udf decorator to define your udf function:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
@F.udf
def sample_udf(x):
    return x + 'hello'
With this solution, the udf does not reference any other function, and you don't need the sc.addPyFile call in your main code.
from functions import sample_udf
from pyspark.sql.functions import col

df = spark.read.load(file)
df.withColumn("sample", sample_udf(col("text")))
# It works :)
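As a side note, on reasonably recent Spark versions the udf decorator itself accepts a return type, so a typed udf can be written directly. A small sketch (the name my_int_udf is just an example):

import pyspark.sql.functions as F
import pyspark.sql.types as t

@F.udf(returnType=t.IntegerType())
def my_int_udf(x):
    return int(x)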
For some older versions of Spark, the decorator doesn't support a typed udf, so you might have to define a custom decorator as follows:
import pyspark.sql.functions as F
import pyspark.sql.types as t
# Custom udf decorator which accepts a return type
def udf_typed(returntype=t.StringType()):
    def _typed_udf_wrapper(func):
        return F.udf(func, returntype)
    return _typed_udf_wrapper

@udf_typed(t.IntegerType())
def my_udf(x):
    return int(x)
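Once defined, a udf built with this custom decorator is used like any other one. A hypothetical example, assuming a DataFrame df with a string column "text":

from pyspark.sql.functions import col

df = df.withColumn("text_as_int", my_udf(col("text")))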