I just went through a PySpark training course and I'm compiling a script of example lines of code (which explains why the code block does nothing useful on its own). Every time I run this code I get this error once or twice, and the line that throws it changes between runs. I've tried setting spark.executor.memory and spark.executor.heartbeatInterval, but the error persists. I've also tried putting .cache() at the end of various lines, with no change.
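For context, this is roughly how I was setting those properties when I tried them (the values are just what I experimented with; the script below is the plain version without them):

from pyspark import SparkConf, SparkContext

conf = SparkConf() \
    .setMaster("local[*]") \
    .setAppName("MyProcessName") \
    .set("spark.executor.memory", "2g") \
    .set("spark.executor.heartbeatInterval", "60s")
sc = SparkContext(conf=conf)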
The error:
16/09/21 10:29:32 ERROR Utils: Uncaught exception in thread stdout writer for python
java.net.SocketException: Socket is closed
at java.net.Socket.shutdownOutput(Socket.java:1551)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3$$anonfun$apply$4.apply$mcV$sp(PythonRDD.scala:344)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3$$anonfun$apply$4.apply(PythonRDD.scala:344)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3$$anonfun$apply$4.apply(PythonRDD.scala:344)
at org.apache.spark.util.Utils$.tryLog(Utils.scala:1870)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:344)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
The code:
from pyspark import SparkConf, SparkContext
def parseLine(line):
    fields = line.split(',')
    return (int(fields[0]), float(fields[2]))

def parseGraphs(line):
    fields = line.split()
    return (fields[0], [int(n) for n in fields[1:]])
# putting the [*] after local makes it run one executor on each core of your local PC
conf = SparkConf().setMaster("local[*]").setAppName("MyProcessName")
sc = SparkContext(conf = conf)
# parse the raw data and map it to an rdd.
# each item in this rdd is a tuple
# two methods to get the exact same data:
########## All of these methods can use lambda or full methods in the same way ##########
# read in a text file
customerOrdersLines = sc.textFile("file:///SparkCourse/customer-orders.csv")
customerOrdersRdd = customerOrdersLines.map(parseLine)
customerOrdersRdd = customerOrdersLines.map(lambda l: (int(l.split(',')[0]), float(l.split(',')[2])))
print customerOrdersRdd.take(1)
# countByValue groups identical values and counts them
salesByCustomer = customerOrdersRdd.map(lambda sale: sale[0]).countByValue()
print salesByCustomer.items()[0]
# use flatMap to cut everything up by whitespace
bookText = sc.textFile("file:///SparkCourse/Book.txt")
bookRdd = bookText.flatMap(lambda l: l.split())
print bookRdd.take(1)
# create key/value pairs that will allow for more complex uses
names = sc.textFile("file:///SparkCourse/marvel-names.txt")
namesRdd = names.map(lambda line: (int(line.split('\"')[0]), line.split('\"')[1].encode("utf8")))
print namesRdd.take(1)
graphs = sc.textFile("file:///SparkCourse/marvel-graph.txt")
graphsRdd = graphs.map(parseGraphs)
print graphsRdd.take(1)
# this will append "extra text" to each name.
# this is faster than a normal map because it doesn't give you access to the keys
extendedNamesRdd = namesRdd.mapValues(lambda heroName: heroName + "extra text")
print extendedNamesRdd.take(1)
# not the best example because the costars is already a list of integers
# but this should return a list, which will update the values
flattenedCostarsRdd = graphsRdd.flatMapValues(lambda costars: costars)
print flattenedCostarsRdd.take(1)
# put the heroes in ascending index order
sortedHeroes = namesRdd.sortByKey()
print sortedHeroes.take(1)
# to sort heroes by alphabetical order, we switch key/value to value/key, then sort
alphabeticalHeroes = namesRdd.map(lambda (key, value): (value, key)).sortByKey()
print alphabeticalHeroes.take(1)
# make sure that "spider" is in the name of the hero
spiderNames = namesRdd.filter(lambda (id, name): "spider" in name.lower())
print spiderNames.take(1)
# reduce by key keeps the key and performs aggregation methods on the values. in this example, taking the sum
combinedGraphsRdd = flattenedCostarsRdd.reduceByKey(lambda value1, value2: value1 + value2)
print combinedGraphsRdd.take(1)
# broadcast: this is accessible from any executor
sentData = sc.broadcast(["this can be accessed by all executors", "access it using sentData"])
# accumulator: executors can add to this, and the driver reads the aggregated value
hitCounter = sc.accumulator(0)
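# For completeness, here is roughly how I understood those last two are meant to be
# used (my own sketch from the course notes, not part of the runs that fail):
def touchEachRecord(record):
    # every executor can read the broadcast list through .value
    messages = sentData.value
    # executors add to the accumulator; the driver reads the total afterwards
    hitCounter.add(len(messages))

combinedGraphsRdd.foreach(touchEachRecord)
print hitCounter.value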
DISCLAIMER: I haven't spent enough time on that part of Spark's codebase, but let me give you some hints that may lead to a solution. What follows is meant to explain where to search for more information, not to be a solution to the issue.
The exception you are facing is due to some other issue, as seen in the code here (you can tell by the line java.net.Socket.shutdownOutput(Socket.java:1551), which is reached when worker.shutdownOutput() is executed).
16/09/21 10:29:32 ERROR Utils: Uncaught exception in thread stdout writer for python
java.net.SocketException: Socket is closed
at java.net.Socket.shutdownOutput(Socket.java:1551)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3$$anonfun$apply$4.apply$mcV$sp(PythonRDD.scala:344)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3$$anonfun$apply$4.apply(PythonRDD.scala:344)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3$$anonfun$apply$4.apply(PythonRDD.scala:344)
at org.apache.spark.util.Utils$.tryLog(Utils.scala:1870)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:344)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
That leads me to believe that the ERROR is a follow-up to some other earlier error.
The name stdout writer for python is the name of the thread that (uses the EvalPythonExec physical operator and) is responsible for the communication between Spark and pyspark, so you can execute Python code without many changes.
As a matter of fact, the scaladoc of EvalPythonExec gives quite a lot of information on the underlying communication infrastructure that pyspark uses internally, which relies on sockets to an external Python process:

Python evaluation works by sending the necessary (projected) input data via a socket to an external Python process, and combine the result from the Python process with the original row.
Moreover, python is used by default unless overridden using PYSPARK_DRIVER_PYTHON or PYSPARK_PYTHON (as you can see in the pyspark shell script here and here). That's the name that appears in the name of the thread that fails.
16/09/21 10:29:32 ERROR Utils: Uncaught exception in thread stdout writer for python
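If there is any chance that the driver and the Python workers resolve to different interpreters, one thing worth trying (a minimal sketch for local mode; the interpreter path is hypothetical) is pinning the worker Python explicitly before the SparkContext is created:

import os
from pyspark import SparkConf, SparkContext

# Hypothetical path: make the Python workers use a specific interpreter.
# PYSPARK_DRIVER_PYTHON has to be set in the shell before the driver process
# starts, so it cannot be changed from inside the script itself.
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python2.7"

conf = SparkConf().setMaster("local[*]").setAppName("MyProcessName")
sc = SparkContext(conf=conf)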
I'd recommend checking the version of Python on your system using the following command:
python -c 'import sys; print(sys.version_info)'
That should be Python 2.7+, but it could be that you are using the very latest Python, which is not well tested with Spark. Guessing...
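Since the interpreter the worker processes pick up can differ from the driver's, here is a quick sketch of mine (not from the Spark docs) to compare the two from inside a pyspark application:

import sys
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[*]").setAppName("PythonVersionCheck")
sc = SparkContext(conf=conf)

def workerPythonVersion(_):
    # imported on the executor side, so this reports the worker's interpreter
    import sys
    return sys.version

driverVersion = sys.version
workerVersions = set(sc.parallelize(range(4), 4).map(workerPythonVersion).collect())

print("driver:  %s" % driverVersion)
print("workers: %s" % workerVersions)

If the driver and the workers report different versions, that mismatch is the first thing I'd rule out.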
You should include the entire log of the pyspark application's execution; that's where I'd expect to find the answer.
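To get more detail into that log, you can raise the log level from the driver before re-running the failing job (sc.setLogLevel is part of the public SparkContext API; narrowing this to specific loggers would mean editing conf/log4j.properties instead):

# DEBUG is noisy, but the interesting part is whatever gets logged just before
# the "Uncaught exception in thread stdout writer for python" line.
sc.setLogLevel("DEBUG")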