Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Is it necessary to submit spark application jar?

As stated in title I'm wondering if is it necessary to spark-submit *.jar?

I'm using Datastax Enterprise Cassandra for a while, but now I need to use Spark too. I watched almost all videos from DS320: DataStax Enterprise Analytics with Apache Spark and there is nothing about connecting to spark remotely from java application.

Now I have 3 running nodes of DSE. I can connect to Spark from spark shell. But after 2 days of trying to connect to Spark from java code I'm giving up.

This is my Java code

SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("AppName");
//sparkConf.set("spark.shuffle.blockTransferService", "nio");
//sparkConf.set("spark.driver.host", "*.*.*.*");
//sparkConf.set("spark.driver.port", "7007");
sparkConf.setMaster("spark://*.*.*.*:7077");
JavaSparkContext sc = new JavaSparkContext(sparkConf);

Result of connecting

16/01/18 14:32:43 ERROR TransportResponseHandler: Still have 2 requests outstanding when connection from *.*.*.*/*.*.*.*:7077 is closed
16/01/18 14:32:43 WARN AppClient$ClientEndpoint: Failed to connect to master *.*.*.*:7077
java.io.IOException: Connection from *.*.*.*/*.*.*.*:7077 closed
    at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124)
    at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
    at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)
16/01/18 14:33:03 ERROR TransportResponseHandler: Still have 2 requests outstanding when connection from *.*.*.*/*.*.*.*:7077 is closed
16/01/18 14:33:03 WARN AppClient$ClientEndpoint: Failed to connect to master *.*.*.*:7077
java.io.IOException: Connection from *.*.*.*/*.*.*.*:7077 closed
    at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124)
    at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
    at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)
16/01/18 14:33:23 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
16/01/18 14:33:23 WARN SparkDeploySchedulerBackend: Application ID is not initialized yet.
16/01/18 14:33:23 WARN AppClient$ClientEndpoint: Drop UnregisterApplication(null) because has not yet connected to master
16/01/18 14:33:23 ERROR MapOutputTrackerMaster: Error communicating with MapOutputTracker
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:190)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:77)
    at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:110)
    at org.apache.spark.MapOutputTracker.sendTracker(MapOutputTracker.scala:120)
    at org.apache.spark.MapOutputTrackerMaster.stop(MapOutputTracker.scala:462)
    at org.apache.spark.SparkEnv.stop(SparkEnv.scala:93)
    at org.apache.spark.SparkContext$$anonfun$stop$12.apply$mcV$sp(SparkContext.scala:1756)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1229)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1755)
    at org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend.dead(SparkDeploySchedulerBackend.scala:127)
    at org.apache.spark.deploy.client.AppClient$ClientEndpoint.markDead(AppClient.scala:264)
    at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anon$2$$anonfun$run$1.apply$mcV$sp(AppClient.scala:134)
    at org.apache.spark.util.Utils$.tryOrExit(Utils.scala:1163)
    at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anon$2.run(AppClient.scala:129)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/01/18 14:33:23 ERROR Utils: Uncaught exception in thread appclient-registration-retry-thread
org.apache.spark.SparkException: Error communicating with MapOutputTracker
    at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:114)
    at org.apache.spark.MapOutputTracker.sendTracker(MapOutputTracker.scala:120)
    at org.apache.spark.MapOutputTrackerMaster.stop(MapOutputTracker.scala:462)
    at org.apache.spark.SparkEnv.stop(SparkEnv.scala:93)
    at org.apache.spark.SparkContext$$anonfun$stop$12.apply$mcV$sp(SparkContext.scala:1756)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1229)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1755)
    at org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend.dead(SparkDeploySchedulerBackend.scala:127)
    at org.apache.spark.deploy.client.AppClient$ClientEndpoint.markDead(AppClient.scala:264)
    at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anon$2$$anonfun$run$1.apply$mcV$sp(AppClient.scala:134)
    at org.apache.spark.util.Utils$.tryOrExit(Utils.scala:1163)
    at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anon$2.run(AppClient.scala:129)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:190)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:77)
    at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:110)
    ... 18 more
16/01/18 14:33:23 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[appclient-registration-retry-thread,5,main]
org.apache.spark.SparkException: Exiting due to error from cluster scheduler: All masters are unresponsive! Giving up.
    at org.apache.spark.scheduler.TaskSchedulerImpl.error(TaskSchedulerImpl.scala:438)
    at org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend.dead(SparkDeploySchedulerBackend.scala:124)
    at org.apache.spark.deploy.client.AppClient$ClientEndpoint.markDead(AppClient.scala:264)
    at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anon$2$$anonfun$run$1.apply$mcV$sp(AppClient.scala:134)
    at org.apache.spark.util.Utils$.tryOrExit(Utils.scala:1163)
    at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anon$2.run(AppClient.scala:129)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I tried to change SPARK_MASTER_IP, SPARK_LOCAL_IP and many others config variables, but without success. Now I found some articles about submiting jars to Spark and I'm not sure (can't find any proof) if it is the cause? Are spark-submit and interactive shell the only ways to use spark?

Any articles about it? I would be grateful if you could give me a tip.

like image 261
Marcin Lagowski Avatar asked Jan 19 '16 12:01

Marcin Lagowski


People also ask

What happens when we submit a spark application?

Once you do a Spark submit, a driver program is launched and this requests for resources to the cluster manager and at the same time the main program of the user function of the user processing program is initiated by the driver program.

How do I submit a spark application?

You can submit a Spark batch application by using cluster mode (default) or client mode either inside the cluster or from an external client: Cluster mode (default): Submitting Spark batch application and having the driver run on a host in your driver resource group. The spark-submit syntax is --deploy-mode cluster.


1 Answers

I would greatly recommend using dse spark-submit with dse. While it is not required it is definitely much easier than ensuring that your security and class path options as set up for DSE will work with your cluster. It also provides a much simpler approach (in my opinion) for configuring your SparkConf and placing jars on the executor class-paths.

Within DSE it also will automatically route your application to the correct Spark master url, further simplifying setup.

If you really want to manually construct your SparkConf be sure to map your spark master to the output of dsetool spark-master or it's equivalent in your version of DSE.

like image 158
RussS Avatar answered Oct 28 '22 23:10

RussS