
How to connect to remote Spark cluster from python in docker

I installed Spark 2.0.0 and Python 3 in a container with user docker-user. Stand-alone mode appears to be working.

We have set up a Spark cluster on AWS and Hadoop. With the VPN running, I can ssh from my laptop to the "internal IP", like

ssh [email protected]

This logs in. Then

cd /opt/spark/bin
./pyspark

This shows Spark 2.0.0 and Python 2.7.6. A naive parallelize example works.

Now in the Docker-backed Jupyter Notebook, do

from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName('hello').setMaster('spark://1.1.1.1:7077').setSparkHome('/opt/spark/')
sc = SparkContext(conf=conf)

This apparently goes through to the cluster, because I can see the application "hello" in the Spark dashboard at 1.1.1.1:8080. It puzzles me that it has gone so far from within Docker without care for ssh, password, etc.

Now try a naive parallelize example,

x = ['spark', 'rdd', 'example', 'sample', 'example']
y = sc.parallelize(x)

Looks OK. Then,

y.collect()

It hangs there.

On the dashboard "Executor Summary" table, I don't know exactly what to look for. But one worker whose state is exited has stderr like this:

16/08/16 17:37:01 INFO SignalUtils: Registered signal handler for TERM
16/08/16 17:37:01 INFO SignalUtils: Registered signal handler for HUP
16/08/16 17:37:01 INFO SignalUtils: Registered signal handler for INT
16/08/16 17:37:02 INFO SecurityManager: Changing view acls to: ubuntu,docker-user
16/08/16 17:37:02 INFO SecurityManager: Changing modify acls to: ubuntu,docker-user
16/08/16 17:37:02 INFO SecurityManager: Changing view acls groups to: 
16/08/16 17:37:02 INFO SecurityManager: Changing modify acls groups to: 
16/08/16 17:37:02 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(ubuntu, docker-user); groups with view permissions: Set(); users  with modify permissions: Set(ubuntu, docker-user); groups with modify permissions: Set()
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:70)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:166)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:262)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216)
    at scala.util.Try$.apply(Try.scala:192)
    at scala.util.Failure.recover(Try.scala:216)
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at org.spark_project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
    at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:136)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at scala.concurrent.Promise$class.complete(Promise.scala:55)
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63)
    at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78)
    at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
    at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)
    at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153)
    at org.apache.spark.rpc.netty.NettyRpcEnv.org$apache$spark$rpc$netty$NettyRpcEnv$$onFailure$1(NettyRpcEnv.scala:205)
    at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:239)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply in 120 seconds
    ... 8 more
java.lang.IllegalArgumentException: requirement failed: TransportClient has not yet been set.
    at scala.Predef$.require(Predef.scala:224)
    at org.apache.spark.rpc.netty.RpcOutboxMessage.onTimeout(Outbox.scala:70)
    at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$ask$1.applyOrElse(NettyRpcEnv.scala:232)
    at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$ask$1.applyOrElse(NettyRpcEnv.scala:231)
    at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:138)
    at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at org.spark_project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
    at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:136)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)

Note that the Docker user docker-user may be an issue, because the server machine expects ubuntu. There may be further issues.

Could the Python package paramiko help here? I know how to use paramiko to create a client object, through which I can issue commands etc. as if I were logged into the server. But I don't know how to combine that with SparkConf and SparkContext.

Various sources stop at saying SparkConf().setMaster('spark://1.1.1.1:7077') as if it will just work. I believe some hoops are inevitable regarding login, password, ssh, auth.

Thanks!

asked Oct 29 '22 22:10 by zpz


1 Answer

The Spark driver has to be reachable from the cluster, so make sure the cluster machines can ping the machine where the Spark driver is running. This is because the executors have to actively contact the driver. They don't keep a TCP connection alive (that would not scale otherwise).
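Since ping only tests ICMP, a TCP probe run on a worker machine against the driver's address is a closer check of what the executors need. The following is a minimal sketch using only the standard library; the helper name can_connect and the address and port in the usage comment are hypothetical placeholders, not values from the question.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        # create_connection resolves the host and performs the TCP handshake
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # covers refused connections, timeouts, and unreachable hosts
        return False


# Example (run on a worker; address and port are placeholders for the driver):
# print(can_connect("10.8.0.6", 40000))
```

If this returns False from the workers while the application still shows up on the master dashboard, that would be consistent with the executors timing out as in the stderr above.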

Another option is to use cluster mode rather than client mode.
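If you stay in client mode, one thing to try is telling the driver which address and ports to advertise, so the executors can connect back to it. This is only a sketch under assumptions: the helper driver_network_settings, the address 10.8.0.6, and the port numbers are made up for illustration. The configuration keys spark.driver.host, spark.driver.port, and spark.blockManager.port are standard Spark settings. In Spark 2.0.0 the driver also binds to spark.driver.host, so the container probably needs to own that address (for example by running with host networking); spark.driver.bindAddress only exists from Spark 2.1 on.

```python
def driver_network_settings(driver_host, driver_port=40000, block_manager_port=40001):
    """Build Spark settings that pin the address and ports the driver advertises.

    driver_host must be an address the cluster machines can reach.
    The default port numbers are arbitrary placeholders.
    """
    return [
        ("spark.driver.host", driver_host),
        ("spark.driver.port", str(driver_port)),
        ("spark.blockManager.port", str(block_manager_port)),
    ]


# Usage in the notebook (requires pyspark; 10.8.0.6 is a hypothetical VPN address):
# from pyspark import SparkConf, SparkContext
# conf = (SparkConf()
#         .setAppName('hello')
#         .setMaster('spark://1.1.1.1:7077')
#         .setAll(driver_network_settings('10.8.0.6')))
# sc = SparkContext(conf=conf)
```

Fixed ports make it possible to publish exactly those ports from the container and open them in any firewall between the cluster and the laptop.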

answered Nov 15 '22 07:11 by linehrr