I have a module that I've written containing functions that act on PySpark DataFrames. They do a transformation on columns in the DataFrame and then return a new DataFrame. Here is an example of the code, shortened to include only one of the functions:
from pyspark.sql import functions as F
from pyspark.sql import types as t
import pandas as pd
import numpy as np
metadta=pd.DataFrame(pd.read_csv("metadata.csv")) # this contains metadata on my dataset
def str2num(text):
if type(text)==None or text=='' or text=='NULL' or text=='null':
return 0
elif len(text)==1:
return ord(text)
else:
newnum=''
for lettr in text:
newnum=newnum+str(ord(lettr))
return int(newnum)
str2numUDF = F.udf(lambda s: str2num(s), t.IntegerType())
def letConvNum(df): # df is a PySpark DataFrame
#Get a list of columns that I want to transform, using the metadata Pandas DataFrame
chng_cols=metadta[(metadta.comments=='letter conversion to num')].col_name.tolist()
for curcol in chng_cols:
df=df.withColumn(curcol, str2numUDF(df[curcol]))
return df
So that is my module, call it mymodule.py. If I start the PySpark shell, and I do the following:
import mymodule as mm
myf=sqlContext.sql("select * from tablename lim 10")
I check myf (PySpark DataFrame) and it is ok. I check that I have actually imported mymodule, by trying to use the str2num function:
mm.str2num('a')
97
So it actually is importing the module. Then if I try this:
df2=mm.letConvNum(df)
And do this to check that it worked:
df2.show()
It tries to perform the action, but then it crashes:
16/03/10 16:10:44 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 365)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
command = pickleSer._read_with_length(infile)
File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
return pickle.loads(obj)
File "test2.py", line 16, in <module>
str2numUDF=F.udf(lambda s: str2num(s), t.IntegerType())
File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1460, in udf
return UserDefinedFunction(f, returnType)
File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1422, in __init__
self._judf = self._create_judf(name)
File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1430, in _create_judf
pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command, self)
File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2317, in _prepare_for_python_RDD
[x._jbroadcast for x in sc._pickled_broadcast_vars],
AttributeError: 'NoneType' object has no attribute '_pickled_broadcast_vars'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:397)
at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:362)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/03/10 16:10:44 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/hdp/2.3.4.0-3485/spark/python/pyspark/sql/dataframe.py", line 256, in show
print(self._jdf.showString(n, truncate))
File "/usr/hdp/2.3.4.0-3485/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/usr/hdp/2.3.4.0-3485/spark/python/pyspark/sql/utils.py", line 36, in deco
return f(*a, **kw)
File "/usr/hdp/2.3.4.0-3485/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o7299.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 365, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
command = pickleSer._read_with_length(infile)
File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
return pickle.loads(obj)
File "test2.py", line 16, in <module>
str2numUDF=F.udf(lambda s: str2num(s), t.IntegerType())
File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1460, in udf
return UserDefinedFunction(f, returnType)
File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1422, in __init__
self._judf = self._create_judf(name)
File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1430, in _create_judf
pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command, self)
File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2317, in _prepare_for_python_RDD
[x._jbroadcast for x in sc._pickled_broadcast_vars],
AttributeError: 'NoneType' object has no attribute '_pickled_broadcast_vars'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:397)
at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:362)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:215)
at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1314)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1377)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
command = pickleSer._read_with_length(infile)
File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
return pickle.loads(obj)
File "test2.py", line 16, in <module>
str2numUDF=F.udf(lambda s: str2num(s), t.IntegerType())
File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1460, in udf
return UserDefinedFunction(f, returnType)
File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1422, in __init__
self._judf = self._create_judf(name)
File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1430, in _create_judf
pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command, self)
File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2317, in _prepare_for_python_RDD
[x._jbroadcast for x in sc._pickled_broadcast_vars],
AttributeError: 'NoneType' object has no attribute '_pickled_broadcast_vars'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:397)
at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:362)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
As a check, I opened a clean shell and instead of importing the module, I just defined the str2num function and the UDF in the interactive shell. I then typed in the contents of the last function, and did the same final check:
df2.show()
This time, I get back the transformed DataFrame I was expecting.
Why does it work when the functions are inputted interactively but not when they are read in from a module? I know it is reading the module, as the regular function str2num works.
In spark you can select columns by giving their names as strings (but only if they are unique in current dataframe). You have to pass columns like df. select("col1", "col2") rather than df. select(["col1", "col2"]) .
By setting inferSchema=true , Spark will automatically go through the csv file and infer the schema of each column. This requires an extra pass over the file which will result in reading a file with inferSchema set to true being slower. But in return the dataframe will most likely have a correct schema given its input.
Spark DataFrame printSchema() To get the schema of the Spark DataFrame, use printSchema() on Spark DataFrame object. From the above example, printSchema() prints the schema to console( stdout ) and show() displays the content of the Spark DataFrame.
Collect() is the function, operation for RDD or Dataframe that is used to retrieve the data from the Dataframe. It is used useful in retrieving all the elements of the row from each partition in an RDD and brings that over the driver node/program.
I had the same error and followed the stack trace.
In my case, I was building an Egg file and then passing it to spark via the --py-files
option.
Concerning the error, I think it boils down to the fact that when you call F.udf(str2num, t.IntegerType())
a UserDefinedFunction
instance is created before Spark is running, so it has an empty reference to some SparkContext
, call it sc
. When you run the UDF, sc._pickled_broadcast_vars
is referenced and this throws the AttributeError
in your output.
My work around is to avoid creating the UDF until Spark is running (and hence there is an active SparkContext
. In your case, you could just change your definition of
def letConvNum(df): # df is a PySpark DataFrame
#Get a list of columns that I want to transform, using the metadata Pandas DataFrame
chng_cols=metadta[(metadta.comments=='letter conversion to num')].col_name.tolist()
str2numUDF = F.udf(str2num, t.IntegerType()) # create UDF on demand
for curcol in chng_cols:
df=df.withColumn(curcol, str2numUDF(df[curcol]))
return df
Note: I haven't actually tested the code above, but the change in my own code was similar and everything worked fine.
Also, for the interested reader, see the Spark code for UserDefinedFunction
I think a cleaner solution would be to use the udf decorator to define your udf function :
from pyspark.sql.functions as F
@F.udf
def str2numUDF(text):
if type(text)==None or text=='' or text=='NULL' or text=='null':
return 0
elif len(text)==1:
return ord(text)
else:
newnum=''
for lettr in text:
newnum=newnum+str(ord(lettr))
return int(newnum)
With this solution, the udf does not reference any other function so it won't throw any errors at you.
For some older versions of spark, the decorator doesn't support typed udf some you might have to define a custom decorator as follow :
from pyspark.sql.functions as F
from pyspark.sql.types as t
# Custom udf decorator which accept return type
def udf_typed(returntype=t.StringType()):
def _typed_udf_wrapper(func):
return F.udf(func, returntype)
return _typed_udf_wrapper
@udf_typed(t.IntegerType())
def my_udf(x)
return int(x)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With