I installed Spark 2.0.0 and Python 3 in a container with user docker-user. Standalone mode appears to be working.
We have set up a Spark cluster on AWS running Hadoop. With the VPN running, I can ssh from my laptop to the cluster's "internal IP", like
ssh ubuntu@1.1.1.1
This logs in. Then
cd /opt/spark/bin
./pyspark
This shows Spark 2.0.0 and Python 2.7.6. A naive parallelize example works.
Now in the Docker-backed Jupyter Notebook, do
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName('hello').setMaster('spark://1.1.1.1:7077').setSparkHome('/opt/spark/')
sc = SparkContext(conf=conf)
This apparently reaches the cluster, because the application "hello" shows up in the Spark dashboard at 1.1.1.1:8080. It puzzles me that it got this far from inside Docker without any ssh, password, or other authentication.
Now try a naive parallelize example:
x = ['spark', 'rdd', 'example', 'sample', 'example']
y = sc.parallelize(x)
Looks OK. Then,
y.collect()
It hangs there.
In the "Executor Summary" table on the dashboard, I don't know exactly what to look for, but one worker whose state is EXITED has stderr like this:
16/08/16 17:37:01 INFO SignalUtils: Registered signal handler for TERM
16/08/16 17:37:01 INFO SignalUtils: Registered signal handler for HUP
16/08/16 17:37:01 INFO SignalUtils: Registered signal handler for INT
16/08/16 17:37:02 INFO SecurityManager: Changing view acls to: ubuntu,docker-user
16/08/16 17:37:02 INFO SecurityManager: Changing modify acls to: ubuntu,docker-user
16/08/16 17:37:02 INFO SecurityManager: Changing view acls groups to:
16/08/16 17:37:02 INFO SecurityManager: Changing modify acls groups to:
16/08/16 17:37:02 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(ubuntu, docker-user); groups with view permissions: Set(); users with modify permissions: Set(ubuntu, docker-user); groups with modify permissions: Set()
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:70)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:166)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:262)
at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216)
at scala.util.Try$.apply(Try.scala:192)
at scala.util.Failure.recover(Try.scala:216)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at org.spark_project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:136)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at scala.concurrent.Promise$class.complete(Promise.scala:55)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63)
at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78)
at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54)
at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)
at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153)
at org.apache.spark.rpc.netty.NettyRpcEnv.org$apache$spark$rpc$netty$NettyRpcEnv$$onFailure$1(NettyRpcEnv.scala:205)
at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:239)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply in 120 seconds
... 8 more
java.lang.IllegalArgumentException: requirement failed: TransportClient has not yet been set.
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.rpc.netty.RpcOutboxMessage.onTimeout(Outbox.scala:70)
at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$ask$1.applyOrElse(NettyRpcEnv.scala:232)
at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$ask$1.applyOrElse(NettyRpcEnv.scala:231)
at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:138)
at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at org.spark_project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:136)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)
Note that the Docker user docker-user may be an issue, because the server machine expects ubuntu. There may be further issues.
Could the Python package paramiko help here? I know how to use paramiko to create a client object through which to issue commands as if I were logged into the server, but I don't know how to combine that with SparkConf and SparkContext.
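For concreteness, this is the sort of paramiko usage I mean (the host, username, and key path below are placeholders, not my real values):

import paramiko

# Open an SSH session to the cluster host (address, user, and key path
# are placeholders).
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect('1.1.1.1', username='ubuntu', key_filename='/path/to/key.pem')

# Run a command remotely, as if logged into the server.
stdin, stdout, stderr = client.exec_command('/opt/spark/bin/pyspark --version')
print(stdout.read().decode())
client.close()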
Various sources stop at SparkConf().setMaster('spark://1.1.1.1:7077'), as if it will just work. I believe some hoops regarding login, passwords, ssh, and authentication are inevitable.
Thanks!
The Spark driver has to be accessible from the cluster: make sure the workers can ping the machine running the Spark driver. This is because the executors have to contact the driver actively; they don't keep a TCP connection alive (it would not scale otherwise).
The other option is to use cluster deploy mode instead of client mode.
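A minimal sketch of the driver-side settings this implies, assuming the driver machine has an address the workers can reach over the VPN (2.2.2.2 below is a placeholder for it):

from pyspark import SparkConf, SparkContext

# 2.2.2.2 stands for a VPN-reachable address of the driver machine;
# 1.1.1.1 is the master as above. Ports are arbitrary but fixed so the
# Docker container can publish them, e.g.
# docker run -p 5001:5001 -p 5003:5003 ...
conf = (SparkConf()
        .setAppName('hello')
        .setMaster('spark://1.1.1.1:7077')
        .set('spark.driver.host', '2.2.2.2')      # address executors connect back to
        .set('spark.driver.port', '5001')          # pin the driver RPC port
        .set('spark.blockManager.port', '5003'))   # pin the block manager port
sc = SparkContext(conf=conf)

Alternatively, running the container with --net=host sidesteps the port mapping entirely, since the container then shares the host's network interfaces.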