The current directory structure for my test script is as follows:
project/
script/
__init__.py
map.py
test/
__init.py__
test_map.py
My map.py is defined as follows:
def add(x,y):
return x+y
def map_add(df):
result = df.map(lambda x: (x.key, x.value)).reduceByKey(add)
return result
The test_map.py looks like this:
def add_pyspark_path():
"""
Add PySpark to the PYTHONPATH
"""
import sys
import os
try:
sys.path.append(os.path.join(os.environ['SPARK_HOME'], "python"))
# Spark 1.6
sys.path.append(os.path.join(os.environ['SPARK_HOME'],
"python", "lib", "py4j-0.9-src.zip"))
except KeyError:
print("SPARK_HOME not set")
sys.exit(1)
# To import pyspark
add_pyspark_path()
import unittest
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from script.map import map_add
class PySparkTestCase(unittest.TestCase):
def setUp(self):
# Setup a new spark context for each test
conf = SparkConf()
conf.set("spark.executor.memory", "1g")
conf.set("spark.cores.max", "1")
conf.set("spark.app.name", "nosetest")
self.sc = SparkContext(conf=conf)
def tearDown(self):
self.sc.stop()
# This would go in tests/project_test.py
class MapTests(PySparkTestCase):
def MockDataFrame(self):
# Get a mock dataframe to test the script
sqlContext = SQLContext(self.sc)
rdd = self.sc.parallelize([(1,0), (1,1), (2,0), (2,2)])
schema = [
"key",
"value"
]
dataset = sqlContext.createDataFrame(rdd, schema)
return dataset
def test_add(self):
df = self.MockDataFrame()
result = map_add(df)
print(result.collect())
self.assertEqual(result.count(), 2)
When I run the nosetest in the test directory, the test fails. I get no module named "script" found. However when I modify the map_add function to replace the call to add within reduceByKey in map.py like this:
def map_add(df):
result = df.map(lambda x: (x.key, x.value)).reduceByKey(lambda x,y: x+y)
return result
The test passes.
Also, when I run the original test_map.py from the project directory, the test passes.
I am not able to figure out why the test doesn't detect the script module when it is within the test directory.
Here is the log snippet for the error:
test_add (test.test_map.MapTests) ... 16/01/11 15:38:10 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 5)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
command = pickleSer._read_with_length(infile)
File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 419, in loads
return pickle.loads(obj, encoding=encoding)
ImportError: No module named 'script'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/01/11 15:38:10 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1 times; aborting job
ERROR
======================================================================
ERROR: test_add (test.test_map.MapTests)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/Users/datitran/Desktop/pyspark_test/test/test_map.py", line 56, in test_add
print(result.collect())
File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/pyspark/rdd.py", line 771, in collect
port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/pyspark/sql/utils.py", line 45, in deco
return f(*a, **kw)
File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
format(target_id, ".", name), value)
nose.proxy.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 5, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
command = pickleSer._read_with_length(infile)
File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 419, in loads
return pickle.loads(obj, encoding=encoding)
ImportError: No module named 'script'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
command = pickleSer._read_with_length(infile)
File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 419, in loads
return pickle.loads(obj, encoding=encoding)
ImportError: No module named 'script'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
Any help will be appreciated.
found an answer to this problem and thought would be a good idea to share this to the community. This might be useful if the codebase is split into different folders. So, somehow, relative importance does not work with pyspark as the import does not happen at the workers but I found a way to solve this.
First, create another _ init _.py file in the main directory, in this cases called "project", so the structure would look like this:
project/
__init__.py
script/
__init__.py
map.py
test/
__init.py__
test_map.py
Second, add the PYTHONPATH explicity to the spark environment, so that every workers know where to search for the import:
class PySparkTestCase(unittest.TestCase):
def setUp(self):
conf = SparkConf()
conf.set("spark.executor.memory", "1g")
conf.set("spark.cores.max", "1")
conf.set("spark.app.name", "nosetest")
conf.setExecutorEnv("PYTHONPATH", "$PYTHONPATH:" +
os.path.abspath(os.path.join(os.path.join(os.path.dirname(os.path.abspath(__file__)), os.pardir), os.pardir)))
self.sc = SparkContext(conf=conf)
Finally, we need to import the module on top of the python script:
from project.script.map import map_add
Then executing the spark job works now!
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With