Import error during unit test while calling a function from reduceByKey()

The current directory structure for my test script is as follows:

project/
    script/
        __init__.py 
        map.py
    test/
        __init__.py
        test_map.py

My map.py is defined as follows:

def add(x, y):
    return x + y

def map_add(df):
    result = df.map(lambda x: (x.key, x.value)).reduceByKey(add)
    return result

The test_map.py looks like this:

def add_pyspark_path():
    """
    Add PySpark to the PYTHONPATH
    """
    import sys
    import os
    try:
        sys.path.append(os.path.join(os.environ['SPARK_HOME'], "python"))
        # Spark 1.6
        sys.path.append(os.path.join(os.environ['SPARK_HOME'],
                                     "python", "lib", "py4j-0.9-src.zip"))
    except KeyError:
        print("SPARK_HOME not set")
        sys.exit(1)

# To import pyspark
add_pyspark_path()

import unittest

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from script.map import map_add


class PySparkTestCase(unittest.TestCase):
    def setUp(self):
        # Setup a new spark context for each test
        conf = SparkConf()
        conf.set("spark.executor.memory", "1g")
        conf.set("spark.cores.max", "1")
        conf.set("spark.app.name", "nosetest")
        self.sc = SparkContext(conf=conf)

    def tearDown(self):
        self.sc.stop()


# This would go in tests/project_test.py
class MapTests(PySparkTestCase):

    def MockDataFrame(self):
        # Get a mock dataframe to test the script
        sqlContext = SQLContext(self.sc)
        rdd = self.sc.parallelize([(1,0), (1,1), (2,0), (2,2)])
        schema = [
            "key",
            "value"
        ]
        dataset = sqlContext.createDataFrame(rdd, schema)
        return dataset

    def test_add(self):
        df = self.MockDataFrame()
        result = map_add(df)
        print(result.collect())
        self.assertEqual(result.count(), 2)

When I run nosetests from inside the test directory, the test fails with ImportError: No module named 'script'. However, when I modify the map_add function in map.py to replace the call to add inside reduceByKey with an inline lambda, like this:

def map_add(df):
    result = df.map(lambda x: (x.key, x.value)).reduceByKey(lambda x, y: x + y)
    return result

The test passes.

Also, when I run the original test_map.py from the project directory, the test passes.

I am not able to figure out why the test cannot find the script module when it is run from within the test directory.

Here is the log snippet for the error:

test_add (test.test_map.MapTests) ... 16/01/11 15:38:10 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 5)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
    command = pickleSer._read_with_length(infile)
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
    return self.loads(obj)
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 419, in loads
    return pickle.loads(obj, encoding=encoding)
ImportError: No module named 'script'

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/01/11 15:38:10 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1 times; aborting job
ERROR

======================================================================
ERROR: test_add (test.test_map.MapTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/Users/datitran/Desktop/pyspark_test/test/test_map.py", line 56, in test_add
    print(result.collect())
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/pyspark/rdd.py", line 771, in collect
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/pyspark/sql/utils.py", line 45, in deco
    return f(*a, **kw)
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
    format(target_id, ".", name), value)
nose.proxy.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 5, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
    command = pickleSer._read_with_length(infile)
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
    return self.loads(obj)
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 419, in loads
    return pickle.loads(obj, encoding=encoding)
ImportError: No module named 'script'

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
    command = pickleSer._read_with_length(infile)
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
    return self.loads(obj)
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 419, in loads
    return pickle.loads(obj, encoding=encoding)
ImportError: No module named 'script'

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more 

Any help will be appreciated.

asked Jan 11 '16 by Dat Tran
1 Answer

I found an answer to this problem and thought it would be a good idea to share it with the community. It might be useful whenever a codebase is split across different folders. The root cause is that the import does not happen only on the driver: PySpark pickles the module-level function add by reference (module name plus attribute name), so every worker process must also be able to import script. An inline lambda, by contrast, is serialized by value, bytecode included, which is why the modified version worked. Here is how I solved it.
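
As a quick illustration (a minimal sketch, assuming it is run from the project directory so that script is importable), you can inspect the pickle stream of a module-level function:

import pickle
import pickletools

from script.map import add

# A module-level function is pickled *by reference*: the stream records
# only the module and attribute names, never the function's bytecode.
pickletools.dis(pickle.dumps(add))
# Expect a GLOBAL/STACK_GLOBAL opcode referencing 'script.map' and 'add';
# whoever unpickles this (here, the Spark worker) must be able to
# import script.map to rebuild the function.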

First, create another __init__.py file in the top-level directory, in this case called "project", so the structure looks like this:

project/
    __init__.py
    script/
        __init__.py 
        map.py
    test/
        __init__.py
        test_map.py

Second, add the PYTHONPATH explicitly to the Spark executor environment, so that every worker knows where to search for the import:

import os

class PySparkTestCase(unittest.TestCase):
    def setUp(self):
        conf = SparkConf()
        conf.set("spark.executor.memory", "1g")
        conf.set("spark.cores.max", "1")
        conf.set("spark.app.name", "nosetest")
        # Walk two levels up from test/ so the directory that contains
        # project/ ends up on every worker's PYTHONPATH.
        project_parent = os.path.abspath(
            os.path.join(os.path.dirname(os.path.abspath(__file__)),
                         os.pardir, os.pardir))
        conf.setExecutorEnv("PYTHONPATH", "$PYTHONPATH:" + project_parent)
        self.sc = SparkContext(conf=conf)

Finally, we need to import the module at the top of the test script using the full package path:

from project.script.map import map_add

With that, the Spark job runs successfully!
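
As an alternative (a sketch not taken from the post above: it assumes you zip the script package first, and the /tmp and /path/to/project paths are placeholders), SparkContext.addPyFile can ship the package to every worker, which also lets the original from script.map import map_add stay unchanged:

import shutil

# Zip the package so the archive contains script/__init__.py and
# script/map.py at its root ("/path/to/project" is a placeholder).
archive = shutil.make_archive("/tmp/script_pkg", "zip",
                              root_dir="/path/to/project",
                              base_dir="script")

# ... inside setUp(), once the SparkContext exists:
self.sc.addPyFile(archive)  # workers can now resolve 'import script.map'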

answered by Dat Tran