 

Why does Spark task take a long time to find block locally?

Tags:

apache-spark

The RDD has 512 equally sized partitions and is 100% cached in memory across 512 executors.

I have a filter-map-collect job with 512 tasks. Sometimes this job completes sub-second; on other occasions 50% of the tasks complete sub-second, 45% take 10 seconds, and 5% take 20 seconds.
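For reference, the job looks roughly like this (a sketch with illustrative names and paths, not the actual code):

import org.apache.spark.storage.StorageLevel

val rdd = sc.textFile("hdfs:///data/input")   // hypothetical source
  .repartition(512)                           // 512 equally sized partitions
  .persist(StorageLevel.MEMORY_ONLY)          // 100% cached in memory

val result = rdd.filter(_.contains("key"))    // the filter-map-collect job
  .map(_.length)
  .collect()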

Here is the log from an executor where the task took 20 seconds:

15/12/16 09:44:37 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 5312 
15/12/16 09:44:37 INFO executor.Executor: Running task 215.0 in stage 17.0 (TID 5312) 
15/12/16 09:44:37 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 10 
15/12/16 09:44:37 INFO storage.MemoryStore: ensureFreeSpace(1777) called with curMem=908793307, maxMem=5927684014 
15/12/16 09:44:37 INFO storage.MemoryStore: Block broadcast_10_piece0 stored as bytes in memory (estimated size 1777.0 B, free 4.7 GB) 
15/12/16 09:44:37 INFO broadcast.TorrentBroadcast: Reading broadcast variable 10 took 186 ms 
15/12/16 09:44:37 INFO storage.MemoryStore: ensureFreeSpace(3272) called with curMem=908795084, maxMem=5927684014 
15/12/16 09:44:37 INFO storage.MemoryStore: Block broadcast_10 stored as values in memory (estimated size 3.2 KB, free 4.7 GB) 
15/12/16 09:44:57 INFO storage.BlockManager: Found block rdd_5_215 locally 
15/12/16 09:44:57 INFO executor.Executor: Finished task 215.0 in stage 17.0 (TID 5312). 2074 bytes result sent to driver 

So it appears the 20 seconds is spent finding the local block. The logs for the other slow tasks show they are all delayed for the same reason. My understanding is that a local block is one held within the same JVM instance, so I don't understand why it takes so long to locate.

Since the lag is always exactly 10 seconds or exactly 20 seconds, I suspect it's due to a 10-second timeout on some listener, or something like that. If that is true, then I guess my options are either to find out why it's timing out and fix it, or to make the timeout shorter so it retries more frequently.
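For example, if it does turn out to be a configurable wait (purely a guess at this point, e.g. something like spark.locality.wait, whose default is 3s), shortening it would look like this:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // how long the scheduler waits for a preferred (e.g. process-local)
  // slot before falling back to a less local one
  .set("spark.locality.wait", "1s")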

Why does the task take so long to find a local block and how can I resolve this?

Update: adding DEBUG logging for org.apache.spark.storage.
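For anyone reproducing this: the extra logging can be enabled programmatically through the log4j 1.x API that ships with Spark (or equivalently in conf/log4j.properties):

import org.apache.log4j.{Level, Logger}

// Enable DEBUG output for the block storage subsystem only
Logger.getLogger("org.apache.spark.storage").setLevel(Level.DEBUG)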

16/02/01 12:14:07 INFO CoarseGrainedExecutorBackend: Got assigned task 3029
16/02/01 12:14:07 INFO Executor: Running task 115.0 in stage 9.0 (TID 3029)
16/02/01 12:14:07 DEBUG Executor: Task 3029's epoch is 1
16/02/01 12:14:07 DEBUG BlockManager: Getting local block broadcast_6
16/02/01 12:14:07 DEBUG BlockManager: Block broadcast_6 not registered locally
16/02/01 12:14:07 INFO TorrentBroadcast: Started reading broadcast variable 6
16/02/01 12:14:07 DEBUG TorrentBroadcast: Reading piece broadcast_6_piece0 of broadcast_6
16/02/01 12:14:07 DEBUG BlockManager: Getting local block broadcast_6_piece0 as bytes
16/02/01 12:14:07 DEBUG BlockManager: Block broadcast_6_piece0 not registered locally
16/02/01 12:14:07 DEBUG BlockManager: Getting remote block broadcast_6_piece0 as bytes
16/02/01 12:14:07 DEBUG BlockManager: Getting remote block broadcast_6_piece0 from BlockManagerId(385, node1._.com, 54162)
16/02/01 12:14:07 DEBUG TransportClient: Sending fetch chunk request 0 to node1._.com:54162
16/02/01 12:14:07 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 2017.0 B, free 807.3 MB)
16/02/01 12:14:07 DEBUG BlockManagerMaster: Updated info of block broadcast_6_piece0
16/02/01 12:14:07 DEBUG BlockManager: Told master about block broadcast_6_piece0
16/02/01 12:14:07 DEBUG BlockManager: Put block broadcast_6_piece0 locally took  2 ms
16/02/01 12:14:07 DEBUG BlockManager: Putting block broadcast_6_piece0 without replication took  2 ms
16/02/01 12:14:07 INFO TorrentBroadcast: Reading broadcast variable 6 took 87 ms
16/02/01 12:14:07 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 3.6 KB, free 807.3 MB)
16/02/01 12:14:07 DEBUG BlockManager: Put block broadcast_6 locally took  1 ms
16/02/01 12:14:07 DEBUG BlockManager: Putting block broadcast_6 without replication took  1 ms
16/02/01 12:14:17 DEBUG CacheManager: Looking for partition rdd_5_115
16/02/01 12:14:17 DEBUG BlockManager: Getting local block rdd_5_115
16/02/01 12:14:17 DEBUG BlockManager: Level for block rdd_5_115 is StorageLevel(false, true, false, true, 1)
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: removing broadcast 4
16/02/01 12:14:17 DEBUG BlockManager: Getting block rdd_5_115 from memory
16/02/01 12:14:17 DEBUG BlockManager: Removing broadcast 4
16/02/01 12:14:17 INFO BlockManager: Found block rdd_5_115 locally
16/02/01 12:14:17 DEBUG BlockManager: Removing block broadcast_4
16/02/01 12:14:17 DEBUG MemoryStore: Block broadcast_4 of size 3680 dropped from memory (free 5092230668)
16/02/01 12:14:17 DEBUG BlockManager: Removing block broadcast_4_piece0
16/02/01 12:14:17 DEBUG MemoryStore: Block broadcast_4_piece0 of size 2017 dropped from memory (free 5092232685)
16/02/01 12:14:17 DEBUG BlockManagerMaster: Updated info of block broadcast_4_piece0
16/02/01 12:14:17 DEBUG BlockManager: Told master about block broadcast_4_piece0
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: Done removing broadcast 4, response is 2
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: Sent response: 2 to node2._.com:45115
16/02/01 12:14:17 INFO Executor: Finished task 115.0 in stage 9.0 (TID 3029). 2164 bytes result sent to driver
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: removing broadcast 5
16/02/01 12:14:17 DEBUG BlockManager: Removing broadcast 5
16/02/01 12:14:17 DEBUG BlockManager: Removing block broadcast_5_piece0
16/02/01 12:14:17 DEBUG MemoryStore: Block broadcast_5_piece0 of size 2017 dropped from memory (free 5092234702)
16/02/01 12:14:17 DEBUG BlockManagerMaster: Updated info of block broadcast_5_piece0
16/02/01 12:14:17 DEBUG BlockManager: Told master about block broadcast_5_piece0
16/02/01 12:14:17 DEBUG BlockManager: Removing block broadcast_5
16/02/01 12:14:17 DEBUG MemoryStore: Block broadcast_5 of size 3680 dropped from memory (free 5092238382)
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: Done removing broadcast 5, response is 2
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: Sent response: 2 to node2._.com:45115
asked Jan 20 '16 by user2179977



2 Answers

The only thing that stands out to me is that you have replication turned on via your storage level, StorageLevel(false, true, false, true, 1).

Since you have 512 partitions across 512 executors, it may be replicating the blocks to every executor, which may cause that slowdown at the end. I'd try turning off replication and see what that does for performance.
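To act on that, note that the replication count is the last field of the constructor, StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication). A sketch of caching without replication (rdd stands in for your cached RDD):

import org.apache.spark.storage.StorageLevel

rdd.persist(StorageLevel.MEMORY_ONLY)   // deserialized in memory, replication = 1
// Replicated variants carry a _2 suffix, e.g. StorageLevel.MEMORY_ONLY_2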

answered Sep 28 '22 by Stephen Carman


How many total cores are you allocating to your Spark application? This can happen if you are allocating 256 cores and the value of spark.locality.wait is 10 seconds.

I don't know your environment, but it sounds like you have too many executors. Run only a few executors (depending on how powerful your compute nodes are) and give each executor multiple cores. In short: instead of lots of processes with one thread each, have a few processes with many threads each.
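As a rough illustration (the numbers are placeholders to tune for your nodes, not recommendations; on YARN they correspond to --num-executors and --executor-cores):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.instances", "64")  // fewer executors overall...
  .set("spark.executor.cores", "8")       // ...each running several task threads
  .set("spark.locality.wait", "1s")       // shorter locality wait, as discussed above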

answered Sep 28 '22 by ACH