
ModuleNotFoundError in PySpark Worker on rdd.collect()

I am running an Apache Spark program in Python, and I am getting an error that I can't understand and don't know how to begin debugging. My driver program defines a function called hound in a file called hound.py. In the same directory, a file called hound_base.py defines a function called hound_base_func. To call it from hound, I import it with "from hound_base import hound_base_func." The import works, and I call the function, passing it a Spark DataFrame. hound_base_func takes the DataFrame as a parameter, does some work on its underlying RDD, and then calls rdd.collect(). That call crashes with "ModuleNotFoundError: No module named 'hound_base'", which makes no sense to me: it is saying that the very module the code is executing in can't be found. I'm willing to provide as many more details as I can, but this is all I know that relates to the problem. Are there any tips on how I can figure this out?
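
To make the structure concrete, here is a simplified sketch of the two files. The real work in hound_base_func is more involved than shown, and how the session gets created is an assumption; the file and function names and the call at the bottom are taken from the traceback below.

    # hound_base.py -- simplified sketch; the real per-row work is omitted
    def hound_base_func(df):
        rdd_result = df.rdd.map(lambda row: row)  # stand-in for the actual RDD processing
        return rdd_result.collect()               # this is the call that blows up

    # hound.py -- driver; imports and calls the function above
    from pyspark.sql import SparkSession
    from hound_base import hound_base_func        # this import succeeds on the driver

    def hound(path, field1, field2, field3, field4_list):
        spark = SparkSession.builder.appName("hound").getOrCreate()
        data = spark.read.csv(path, header=True)
        hound_base_func(data)

    hound("fakedata.csv", "Field1", "Field2", "Field3", ["Field4a", "Field4b"])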

Full trace

2018-06-14 14:29:26 ERROR Executor:91 - Exception in task 0.0 in stage 2.0 (TID 2)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 216, in main
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 58, in read_command
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 170, in _read_with_length
    return self.loads(obj)
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 559, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'hound_base'

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
2018-06-14 14:29:26 WARN  TaskSetManager:66 - Lost task 0.0 in stage 2.0 (TID 2, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 216, in main
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 58, in read_command
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 170, in _read_with_length
    return self.loads(obj)
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 559, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'hound_base'

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

2018-06-14 14:29:26 ERROR TaskSetManager:70 - Task 0 in stage 2.0 failed 1 times; aborting job
[Stage 2:>                                                          (0 + 1) / 1]Traceback (most recent call last):
  File "F:\data\src\hound.py", line 43, in <module>
    hound("fakedata.csv", "Field1", "Field2", "Field3", ["Field4a", "Field4b"])
  File "F:\data\src\hound.py", line 37, in hound
    hound_base_func(data)
  File "F:\data\src\hound_base.py", line 220, in hound_base_func
    rdd_collected = rdd_result.collect()
  File "C:\Users\Brian\Miniconda3\lib\site-packages\pyspark\rdd.py", line 824, in collect
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "C:\Users\Brian\Miniconda3\lib\site-packages\py4j\java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\Users\Brian\Miniconda3\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
    return f(*a, **kw)
  File "C:\Users\Brian\Miniconda3\lib\site-packages\py4j\protocol.py", line 320, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 216, in main
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 58, in read_command
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 170, in _read_with_length
    return self.loads(obj)
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 559, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'hound_base'

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:153)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 216, in main
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 58, in read_command
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 170, in _read_with_length
    return self.loads(obj)
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 559, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'hound_base'

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    ... 1 more

SUCCESS: The process with PID 18960 (child process of PID 6380) has been terminated.
SUCCESS: The process with PID 6380 (child process of PID 1400) has been terminated.
SUCCESS: The process with PID 1400 (child process of PID 19344) has been terminated.
[Finished in 21.811s]

1 Answer

Multiple problems here:

First off, you're not allowed to access the SparkContext from executor tasks, i.e. from any function that runs inside rdd.map() or another RDD transformation.
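
For example, a hypothetical closure like the one below captures the driver-side session, so it gets pickled into the task and shipped to workers, where no SparkContext exists; Spark forbids this pattern (the names some_rdd and spark here are just illustrative):

    # Hypothetical anti-pattern: the lambda captures `spark`, pulling the
    # driver-side session/context into a task closure that is serialized
    # and sent to the executors, where it cannot be used.
    counts = some_rdd.map(lambda x: spark.createDataFrame([x]).count())  # don't do this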

Second, referencing functions defined elsewhere inside the lambda you pass to .map() is tricky. One solution is to move all of those function definitions into the original function, if possible. If any of them live in a different file, you have to explicitly ship that file to the executors with spark_context.addPyFile(path), because importing it in the driver isn't enough.
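
A minimal sketch of that fix, assuming the file layout from the traceback above (adjust the path to your setup):

    # In the driver (e.g. hound.py): ship hound_base.py to the executors before
    # running any action, so the workers can unpickle functions defined in it.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("hound").getOrCreate()
    spark.sparkContext.addPyFile("F:/data/src/hound_base.py")  # path taken from the traceback

    from hound_base import hound_base_func  # importing on the driver alone is not enough

Equivalently, you can hand the dependency to spark-submit with --py-files hound_base.py (or a zip of the whole package) so it gets distributed to the workers automatically.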

These changes fixed the (many) problems I've had with this error. Note that the error only surfaces at .collect(), because Spark's lazy evaluation defers the work until an action runs. Not fun.
