I'm new to Apache Spark and trying to run a simple program on my cluster. The problem is that the driver allocates all tasks to one worker.
I am running Spark in standalone cluster mode on 2 computers:
1 - runs the master and a worker, with 4 cores: 1 used for the master, 3 for the worker. IP: 192.168.1.101
2 - runs only a worker, with 4 cores, all for the worker. IP: 192.168.1.104
This is the code:
public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("spark-project");
    JavaSparkContext sc = new JavaSparkContext(conf);
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    JavaRDD<String> lines = sc.textFile("/Datasets/somefile.txt", 7);
    System.out.println(lines.partitions().size());
    Accumulator<Integer> sum = sc.accumulator(0);
    JavaRDD<Integer> numbers = lines.map(line -> 1);
    System.out.println(numbers.partitions().size());
    numbers.foreach(num -> System.out.println(num));
    numbers.foreach(num -> sum.add(num));
    System.out.println(sum.value());
    sc.close();
}
Note: I added the Thread.sleep() call because I tried the suggestion from this issue: https://issues.apache.org/jira/browse/SPARK-3100
I submitted the job with:
bin/spark-submit --class spark.Main --master spark://192.168.1.101:7077 --deploy-mode cluster /home/sparkUser/JarsOfSpark/JarForSpark.jar
This is the output I got from the driver's stdout:
7
7
50144
Logs from the master:
log4j:WARN No appenders could be found for logger(org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/01/15 19:22:14 INFO SecurityManager: Changing view acls to: sparkUser
16/01/15 19:22:14 INFO SecurityManager: Changing modify acls to: sparkUser
16/01/15 19:22:14 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(sparkUser); users with modify permissions: Set(sparkUser)
16/01/15 19:22:24 INFO Slf4jLogger: Slf4jLogger started
16/01/15 19:22:24 INFO Utils: Successfully started service 'Driver' on port 46546.
16/01/15 19:22:24 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@192.168.1.101:43150/user/Worker
16/01/15 19:22:24 INFO SparkContext: Running Spark version 1.4.1
16/01/15 19:22:24 INFO SecurityManager: Changing view acls to: sparkUser
16/01/15 19:22:24 INFO SecurityManager: Changing modify acls to: sparkUser
16/01/15 19:22:24 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(sparkUser); users with modify permissions: Set(sparkUser)
16/01/15 19:22:24 INFO WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@192.168.1.101:43150/user/Worker
16/01/15 19:22:25 INFO Slf4jLogger: Slf4jLogger started
16/01/15 19:22:25 INFO Utils: Successfully started service 'sparkDriver' on port 38186.
16/01/15 19:22:25 INFO SparkEnv: Registering MapOutputTracker
16/01/15 19:22:25 INFO SparkEnv: Registering BlockManagerMaster
16/01/15 19:22:25 INFO DiskBlockManager: Created local directory at /tmp/spark-ef3b8193-e086-4764-993c-0a40534052c1/blockmgr-e80c1c60-fe19-4be1-b3f9-259b3f1031a0
16/01/15 19:22:25 INFO MemoryStore: MemoryStore started with capacity 265.1 MB
16/01/15 19:22:25 INFO HttpFileServer: HTTP File server directory is /tmp/spark-ef3b8193-e086-4764-993c-0a40534052c1/httpd-e05a5a70-dbf3-4055-b6ab-7efa22dfa4d2
16/01/15 19:22:25 INFO HttpServer: Starting HTTP Server
16/01/15 19:22:25 INFO Utils: Successfully started service 'HTTP file server' on port 34728.
16/01/15 19:22:25 INFO SparkEnv: Registering OutputCommitCoordinator
16/01/15 19:22:35 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/01/15 19:22:35 INFO SparkUI: Started SparkUI at http://192.168.1.101:4040
16/01/15 19:22:35 INFO SparkContext: Added JAR file:/home/sparkUser/JarsOfSpark/JarForSpark.jar at http://192.168.1.101:34728/jars/JarForSpark.jar with timestamp 1452878555317
16/01/15 19:22:35 INFO AppClient$ClientActor: Connecting to master akka.tcp://sparkMaster@192.168.1.101:7077/user/Master...
16/01/15 19:22:35 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20160115192235-0016
16/01/15 19:22:35 INFO AppClient$ClientActor: Executor added: app-20160115192235-0016/0 on worker-20160115181337-192.168.1.104-50099 (192.168.1.104:50099) with 4 cores
16/01/15 19:22:35 INFO SparkDeploySchedulerBackend: Granted executor ID app-20160115192235-0016/0 on hostPort 192.168.1.104:50099 with 4 cores, 512.0 MB RAM
16/01/15 19:22:35 INFO AppClient$ClientActor: Executor added: app-20160115192235-0016/1 on worker-20160115125104-192.168.1.101-43150 (192.168.1.101:43150) with 3 cores
16/01/15 19:22:35 INFO SparkDeploySchedulerBackend: Granted executor ID app-20160115192235-0016/1 on hostPort 192.168.1.101:43150 with 3 cores, 512.0 MB RAM
16/01/15 19:22:35 INFO AppClient$ClientActor: Executor updated: app-20160115192235-0016/1 is now LOADING
16/01/15 19:22:35 INFO AppClient$ClientActor: Executor updated: app-20160115192235-0016/0 is now LOADING
16/01/15 19:22:35 INFO AppClient$ClientActor: Executor updated: app-20160115192235-0016/0 is now RUNNING
16/01/15 19:22:35 INFO AppClient$ClientActor: Executor updated: app-20160115192235-0016/1 is now RUNNING
16/01/15 19:22:35 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 33359.
16/01/15 19:22:35 INFO NettyBlockTransferService: Server created on 33359
16/01/15 19:22:35 INFO BlockManagerMaster: Trying to register BlockManager
16/01/15 19:22:35 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.1.101:33359 with 265.1 MB RAM, BlockManagerId(driver, 192.168.1.101, 33359)
16/01/15 19:22:35 INFO BlockManagerMaster: Registered BlockManager
16/01/15 19:22:35 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
16/01/15 19:22:38 INFO SparkDeploySchedulerBackend: Registered executor: AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@192.168.1.104:49573/user/Executor#1472403765]) with ID 0
16/01/15 19:22:39 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.1.104:33856 with 265.1 MB RAM, BlockManagerId(0, 192.168.1.104, 33856)
16/01/15 19:22:40 INFO MemoryStore: ensureFreeSpace(130448) called with curMem=0, maxMem=278019440
16/01/15 19:22:40 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 127.4 KB, free 265.0 MB)
16/01/15 19:22:40 INFO MemoryStore: ensureFreeSpace(14257) called with curMem=130448, maxMem=278019440
16/01/15 19:22:40 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 13.9 KB, free 265.0 MB)
16/01/15 19:22:40 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.101:33359 (size: 13.9 KB, free: 265.1 MB)
16/01/15 19:22:40 INFO SparkContext: Created broadcast 0 from textFile at Main.java:25
16/01/15 19:22:41 INFO FileInputFormat: Total input paths to process : 1
16/01/15 19:22:41 INFO SparkContext: Starting job: foreach at Main.java:33
16/01/15 19:22:41 INFO DAGScheduler: Got job 0 (foreach at Main.java:33) with 7 output partitions (allowLocal=false)
16/01/15 19:22:41 INFO DAGScheduler: Final stage: ResultStage 0(foreach at Main.java:33)
16/01/15 19:22:41 INFO DAGScheduler: Parents of final stage: List()
16/01/15 19:22:41 INFO DAGScheduler: Missing parents: List()
16/01/15 19:22:41 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[2] at map at Main.java:30), which has no missing parents
16/01/15 19:22:41 INFO MemoryStore: ensureFreeSpace(4400) called with curMem=144705, maxMem=278019440
16/01/15 19:22:41 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.3 KB, free 265.0 MB)
16/01/15 19:22:41 INFO MemoryStore: ensureFreeSpace(2538) called with curMem=149105, maxMem=278019440
16/01/15 19:22:41 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.5 KB, free 265.0 MB)
16/01/15 19:22:41 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.101:33359 (size: 2.5 KB, free: 265.1 MB)
16/01/15 19:22:41 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:874
16/01/15 19:22:41 INFO DAGScheduler: Submitting 7 missing tasks from ResultStage 0 (MapPartitionsRDD[2] at map at Main.java:30)
16/01/15 19:22:41 INFO TaskSchedulerImpl: Adding task set 0.0 with 7 tasks
16/01/15 19:22:41 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 192.168.1.104, PROCESS_LOCAL, 1495 bytes)
16/01/15 19:22:41 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 192.168.1.104, PROCESS_LOCAL, 1495 bytes)
16/01/15 19:22:41 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, 192.168.1.104, PROCESS_LOCAL, 1495 bytes)
16/01/15 19:22:41 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, 192.168.1.104, PROCESS_LOCAL, 1495 bytes)
16/01/15 19:22:41 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.104:33856 (size: 2.5 KB, free: 265.1 MB)
16/01/15 19:22:42 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.104:33856 (size: 13.9 KB, free: 265.1 MB)
16/01/15 19:22:43 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID 4, 192.168.1.104, PROCESS_LOCAL, 1495 bytes)
16/01/15 19:22:43 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID 5, 192.168.1.104, PROCESS_LOCAL, 1495 bytes)
16/01/15 19:22:43 INFO TaskSetManager: Starting task 6.0 in stage 0.0 (TID 6, 192.168.1.104, PROCESS_LOCAL, 1495 bytes)
16/01/15 19:22:43 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 2017 ms on 192.168.1.104 (1/7)
16/01/15 19:22:43 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 2036 ms on 192.168.1.104 (2/7)
16/01/15 19:22:43 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 2027 ms on 192.168.1.104 (3/7)
16/01/15 19:22:43 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 2027 ms on 192.168.1.104 (4/7)
16/01/15 19:22:43 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 143 ms on 192.168.1.104 (5/7)
16/01/15 19:22:43 INFO TaskSetManager: Finished task 5.0 in stage 0.0 (TID 5) in 199 ms on 192.168.1.104 (6/7)
16/01/15 19:22:43 INFO TaskSetManager: Finished task 6.0 in stage 0.0 (TID 6) in 206 ms on 192.168.1.104 (7/7)
16/01/15 19:22:43 INFO DAGScheduler: ResultStage 0 (foreach at Main.java:33) finished in 2.218 s
16/01/15 19:22:43 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/01/15 19:22:43 INFO DAGScheduler: Job 0 finished: foreach at Main.java:33, took 2.289399 s
16/01/15 19:22:43 INFO SparkContext: Starting job: foreach at Main.java:34
16/01/15 19:22:43 INFO DAGScheduler: Got job 1 (foreach at Main.java:34) with 7 output partitions (allowLocal=false)
16/01/15 19:22:43 INFO DAGScheduler: Final stage: ResultStage 1(foreach at Main.java:34)
16/01/15 19:22:43 INFO DAGScheduler: Parents of final stage: List()
16/01/15 19:22:43 INFO DAGScheduler: Missing parents: List()
16/01/15 19:22:43 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[2] at map at Main.java:30), which has no missing parents
16/01/15 19:22:43 INFO MemoryStore: ensureFreeSpace(4824) called with curMem=151643, maxMem=278019440
16/01/15 19:22:43 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 4.7 KB, free 265.0 MB)
16/01/15 19:22:43 INFO MemoryStore: ensureFreeSpace(2761) called with curMem=156467, maxMem=278019440
16/01/15 19:22:43 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 2.7 KB, free 265.0 MB)
16/01/15 19:22:43 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.1.101:33359 (size: 2.7 KB, free: 265.1 MB)
16/01/15 19:22:43 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:874
16/01/15 19:22:43 INFO DAGScheduler: Submitting 7 missing tasks from ResultStage 1 (MapPartitionsRDD[2] at map at Main.java:30)
16/01/15 19:22:43 INFO TaskSchedulerImpl: Adding task set 1.0 with 7 tasks
16/01/15 19:22:43 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 7, 192.168.1.104, PROCESS_LOCAL, 1495 bytes)
16/01/15 19:22:43 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 8, 192.168.1.104, PROCESS_LOCAL, 1495 bytes)
16/01/15 19:22:43 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 9, 192.168.1.104, PROCESS_LOCAL, 1495 bytes)
16/01/15 19:22:43 INFO TaskSetManager: Starting task 3.0 in stage 1.0 (TID 10, 192.168.1.104, PROCESS_LOCAL, 1495 bytes)
16/01/15 19:22:43 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.1.104:33856 (size: 2.7 KB, free: 265.1 MB)
16/01/15 19:22:43 INFO TaskSetManager: Starting task 4.0 in stage 1.0 (TID 11, 192.168.1.104, PROCESS_LOCAL, 1495 bytes)
16/01/15 19:22:43 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 7) in 106 ms on 192.168.1.104 (1/7)
16/01/15 19:22:43 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 8) in 125 ms on 192.168.1.104 (2/7)
16/01/15 19:22:43 INFO TaskSetManager: Starting task 5.0 in stage 1.0 (TID 12, 192.168.1.104, PROCESS_LOCAL, 1495 bytes)
16/01/15 19:22:43 INFO TaskSetManager: Starting task 6.0 in stage 1.0 (TID 13, 192.168.1.104, PROCESS_LOCAL, 1495 bytes)
16/01/15 19:22:43 INFO TaskSetManager: Finished task 2.0 in stage 1.0 (TID 9) in 131 ms on 192.168.1.104 (3/7)
16/01/15 19:22:43 INFO TaskSetManager: Finished task 3.0 in stage 1.0 (TID 10) in 133 ms on 192.168.1.104 (4/7)
16/01/15 19:22:43 INFO TaskSetManager: Finished task 5.0 in stage 1.0 (TID 12) in 32 ms on 192.168.1.104 (5/7)
16/01/15 19:22:43 INFO TaskSetManager: Finished task 4.0 in stage 1.0 (TID 11) in 61 ms on 192.168.1.104 (6/7)
16/01/15 19:22:43 INFO TaskSetManager: Finished task 6.0 in stage 1.0 (TID 13) in 34 ms on 192.168.1.104 (7/7)
16/01/15 19:22:43 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/01/15 19:22:43 INFO DAGScheduler: ResultStage 1 (foreach at Main.java:34) finished in 0.165 s
16/01/15 19:22:43 INFO DAGScheduler: Job 1 finished: foreach at Main.java:34, took 0.177378 s
16/01/15 19:22:43 INFO SparkUI: Stopped Spark web UI at http://192.168.1.101:4040
16/01/15 19:22:43 INFO DAGScheduler: Stopping DAGScheduler
16/01/15 19:22:43 INFO SparkDeploySchedulerBackend: Shutting down all executors
16/01/15 19:22:43 INFO SparkDeploySchedulerBackend: Asking each executor to shut down
16/01/15 19:22:43 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/01/15 19:22:43 INFO Utils: path = /tmp/spark-ef3b8193-e086-4764-993c-0a40534052c1/blockmgr-e80c1c60-fe19-4be1-b3f9-259b3f1031a0, already present as root for deletion.
16/01/15 19:22:43 INFO MemoryStore: MemoryStore cleared
16/01/15 19:22:43 INFO BlockManager: BlockManager stopped
16/01/15 19:22:43 INFO BlockManagerMaster: BlockManagerMaster stopped
16/01/15 19:22:43 INFO SparkContext: Successfully stopped SparkContext
16/01/15 19:22:43 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/01/15 19:22:43 INFO Utils: Shutdown hook called
16/01/15 19:22:43 INFO Utils: Deleting directory /tmp/spark-ef3b8193-e086-4764-993c-0a40534052c1
Logs from worker 192.168.1.101:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/01/15 18:14:15 INFO CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT]
16/01/15 18:14:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/01/15 18:14:15 INFO SecurityManager: Changing view acls to: sparkUser
16/01/15 18:14:15 INFO SecurityManager: Changing modify acls to: sparkUser
16/01/15 18:14:15 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(sparkUser); users with modify permissions: Set(sparkUser)
Logs from worker 192.168.1.104:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/01/15 19:23:23 INFO CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT]
16/01/15 19:23:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/01/15 19:23:24 INFO SecurityManager: Changing view acls to: root,sparkUser
16/01/15 19:23:24 INFO SecurityManager: Changing modify acls to: root,sparkUser
16/01/15 19:23:24 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root, sparkUser); users with modify permissions: Set(root, sparkUser)
16/01/15 19:23:25 INFO Slf4jLogger: Slf4jLogger started
16/01/15 19:23:25 INFO Remoting: Starting remoting
16/01/15 19:23:25 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://driverPropsFetcher@192.168.1.104:43937]
16/01/15 19:23:25 INFO Utils: Successfully started service 'driverPropsFetcher' on port 43937.
16/01/15 19:23:26 INFO SecurityManager: Changing view acls to: root,sparkUser
16/01/15 19:23:26 INFO SecurityManager: Changing modify acls to: root,sparkUser
16/01/15 19:23:26 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root, sparkUser); users with modify permissions: Set(root, sparkUser)
16/01/15 19:23:26 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/01/15 19:23:26 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/01/15 19:23:26 INFO Slf4jLogger: Slf4jLogger started
16/01/15 19:23:26 INFO Remoting: Starting remoting
16/01/15 19:23:26 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@192.168.1.104:49573]
16/01/15 19:23:26 INFO Utils: Successfully started service 'sparkExecutor' on port 49573.
16/01/15 19:23:26 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
16/01/15 19:23:26 INFO DiskBlockManager: Created local directory at /tmp/spark-6ffb215c-7267-4a93-a766-2486d2331f6b/executor-146bfe64-d7e8-4da4-9144-8003754f0b5b/blockmgr-41031d8c-b069-4147-90c9-2237baed04f1
16/01/15 19:23:26 INFO MemoryStore: MemoryStore started with capacity 265.1 MB
16/01/15 19:23:26 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://sparkDriver@192.168.1.101:38186/user/CoarseGrainedScheduler
16/01/15 19:23:26 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@192.168.1.104:50099/user/Worker
16/01/15 19:23:26 INFO WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@192.168.1.104:50099/user/Worker
16/01/15 19:23:26 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
16/01/15 19:23:26 INFO Executor: Starting executor ID 0 on host 192.168.1.104
16/01/15 19:23:26 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 33856.
16/01/15 19:23:26 INFO NettyBlockTransferService: Server created on 33856
16/01/15 19:23:26 INFO BlockManagerMaster: Trying to register BlockManager
16/01/15 19:23:26 INFO BlockManagerMaster: Registered BlockManager
16/01/15 19:23:29 INFO CoarseGrainedExecutorBackend: Got assigned task 0
16/01/15 19:23:29 INFO CoarseGrainedExecutorBackend: Got assigned task 1
16/01/15 19:23:29 INFO CoarseGrainedExecutorBackend: Got assigned task 2
16/01/15 19:23:29 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/01/15 19:23:29 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
16/01/15 19:23:29 INFO CoarseGrainedExecutorBackend: Got assigned task 3
16/01/15 19:23:29 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
16/01/15 19:23:29 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
16/01/15 19:23:29 INFO Executor: Fetching http://192.168.1.101:34728/jars/JarForSpark.jar with timestamp 1452878555317
16/01/15 19:23:29 INFO Utils: Fetching http://192.168.1.101:34728/jars/JarForSpark.jar to /tmp/spark-6ffb215c-7267-4a93-a766-2486d2331f6b/executor-146bfe64-d7e8-4da4-9144-8003754f0b5b/fetchFileTemp1585609242243689070.tmp
16/01/15 19:23:29 INFO Utils: Copying /tmp/spark-6ffb215c-7267-4a93-a766-2486d2331f6b/executor-146bfe64-d7e8-4da4-9144-8003754f0b5b/3339800781452878555317_cache to /home/sparkUser2/Programs/spark-1.4.1-bin-hadoop2.6/work/app-20160115192235-0016/0/./JarForSpark.jar
16/01/15 19:23:29 INFO Executor: Adding file:/home/sparkUser2/Programs/spark-1.4.1-bin-hadoop2.6/work/app-20160115192235-0016/0/./JarForSpark.jar to class loader
16/01/15 19:23:29 INFO TorrentBroadcast: Started reading broadcast variable 1
16/01/15 19:23:29 INFO MemoryStore: ensureFreeSpace(2538) called with curMem=0, maxMem=278019440
16/01/15 19:23:29 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.5 KB, free 265.1 MB)
16/01/15 19:23:29 INFO TorrentBroadcast: Reading broadcast variable 1 took 273 ms
16/01/15 19:23:29 INFO MemoryStore: ensureFreeSpace(4400) called with curMem=2538, maxMem=278019440
16/01/15 19:23:29 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.3 KB, free 265.1 MB)
16/01/15 19:23:29 INFO HadoopRDD: Input split: file:/Datasets/somefile.txt:161772+161772
16/01/15 19:23:29 INFO HadoopRDD: Input split: file:/Datasets/somefile.txt:323544+161772
16/01/15 19:23:29 INFO HadoopRDD: Input split: file:/Datasets/somefile.txt:0+161772
16/01/15 19:23:29 INFO HadoopRDD: Input split: file:/Datasets/somefile.txt:485316+161772
16/01/15 19:23:29 INFO TorrentBroadcast: Started reading broadcast variable 0
16/01/15 19:23:29 INFO MemoryStore: ensureFreeSpace(14257) called with curMem=6938, maxMem=278019440
16/01/15 19:23:29 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 13.9 KB, free 265.1 MB)
16/01/15 19:23:30 INFO TorrentBroadcast: Reading broadcast variable 0 took 66 ms
16/01/15 19:23:30 INFO MemoryStore: ensureFreeSpace(188976) called with curMem=21195, maxMem=278019440
16/01/15 19:23:30 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 184.5 KB, free 264.9 MB)
16/01/15 19:23:30 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
16/01/15 19:23:30 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
16/01/15 19:23:30 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
16/01/15 19:23:30 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
16/01/15 19:23:30 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
16/01/15 19:23:30 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 1796 bytes result sent to driver
16/01/15 19:23:30 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 1796 bytes result sent to driver
16/01/15 19:23:30 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1796 bytes result sent to driver
16/01/15 19:23:30 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1796 bytes result sent to driver
16/01/15 19:23:31 INFO CoarseGrainedExecutorBackend: Got assigned task 4
16/01/15 19:23:31 INFO Executor: Running task 4.0 in stage 0.0 (TID 4)
16/01/15 19:23:31 INFO HadoopRDD: Input split: file:/Datasets/somefile.txt:647088+161772
16/01/15 19:23:31 INFO CoarseGrainedExecutorBackend: Got assigned task 5
16/01/15 19:23:31 INFO Executor: Running task 5.0 in stage 0.0 (TID 5)
16/01/15 19:23:31 INFO CoarseGrainedExecutorBackend: Got assigned task 6
16/01/15 19:23:31 INFO HadoopRDD: Input split: file:/Datasets/somefile.txt:808860+161772
16/01/15 19:23:31 INFO Executor: Running task 6.0 in stage 0.0 (TID 6)
16/01/15 19:23:31 INFO HadoopRDD: Input split: file:/Datasets/somefile.txt:970632+161773
16/01/15 19:23:31 INFO Executor: Finished task 4.0 in stage 0.0 (TID 4). 1796 bytes result sent to driver
16/01/15 19:23:31 INFO Executor: Finished task 5.0 in stage 0.0 (TID 5). 1796 bytes result sent to driver
16/01/15 19:23:31 INFO Executor: Finished task 6.0 in stage 0.0 (TID 6). 1796 bytes result sent to driver
16/01/15 19:23:31 INFO CoarseGrainedExecutorBackend: Got assigned task 7
16/01/15 19:23:31 INFO Executor: Running task 0.0 in stage 1.0 (TID 7)
16/01/15 19:23:31 INFO TorrentBroadcast: Started reading broadcast variable 2
16/01/15 19:23:31 INFO CoarseGrainedExecutorBackend: Got assigned task 8
16/01/15 19:23:31 INFO Executor: Running task 1.0 in stage 1.0 (TID 8)
16/01/15 19:23:31 INFO CoarseGrainedExecutorBackend: Got assigned task 9
16/01/15 19:23:31 INFO Executor: Running task 2.0 in stage 1.0 (TID 9)
16/01/15 19:23:31 INFO CoarseGrainedExecutorBackend: Got assigned task 10
16/01/15 19:23:31 INFO Executor: Running task 3.0 in stage 1.0 (TID 10)
16/01/15 19:23:31 INFO MemoryStore: ensureFreeSpace(2761) called with curMem=210171, maxMem=278019440
16/01/15 19:23:31 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 2.7 KB, free 264.9 MB)
16/01/15 19:23:31 INFO TorrentBroadcast: Reading broadcast variable 2 took 42 ms
16/01/15 19:23:31 INFO MemoryStore: ensureFreeSpace(4824) called with curMem=212932, maxMem=278019440
16/01/15 19:23:31 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 4.7 KB, free 264.9 MB)
16/01/15 19:23:31 INFO HadoopRDD: Input split: file:/Datasets/somefile.txt:0+161772
16/01/15 19:23:31 INFO HadoopRDD: Input split: file:/Datasets/somefile.txt:161772+161772
16/01/15 19:23:31 INFO HadoopRDD: Input split: file:/Datasets/somefile.txt:323544+161772
16/01/15 19:23:31 INFO HadoopRDD: Input split: file:/Datasets/somefile.txt:485316+161772
16/01/15 19:23:31 INFO Executor: Finished task 0.0 in stage 1.0 (TID 7). 1814 bytes result sent to driver
16/01/15 19:23:31 INFO Executor: Finished task 1.0 in stage 1.0 (TID 8). 1814 bytes result sent to driver
16/01/15 19:23:31 INFO Executor: Finished task 2.0 in stage 1.0 (TID 9). 1814 bytes result sent to driver
16/01/15 19:23:31 INFO Executor: Finished task 3.0 in stage 1.0 (TID 10). 1814 bytes result sent to driver
16/01/15 19:23:31 INFO CoarseGrainedExecutorBackend: Got assigned task 11
16/01/15 19:23:31 INFO Executor: Running task 4.0 in stage 1.0 (TID 11)
16/01/15 19:23:31 INFO HadoopRDD: Input split: file:/Datasets/somefile.txt:647088+161772
16/01/15 19:23:31 INFO CoarseGrainedExecutorBackend: Got assigned task 12
16/01/15 19:23:31 INFO Executor: Running task 5.0 in stage 1.0 (TID 12)
16/01/15 19:23:31 INFO CoarseGrainedExecutorBackend: Got assigned task 13
16/01/15 19:23:31 INFO Executor: Running task 6.0 in stage 1.0 (TID 13)
16/01/15 19:23:31 INFO HadoopRDD: Input split: file:/Datasets/somefile.txt:808860+161772
16/01/15 19:23:31 INFO HadoopRDD: Input split: file:/Datasets/somefile.txt:970632+161773
16/01/15 19:23:31 INFO Executor: Finished task 5.0 in stage 1.0 (TID 12). 1814 bytes result sent to driver
16/01/15 19:23:31 INFO Executor: Finished task 4.0 in stage 1.0 (TID 11). 1814 bytes result sent to driver
16/01/15 19:23:31 INFO Executor: Finished task 6.0 in stage 1.0 (TID 13). 1814 bytes result sent to driver
16/01/15 19:23:31 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
I also tried stopping one of the workers to see what would happen, and the program completed successfully on the other worker. I also looked at this post, but unfortunately it didn't solve my problem: Why my tasks only be done in one worker in Spark cluster
Appreciate your help!
This is because of data locality ("how close data is to the code processing it"). Spark tries to schedule each available task at the best locality level it can.
By default, Spark tries the PROCESS_LOCAL level first and falls back to a lower level only if no CPU at the current level becomes free within a certain time interval. The default wait before falling back to the next level is 3s (see the spark.locality.wait parameter).
Looking at the logs, all of your tasks finished within 3 seconds:
16/01/15 19:22:41 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 192.168.1.104, PROCESS_LOCAL, 1495 bytes)
16/01/15 19:22:41 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 192.168.1.104, PROCESS_LOCAL, 1495 bytes)
16/01/15 19:22:41 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, 192.168.1.104, PROCESS_LOCAL, 1495 bytes)
16/01/15 19:22:41 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, 192.168.1.104, PROCESS_LOCAL, 1495 bytes)
16/01/15 19:22:41 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.104:33856 (size: 2.5 KB, free: 265.1 MB)
16/01/15 19:22:42 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.104:33856 (size: 13.9 KB, free: 265.1 MB)
16/01/15 19:22:43 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID 4, 192.168.1.104, PROCESS_LOCAL, 1495 bytes)
16/01/15 19:22:43 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID 5, 192.168.1.104, PROCESS_LOCAL, 1495 bytes)
16/01/15 19:22:43 INFO TaskSetManager: Starting task 6.0 in stage 0.0 (TID 6, 192.168.1.104, PROCESS_LOCAL, 1495 bytes)
16/01/15 19:22:43 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 2017 ms on 192.168.1.104 (1/7)
16/01/15 19:22:43 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 2036 ms on 192.168.1.104 (2/7)
16/01/15 19:22:43 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 2027 ms on 192.168.1.104 (3/7)
16/01/15 19:22:43 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 2027 ms on 192.168.1.104 (4/7)
16/01/15 19:22:43 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 143 ms on 192.168.1.104 (5/7)
16/01/15 19:22:43 INFO TaskSetManager: Finished task 5.0 in stage 0.0 (TID 5) in 199 ms on 192.168.1.104 (6/7)
16/01/15 19:22:43 INFO TaskSetManager: Finished task 6.0 in stage 0.0 (TID 6) in 206 ms on 192.168.1.104 (7/7)
16/01/15 19:22:43 INFO DAGScheduler: ResultStage 0 (foreach at Main.java:33) finished in 2.218 s
16/01/15 19:22:43 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/01/15 19:22:43 INFO DAGScheduler: Job 0 finished: foreach at Main.java:33, took 2.289399 s
I would suggest trying larger files (in the GB range), where each task takes some time to produce its final result.
For more information on data locality, please read the "Data Locality" section of the Spark tuning guide.
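To make the fallback rule described in this answer concrete, here is a toy Java model of it. It is not Spark's scheduler code: the class LocalityWaitSketch and the method allowedLevel are hypothetical names invented for this illustration, and the model assumes that the same wait applies to every level and that every level is available. It maps the time a task set has gone without launching a task at its current level to the loosest level it would then accept.

```java
public class LocalityWaitSketch {
    // Locality levels in the order Spark prefers them, best first.
    enum Level { PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY }

    // Hypothetical helper (not a Spark API): each full wait interval that
    // passes without a launch relaxes the allowed level by one step.
    static Level allowedLevel(long elapsedMs, long waitMs) {
        Level[] levels = Level.values();
        if (waitMs <= 0) {
            // A zero wait means no waiting at all: any level is acceptable.
            return Level.ANY;
        }
        long steps = elapsedMs / waitMs;
        int index = (int) Math.min(steps, levels.length - 1);
        return levels[index];
    }

    public static void main(String[] args) {
        long waitMs = 3000; // the default 3s mentioned above

        // Stage 0 in the logs finished in 2.218 s, under one wait interval.
        System.out.println(allowedLevel(2218, waitMs));
        // A task set stuck for 3.5 s would be allowed to drop one level.
        System.out.println(allowedLevel(3500, waitMs));
        // With the wait set to 0, there is no delay before falling back.
        System.out.println(allowedLevel(0, 0));
    }
}
```

In this model, a stage that completes in about 2.2 s with a 3s wait never gets past PROCESS_LOCAL, which matches the behaviour described above. On a real cluster the wait is controlled by spark.locality.wait, which can be passed to spark-submit with its --conf option.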