Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

IllegalArgumentException with Spark collect() on Jupyter

I have a setup with Jupyter 4.3.0, Python 3.6.3 (Anaconda), and PySpark 2.2.1.

The following example will fail when run through Jupyter:

sc = SparkContext.getOrCreate()

rdd = sc.parallelize(['A','B','C'])
rdd.collect()

Below is the complete stack trace:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-35-0d4a2ca9edf4> in <module>()
      2 
      3 rdd = sc.parallelize(['A','B','C'])
----> 4 rdd.collect()

/usr/local/Cellar/apache-spark/2.2.1/libexec/python/pyspark/rdd.py in collect(self)
    807         """
    808         with SCCallSiteSync(self.context) as css:
--> 809             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    810         return list(_load_from_socket(port, self._jrdd_deserializer))
    811 

/usr/local/Cellar/apache-spark/2.2.1/libexec/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/usr/local/Cellar/apache-spark/2.2.1/libexec/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/local/Cellar/apache-spark/2.2.1/libexec/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.lang.IllegalArgumentException
    at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
    at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:46)
    at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:443)
    at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:426)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
    at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:103)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:426)
    at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
    at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:257)
    at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:256)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:256)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2068)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:467)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.base/java.lang.Thread.run(Thread.java:844)

The same example runs successfully using the pyspark client. It also runs successfully (in either Jupyter or the pyspark client) when using take() instead of collect().

Any ideas of what might be going on? This post suggests it could be some bug in Spark 2.2.1. I'd rather not downgrade to Spark 2.2.0 as suggested if at all possible.

UPDATE: I'm running on macOS High Sierra (10.13.3). Here is the output of sc._conf.getAll():

[('spark.sql.catalogImplementation', 'hive'),
 ('spark.app.id', 'local-1517752276379'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.port', '55920'),
 ('spark.app.name', 'pyspark-shell'),
 ('spark.driver.host', '192.168.1.5')]

And here are some further Jupyter-PySpark integration configuration:

~/.ipython/profile_pyspark/startup/00-pyspark-setup.py

import os
import sys
spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.4-src.zip'))
exec(open(os.path.join(spark_home, 'python/pyspark/shell.py')).read())

~/Library/Jupyter/kernels/pyspark/kernel.json

{
    "display_name": "PySpark (Spark 2.2.1)",
    "language": "python",
    "argv": [
        "/Users/rodrygo/anaconda3/bin/python3",
        "-m",
        "ipykernel",
        "--profile=pyspark",
        "-f",
        "{connection_file}"
    ],
    "env": {
        "CAPTURE_STANDARD_OUT": "true",
        "CAPTURE_STANDARD_ERR": "true",
        "SEND_EMPTY_OUTPUT": "false",
        "SPARK_HOME": "/usr/local/Cellar/apache-spark/2.2.1/libexec/"
    }
}
like image 510
Rodrygo Santos Avatar asked Feb 03 '18 23:02

Rodrygo Santos


People also ask

What does Spark collect () do?

PySpark Collect() – Retrieve data from DataFrame Collect() is the function, operation for RDD or Dataframe that is used to retrieve the data from the Dataframe. It is used useful in retrieving all the elements of the row from each partition in an RDD and brings that over the driver node/program.

Can I use Spark in Jupyter notebook?

Jupyter also supports Big data tools such as Apache Spark for data analytics needs.

What can I use instead of Spark collect?

Collect action will try to move all data in RDD/DataFrame to the machine with the driver and where it may run out of memory and crash. Instead, you can make sure that the number of items returned is sampled by calling take or takeSample , or perhaps by filtering your RDD/DataFrame.


1 Answers

For those of you having the same problem, there seems to be an issue with Spark (as of version 2.2.1) and Java 9. I got my example code to work by setting JAVA_HOME back to Java 1.8.

like image 188
Rodrygo Santos Avatar answered Sep 28 '22 00:09

Rodrygo Santos