Spark Streaming: Could not compute split, block not found

I am trying to use Spark Streaming (version 1.1.0) with Kafka, but the Spark job keeps crashing with this error:

14/11/21 12:39:23 ERROR TaskSetManager: Task 3967.0:0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3967.0:0 failed 4 times, most recent failure: Exception failure in TID 43518 on host ********: java.lang.Exception: Could not compute split, block input-0-1416573258200 not found
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3967.0:0 failed 4 times, most recent failure: Exception failure in TID 43518 on host ********: java.lang.Exception: Could not compute split, block input-0-1416573258200 not found
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)

The only relevant information I get from the logs is this:

14/11/21 12:34:18 INFO MemoryStore: Block input-0-1416573258200 stored as bytes to memory (size 85.8 KB, free 2.3 GB)
14/11/21 12:34:18 INFO BlockManagerMaster: Updated info of block input-0-1416573258200
14/11/21 12:34:18 INFO BlockGenerator: Pushed block input-0-1416573258200
org.apache.spark.SparkException: Error sending message to BlockManagerMaster [message = GetLocations(input-0-1416573258200)]
java.lang.Exception: Could not compute split, block input-0-1416573258200 not found
14/11/21 12:37:35 INFO BlockManagerInfo: Added input-0-1416573258200 in memory on ********:43117 (size: 85.8 KB, free: 2.3 GB)
org.apache.spark.SparkException: Error sending message to BlockManagerMaster [message = GetLocations(input-0-1416573258200)]
java.lang.Exception: Could not compute split, block input-0-1416573258200 not found
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3967.0:0 failed 4 times, most recent failure: Exception failure in TID 43518 on host ********: java.lang.Exception: Could not compute split, block input-0-1416573258200 not found
java.lang.Exception: Could not compute split, block input-0-1416573258200 not found
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3967.0:0 failed 4 times, most recent failure: Exception failure in TID 43518 on host ********: java.lang.Exception: Could not compute split, block input-0-1416573258200 not found
java.lang.Exception: Could not compute split, block input-0-1416573258200 not found

Sample code:

SparkConf conf = new SparkConf();
JavaSparkContext sc = new JavaSparkContext(conf);
// 5-second batch interval
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(5000));
jssc.checkpoint(checkpointDir);

// Consume the topic with a single receiver thread
HashMap<String, Integer> topics = new HashMap<String, Integer>();
topics.put(KAFKA_TOPIC, 1);

HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("group.id", "spark-streaming-test");
kafkaParams.put("zookeeper.connect", ZOOKEEPER_QUORUM);
kafkaParams.put("zookeeper.connection.timeout.ms", "1000");
kafkaParams.put("auto.offset.reset", "smallest");

// Receiver-based Kafka stream; received blocks are stored as serialized
// bytes in memory and spill to disk when memory runs out
JavaPairReceiverInputDStream<String, String> kafkaStream =
  KafkaUtils.createStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics, StorageLevels.MEMORY_AND_DISK_SER);

JavaPairDStream<String, String> streamPair = kafkaStream.flatMapToPair(...).reduceByKey(...);

I'm not sure what the cause of this issue is.

asked Nov 21 '14 by Bobby


3 Answers

This is due to Spark Streaming's model: it collects data for a batch interval and then hands each batch to the Spark engine for processing. The engine does not know the data comes from a streaming system, and it does not report its processing rate back to the streaming component.

This means there is no flow control (backpressure), unlike in native streaming systems such as Storm or Flink, which can throttle the spout/source based on the downstream processing rate.

See the Spark Streaming model diagram in the programming guide: https://spark.apache.org/docs/latest/streaming-programming-guide.html

One option to work around this would be to manually pass processing info/acks back to the receiver component, which of course also means using a custom receiver. At that point you are starting to rebuild features that Storm/Flink provide out of the box.
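A simpler stopgap than a custom receiver is to cap how fast the receiver ingests data, so stored blocks are not evicted before they are processed. A minimal sketch using the spark.streaming.receiver.maxRate setting (the 10000 records/second figure is purely illustrative; tune it against your observed processing rate):

// Cap receiver ingest so blocks aren't evicted before they're processed.
// 10000 records/sec is illustrative; tune against your processing rate.
SparkConf conf = new SparkConf()
    .set("spark.streaming.receiver.maxRate", "10000");

Later releases (Spark 1.5+) add spark.streaming.backpressure.enabled to automate this, but that option did not exist in 1.1.0.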

answered Nov 01 '22 by Fakrudeen


Check the following.

1) Did you create the streaming context properly, as in:

def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)   // new context
    val lines = ssc.socketTextStream(...) // create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
    ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

Your initialization is incorrect: your code builds the JavaStreamingContext directly and only then sets the checkpoint directory, so nothing is ever recovered from the checkpoint on restart.

For a complete example, see the RecoverableNetworkWordCount example app that ships with Spark.
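Since the question's code is in Java, here is a minimal sketch of the same pattern with the Spark 1.x Java API; it reuses the asker's checkpointDir variable and JavaStreamingContextFactory, the factory interface in that release line:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;

JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
  @Override
  public JavaStreamingContext create() {
    // All DStream setup must happen inside the factory so the context
    // can be rebuilt from checkpoint data after a restart.
    JavaStreamingContext jssc =
        new JavaStreamingContext(new SparkConf(), new Duration(5000));
    jssc.checkpoint(checkpointDir);
    // ... create the Kafka stream and transformations here ...
    return jssc;
  }
};

// Recover from checkpoint data if present, otherwise create a new context.
JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpointDir, factory);
jssc.start();
jssc.awaitTermination();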

2) Have you enabled the write-ahead log via the spark.streaming.receiver.writeAheadLog.enable property (introduced in Spark 1.2)?
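A minimal sketch of turning the write-ahead log on, assuming checkpointing is already configured (the WAL is written under the checkpoint directory):

// Write received blocks to the WAL in the checkpoint directory so they
// can be replayed after a failure instead of going missing.
SparkConf conf = new SparkConf()
    .set("spark.streaming.receiver.writeAheadLog.enable", "true");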

3) Check the stability of the job in the Streaming tab of the web UI: the processing time must stay below the batch interval, otherwise unprocessed blocks pile up and eventually get evicted.

answered Nov 01 '22 by Knight71


Have you tried inputs.persist(StorageLevel.MEMORY_AND_DISK_SER)?

See, for example, this thread on the Spark user list: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Could-not-compute-split-block-not-found-td11186.html
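In the asker's code that would be a one-line change (a sketch reusing the kafkaStream variable and the StorageLevels helper already imported in the question):

// Keep generated RDDs on disk as well, so an evicted in-memory block
// can be read back instead of failing with "block not found".
kafkaStream.persist(StorageLevels.MEMORY_AND_DISK_SER);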

answered Nov 01 '22 by Tao Li