I am using pyspark to estimate parameters for a logistic regression model. I use spark to calculate the likelihood and gradients and then use scipy's minimize function for optimization (L-BFGS-B).
I use yarn-client mode to run my application. My application could start to run without any problem. However, after a while it reports the following error:
Traceback (most recent call last):
File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/simulation/20160716-1626/spark_1m_data.py", line 115, in <module>
res = trainEM2(distData, params0, verbose=True, em_tol=1e-5, opt_method='L-BFGS-B')
File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/Train2.py", line 166, in trainEM
options={'disp': False})
File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/_minimize.py", line 447, in minimize
callback=callback, **options)
File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/lbfgsb.py", line 330, in _minimize_lbfgsb
f, g = func_and_grad(x)
File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/lbfgsb.py", line 278, in func_and_grad
f = fun(x, *args)
File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/optimize.py", line 289, in function_wrapper
return function(*(wrapper_args + args))
File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/Train2.py", line 146, in fun_observedQj
return dataAndWeightsj_old.map(lambda _: calObservedQj(_[0], _[1], vparamsj, params0)).sum()
File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 995, in sum
return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 869, in fold
vals = self.mapPartitions(func).collect()
File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 772, in collect
return list(_load_from_socket(port, self._jrdd_deserializer))
File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 142, in _load_from_socket
for item in serializer.load_stream(rf):
File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 139, in load_stream
16/07/16 20:59:10 ERROR python.PythonRDD: Error while sending iterator
java.net.SocketTimeoutException: Accept timed out
at java.net.PlainSocketImpl.socketAccept(Native Method)
at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
at java.net.ServerSocket.implAccept(ServerSocket.java:545)
at java.net.ServerSocket.accept(ServerSocket.java:513)
at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:645)
yield self._read_with_length(stream)
File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 156, in _read_with_length
length = read_int(stream)
File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 543, in read_int
length = stream.read(4)
File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/socket.py", line 384, in read
data = self._sock.recv(left)
socket.timeout: timed out
I also found python broken pipe
error when I set spark log level to "ALL".
I am using Spark 1.6.2 and Java 1.8.0_91. Any idea what's going on?
I found this is related to the optimization routine I used in my program.
What I was doing is estimating a statistical model with maximum likelihood method using EM algorithm (as iterative algorithm). During each iteration, I need to update the parameters by solving a minimization problem. Spark is responsible for calculating my likelihood and gradient, which are then passed to Scipy's minimize routine where I use L-BFGS-B method. It seems that something in this routine that crashes my Spark job. But I have no idea which part of the routine is responsible for this issue.
Another observation is that, while using the same sample and same program, I changed the number of partitions. When the number of partition is small my program could finish without any problem. However, when the number of partitions becomes large, the program starts to crash.
I had similar problem. I had an iteration, and sometimes execution took so long it timed out. Increasing spark.executor.heartbeatInterval
seemed to solve the problem. I increased it to 3600s to ensure I don't run into timeouts again and everything is working fine since then.
From: http://spark.apache.org/docs/latest/configuration.html :
spark.executor.heartbeatInterval 10s Interval between each executor's heartbeats to the driver. Heartbeats let the driver know that the executor is still alive and update it with metrics for in-progress tasks.
I had a similar problem, and for me, this fixed it:
import pyspark as ps
conf = ps.SparkConf().setMaster("yarn-client").setAppName("sparK-mer")
conf.set("spark.executor.heartbeatInterval","3600s")
sc = ps.SparkContext('local[4]', '', conf=conf) # uses 4 cores on your local machine
More examples of setting other options here: https://gist.github.com/robenalt/5b06415f52009c5035910d91f5b919ad
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With