Running PySpark through IPython notebook on EMR (Hadoop 2.4.0) with Spark (1.4.0) in YARN mode using:
IPYTHON_OPTS="notebook --no-browser" nohup /usr/lib/spark/bin/pyspark --master yarn-client --num-executors 2 --executor-memory 512m --executor-cores 1 > /mnt/var/log/python_notebook.log 2> /mnt/var/log/python_notebook_err.log &
Have placed a simple CSV file in HDFS, and trying to read it in using
sc.textFile('/tmp/text.csv').first()
However, this gives me Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found
.
In context:
Py4JJavaError Traceback (most recent call last)
<ipython-input-54-e39168c6841b> in <module>()
----> 1 sc.textFile('/tmp/text.csv').first()
/usr/lib/spark/python/pyspark/rdd.py in first(self)
1293 ValueError: RDD is empty
1294 """
-> 1295 rs = self.take(1)
1296 if rs:
1297 return rs[0]
/usr/lib/spark/python/pyspark/rdd.py in take(self, num)
1245 """
1246 items = []
-> 1247 totalParts = self.getNumPartitions()
1248 partsScanned = 0
1249
/usr/lib/spark/python/pyspark/rdd.py in getNumPartitions(self)
353 2
354 """
--> 355 return self._jrdd.partitions().size()
356
357 def filter(self, f):
/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539
540 for temp_arg in temp_args:
/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(
Py4JJavaError: An error occurred while calling o159.partitions.
: java.lang.RuntimeException: Error in configuring object
at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
at org.apache.spark.rdd.HadoopRDD.getInputFormat(HadoopRDD.scala:190)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:65)
at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:47)
at sun.reflect.GeneratedMethodAccessor30.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.GeneratedMethodAccessor31.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
... 25 more
Caused by: java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzoCodec not found.
at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:135)
at org.apache.hadoop.io.compress.CompressionCodecFactory.<init>(CompressionCodecFactory.java:175)
at org.apache.hadoop.mapred.TextInputFormat.configure(TextInputFormat.java:45)
... 29 more
Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1980)
at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:128)
... 31 more
I have tried to follow the instructions here and done:
os.environ['SPARK_LIBRARY_PATH'] = "/usr/lib/hadoop-lzo/lib/native/"
os.environ['SPARK_CLASSPATH'] = "/usr/lib/hadoop-lzo/lib/"
However, that does not seem to help.
I know this question is old but I was dealing with this for the past week so I figured I would post our solution incase others come across this. The setup we have is one EC2 instance running as the driver outside of EMR which then can create EMR clusters and communicate with the master. The cluster is running Spark 2.2.0 and the EMR release is 5.9.0.
The solution was to clone the Twitter Hadoop-Lzo Github repo on the Spark driver and then add the path to the hadoop-lzo.jar to spark submit args. SUBMIT_ARGS='--jars /opt/hadoop-lzo/target/hadoop-lzo-0.4.21-SNAPSHOT.jar
. And just replace the path to the .jar with the one wherever you cloned the repo to.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With