I have a setup with Jupyter 4.3.0, Python 3.6.3 (Anaconda), and PySpark 2.2.1.
The following example will fail when run through Jupyter:
sc = SparkContext.getOrCreate()
rdd = sc.parallelize(['A','B','C'])
rdd.collect()
Below is the complete stack trace:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-35-0d4a2ca9edf4> in <module>()
2
3 rdd = sc.parallelize(['A','B','C'])
----> 4 rdd.collect()
/usr/local/Cellar/apache-spark/2.2.1/libexec/python/pyspark/rdd.py in collect(self)
807 """
808 with SCCallSiteSync(self.context) as css:
--> 809 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
810 return list(_load_from_socket(port, self._jrdd_deserializer))
811
/usr/local/Cellar/apache-spark/2.2.1/libexec/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/usr/local/Cellar/apache-spark/2.2.1/libexec/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/local/Cellar/apache-spark/2.2.1/libexec/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.lang.IllegalArgumentException
at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:46)
at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:443)
at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:426)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:103)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:426)
at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:257)
at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:256)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:256)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2068)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:467)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:564)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.base/java.lang.Thread.run(Thread.java:844)
The same example runs successfully using the pyspark client. It also runs successfully (in either Jupyter or the pyspark client) when using take()
instead of collect()
.
Any ideas of what might be going on? This post suggests it could be some bug in Spark 2.2.1. I'd rather not downgrade to Spark 2.2.0 as suggested if at all possible.
UPDATE: I'm running on macOS High Sierra (10.13.3). Here is the output of sc._conf.getAll()
:
[('spark.sql.catalogImplementation', 'hive'),
('spark.app.id', 'local-1517752276379'),
('spark.rdd.compress', 'True'),
('spark.serializer.objectStreamReset', '100'),
('spark.master', 'local[*]'),
('spark.executor.id', 'driver'),
('spark.submit.deployMode', 'client'),
('spark.driver.port', '55920'),
('spark.app.name', 'pyspark-shell'),
('spark.driver.host', '192.168.1.5')]
And here are some further Jupyter-PySpark integration configuration:
~/.ipython/profile_pyspark/startup/00-pyspark-setup.py
import os
import sys
spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.4-src.zip'))
exec(open(os.path.join(spark_home, 'python/pyspark/shell.py')).read())
~/Library/Jupyter/kernels/pyspark/kernel.json
{
"display_name": "PySpark (Spark 2.2.1)",
"language": "python",
"argv": [
"/Users/rodrygo/anaconda3/bin/python3",
"-m",
"ipykernel",
"--profile=pyspark",
"-f",
"{connection_file}"
],
"env": {
"CAPTURE_STANDARD_OUT": "true",
"CAPTURE_STANDARD_ERR": "true",
"SEND_EMPTY_OUTPUT": "false",
"SPARK_HOME": "/usr/local/Cellar/apache-spark/2.2.1/libexec/"
}
}
PySpark Collect() – Retrieve data from DataFrame Collect() is the function, operation for RDD or Dataframe that is used to retrieve the data from the Dataframe. It is used useful in retrieving all the elements of the row from each partition in an RDD and brings that over the driver node/program.
Jupyter also supports Big data tools such as Apache Spark for data analytics needs.
Collect action will try to move all data in RDD/DataFrame to the machine with the driver and where it may run out of memory and crash. Instead, you can make sure that the number of items returned is sampled by calling take or takeSample , or perhaps by filtering your RDD/DataFrame.
For those of you having the same problem, there seems to be an issue with Spark (as of version 2.2.1) and Java 9. I got my example code to work by setting JAVA_HOME back to Java 1.8.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With