I submit the application as the documentation (http://spark.apache.org/docs/1.1.1/submitting-applications.html) suggests, with Spark version 1.1.0:
./spark/bin/spark-submit --py-files /home/hadoop/loganalysis/parser-src.zip \
/home/hadoop/loganalysis/ship-test.py
and the conf in the code:
conf = (SparkConf()
        .setMaster("yarn-client")
        .setAppName("LogAnalysis")
        .set("spark.executor.memory", "1g")
        .set("spark.executor.cores", "4")
        .set("spark.executor.num", "2")
        .set("spark.driver.memory", "4g")
        .set("spark.kryoserializer.buffer.mb", "128"))
and the slave node complains with an ImportError:
14/12/25 05:09:53 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, ip-172-31-10-8.cn-north-1.compute.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/hadoop/spark/python/pyspark/worker.py", line 75, in main
command = pickleSer._read_with_length(infile)
File "/home/hadoop/spark/python/pyspark/serializers.py", line 150, in _read_with_length
return self.loads(obj)
ImportError: No module named parser
and parser-src.zip was tested locally:
[hadoop@ip-172-31-10-231 ~]$ python
Python 2.7.8 (default, Nov 3 2014, 10:17:30)
[GCC 4.8.2 20140120 (Red Hat 4.8.2-16)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import sys
>>> sys.path.insert(1, '/home/hadoop/loganalysis/parser-src.zip')
>>> from parser import parser
>>> parser.parse
<function parse at 0x7fa5ef4c9848>
>>>
I'm trying to get info about the remote workers: whether the files were copied over and what sys.path looks like there. It's tricky.
UPDATE: Using the code below, I found that the zip file was shipped and sys.path was set, but the import still fails.
import os
import sys
from parser import parser  # imported on the driver, as in the local test above

data = list(range(4))
disdata = sc.parallelize(data)
result = disdata.map(lambda x: "sys.path: {0}\nDIR: {1}\nFILES: {2}\nparser: {3}".format(
    sys.path, os.getcwd(), os.listdir('.'), str(parser)))
result.collect()
print(result.take(4))
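A variant of this diagnostic (a sketch along the same lines, not code I actually ran) is a bit more robust: do the import inside the mapped function and catch the ImportError, so each worker still reports sys.path and its working directory even when the module is missing:

import os
import sys

def probe(_):
    # Import inside the function so a failure shows up as a string in the
    # result instead of aborting the whole task.
    try:
        from parser import parser  # the module shipped in parser-src.zip
        parser_info = str(parser.parse)
    except ImportError as e:
        parser_info = "ImportError: {0}".format(e)
    return "sys.path: {0}\nCWD: {1}\nFILES: {2}\nparser: {3}".format(
        sys.path, os.getcwd(), os.listdir('.'), parser_info)

print(sc.parallelize(range(4)).map(probe).collect())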
It seems I have to dig into cloudpickle, which means I need to understand how cloudpickle works and why it fails first.
: An error occurred while calling o40.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 0.0 failed 4 times, most recent failure: Lost task 4.3 in stage 0.0 (TID 23, ip-172-31-10-8.cn-north-1.compute.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/hadoop/spark/python/pyspark/worker.py", line 75, in main
command = pickleSer._read_with_length(infile)
File "/home/hadoop/spark/python/pyspark/serializers.py", line 150, in _read_with_length
return self.loads(obj)
File "/home/hadoop/spark/python/pyspark/cloudpickle.py", line 811, in subimport
__import__(name)
ImportError: ('No module named parser', <function subimport at 0x7f219ffad7d0>, ('parser.parser',))
UPDATE:
Someone ran into the same problem in Spark 0.8: http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-Importing-other-py-files-in-PYTHONPATH-td2301.html
but he put his lib in Python dist-packages and the import worked. I tried that and still get the import error.
UPDATE:
Oh gosh... I think the problem is caused by not understanding zip files and Python import behaviour. Passing parser.py to --py-files works (it then complains about another dependency), and zipping only the .py files [not including the .pyc files] seems to work too.
But I couldn't quite understand why.
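For what it's worth, a zip is only importable if the package sits at the top level of the archive; a quick local sanity check (assuming parser-src.zip is meant to expose a parser package, as in the REPL test above):

import sys
import zipfile

# Python can only import 'parser' from the zip if the entries sit at the
# root of the archive ('parser/__init__.py', 'parser/parser.py', ...),
# not nested under an extra top-level directory.
print(zipfile.ZipFile('/home/hadoop/loganalysis/parser-src.zip').namelist())

# Then import through the zip the same way the executors would.
sys.path.insert(1, '/home/hadoop/loganalysis/parser-src.zip')
from parser import parser
print(parser.parse)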
For reference, Spark provides a command to execute an application file, whether it is a Scala or Java program (which must be packaged as a JAR), a Python script, or an R script: $ spark-submit --master <url> <SCRIPTNAME>.py
The Apache Spark binary distribution comes with a spark-submit.sh script for Linux and Mac and a spark-submit.cmd file for Windows; these scripts live in $SPARK_HOME/bin and are used to submit a PySpark file (with the .py extension) to the cluster.
Try this method of SparkContext:
sc.addPyFile(path)
According to the pyspark documentation:
Add a .py or .zip dependency for all tasks to be executed on this SparkContext in the future. The path passed can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), or an HTTP, HTTPS or FTP URI.
Try uploading your Python module file to public cloud storage (e.g. AWS S3) and passing the URL to that method.
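A minimal sketch of that approach (the storage URL and the input path are placeholders, and sc is assumed to be the already-created SparkContext):

# addPyFile ships the archive to every executor and puts it on the workers'
# import path, without relying on --py-files at submit time.
sc.addPyFile("/home/hadoop/loganalysis/parser-src.zip")
# or from storage reachable by every node (placeholder URL):
# sc.addPyFile("s3n://my-bucket/parser-src.zip")

def parse_line(line):
    from parser import parser  # resolved on the worker after addPyFile
    return parser.parse(line)

parsed = sc.textFile("hdfs:///logs/input").map(parse_line)  # placeholder input path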
Here is some more comprehensive reading material: http://www.cloudera.com/documentation/enterprise/5-5-x/topics/spark_python.html
Try to import your custom module from inside the method itself rather than at the top of the driver script, e.g.:
def parse_record(record):
    import parser
    p = parser.parse(record)
    return p
rather than
import parser

def parse_record(record):
    p = parser.parse(record)
    return p
Cloudpickle doesn't seem to recognise when a custom module has been imported, so it appears to try to pickle the top-level modules along with the other data needed to run the method. In my experience, this means that top-level modules appear to exist, but they lack usable members, and nested modules can't be used as expected. Once I either imported with from A import * or imported from inside the method (import A.B), the modules worked as expected.
It sounds like one or more of the nodes aren't configured properly. Do all of the nodes on the cluster have the same version/configuration of Python (i.e. do they all have the parser module installed)?
If you don't want to check them one by one, you could write a script that checks whether it is installed (and installs it for you if not). This thread shows a few ways to do that.
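One quick way to compare the workers from PySpark itself rather than logging into each node (a sketch, assuming sc is already available):

import sys

def python_info(_):
    # Report which interpreter each worker task actually ran under.
    return (sys.executable, sys.version.split()[0])

# Use several partitions so the job is spread over more than one executor.
print(set(sc.parallelize(range(100), 20).map(python_info).collect()))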