I wrote a pyspark script that reads two json files, cogroups them and sends the result to an elasticsearch cluster. Everything works (mostly) as expected when I run it locally: I downloaded the elasticsearch-hadoop jar file for the org.elasticsearch.hadoop.mr.EsOutputFormat and org.elasticsearch.hadoop.mr.LinkedMapWritable classes, then ran my job with pyspark using the --jars argument, and I can see documents appearing in my elasticsearch cluster.
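For context, a write like the one in the traceback (saveAsNewAPIHadoopFile with conf=es_write_conf) typically looks something like the sketch below, assuming the RDD holds (key, dict) pairs. The helper name save_to_elasticsearch and the host, port and index/type values are hypothetical; es.nodes, es.port and es.resource are elasticsearch-hadoop configuration keys. The output format, key and value classes are passed as plain strings and resolved by name on the JVM side, which is the Class.forName call that fails in the traceback.

```python
# Hypothetical connection settings: replace host, port and index/type with your own.
ES_WRITE_CONF = {
    "es.nodes": "localhost",          # assumed Elasticsearch host
    "es.port": "9200",                # assumed HTTP port
    "es.resource": "myindex/mytype",  # hypothetical index/type to write into
}


def save_to_elasticsearch(rdd, es_write_conf=ES_WRITE_CONF):
    """Write an RDD of (key, dict) pairs to Elasticsearch via elasticsearch-hadoop.

    The class names are strings looked up on the JVM, so the elasticsearch-hadoop
    jar must be on the classpath or this call raises ClassNotFoundException.
    """
    rdd.saveAsNewAPIHadoopFile(
        # EsOutputFormat writes to the cluster named in conf, so the path is
        # only a placeholder here (assumption based on common usage).
        path="-",
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_write_conf,
    )
```

In the script this would be called on the cogrouped and mapped RDD, e.g. save_to_elasticsearch(docs_rdd), where docs_rdd is a hypothetical name.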
When I try to run it on a spark cluster, however, I'm getting this error:
Traceback (most recent call last):
File "/root/spark/spark_test.py", line 141, in <module>
conf=es_write_conf
File "/root/spark/python/pyspark/rdd.py", line 1302, in saveAsNewAPIHadoopFile
keyConverter, valueConverter, jconf)
File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: java.lang.ClassNotFoundException: org.elasticsearch.hadoop.mr.LinkedMapWritable
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:274)
at org.apache.spark.util.Utils$.classForName(Utils.scala:157)
at org.apache.spark.api.python.PythonRDD$$anonfun$getKeyValueTypes$1$$anonfun$apply$9.apply(PythonRDD.scala:611)
at org.apache.spark.api.python.PythonRDD$$anonfun$getKeyValueTypes$1$$anonfun$apply$9.apply(PythonRDD.scala:610)
at scala.Option.map(Option.scala:145)
at org.apache.spark.api.python.PythonRDD$$anonfun$getKeyValueTypes$1.apply(PythonRDD.scala:610)
at org.apache.spark.api.python.PythonRDD$$anonfun$getKeyValueTypes$1.apply(PythonRDD.scala:609)
at scala.Option.flatMap(Option.scala:170)
at org.apache.spark.api.python.PythonRDD$.getKeyValueTypes(PythonRDD.scala:609)
at org.apache.spark.api.python.PythonRDD$.saveAsNewAPIHadoopFile(PythonRDD.scala:701)
at org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
This seems pretty clear to me: the elasticsearch-hadoop jar is not available on the workers. So the question is: how do I send it along with my app? I could use sc.addPyFile for a Python dependency, but that won't work with jars, and using the --jars parameter of spark-submit doesn't help.
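One hedged sanity check before looking at how the jar is shipped is to confirm that the jar passed to --jars actually contains the class named in the exception. A jar is a zip archive, so Python's zipfile module can list its entries. jar_contains_class is a hypothetical helper, and it only inspects the local file; it says nothing about whether the jar reaches the executors.

```python
import zipfile


def jar_contains_class(jar_path, class_name):
    """Return True if the jar at jar_path contains the compiled class class_name."""
    # A class a.b.C is stored inside the jar as the entry a/b/C.class
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()
```

For example, jar_contains_class("/path/to/elasticsearch-hadoop.jar", "org.elasticsearch.hadoop.mr.LinkedMapWritable"), where the path is a placeholder for wherever the jar was downloaded.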
The --jars option just works; the problem was how I ran the spark-submit job in the first place. The correct way to execute it is:
./bin/spark-submit <options> scriptname
Therefore the --jars option must be placed before the script:
./bin/spark-submit --jars /path/to/my.jar myscript.py
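If the job is launched from a Python wrapper, the same ordering can be enforced by building the argument list in one place. This is a sketch: build_spark_submit_cmd is a hypothetical helper, ./bin/spark-submit is assumed to be run from the Spark directory, and multiple jars are joined with commas because --jars takes a comma-separated list.

```python
import shlex


def build_spark_submit_cmd(script, jars=(), script_args=(),
                           spark_submit="./bin/spark-submit"):
    """Return a spark-submit argument list with every Spark option before the script."""
    cmd = [spark_submit]
    if jars:
        # --jars expects a single comma-separated list of jar paths
        cmd += ["--jars", ",".join(jars)]
    # The script is the primary resource; anything after it goes to the script
    cmd.append(script)
    cmd.extend(script_args)
    return cmd


if __name__ == "__main__":
    cmd = build_spark_submit_cmd("myscript.py", ["/path/to/my.jar"], ["--do-magic=true"])
    print(shlex.join(cmd))
    # prints: ./bin/spark-submit --jars /path/to/my.jar myscript.py --do-magic=true
```

The resulting list can be passed directly to subprocess.run(cmd, check=True), which avoids shell quoting issues.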
This is obvious if you consider that this is the only way to pass arguments to the script itself: everything after the script name is used as input arguments for the script:
./bin/spark-submit --jars /path/to/my.jar myscript.py --do-magic=true
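To make the rule concrete, here is a rough model of how spark-submit divides its command line. It is not Spark's actual parser: OPTIONS_WITH_VALUE is an assumed subset of the real options, and split_submit_args is a hypothetical helper. It shows why a --jars placed after the script never reaches spark-submit and is handed to the script instead.

```python
# Assumed subset of spark-submit options that consume the next token as their value.
OPTIONS_WITH_VALUE = {"--jars", "--master", "--py-files", "--conf", "--name"}


def split_submit_args(argv):
    """Split argv (without the program name) into (spark_options, script, script_args).

    Simplified model: the first token that is neither an option nor an option's
    value is the script, and everything after it is passed to the script untouched.
    """
    spark_options = []
    i = 0
    while i < len(argv):
        token = argv[i]
        if not token.startswith("-"):
            # First non-option token: the script; the rest belongs to the script
            return spark_options, token, argv[i + 1:]
        if token in OPTIONS_WITH_VALUE and i + 1 < len(argv):
            spark_options += [token, argv[i + 1]]
            i += 2
        else:
            spark_options.append(token)
            i += 1
    return spark_options, None, []


if __name__ == "__main__":
    print(split_submit_args(["--jars", "/path/to/my.jar", "myscript.py", "--do-magic=true"]))
    # prints: (['--jars', '/path/to/my.jar'], 'myscript.py', ['--do-magic=true'])
    print(split_submit_args(["myscript.py", "--jars", "/path/to/my.jar"]))
    # prints: ([], 'myscript.py', ['--jars', '/path/to/my.jar'])
```

In the second call the jar ends up in the script's own arguments, so Spark never adds it to the classpath.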