 

spark streaming assertion failed: Failed to get records for spark-executor-a-group a-topic 7 244723248 after polling for 4096

Spark Streaming Problem with Kafka DirectStream:

spark streaming assertion failed: Failed to get records for spark-executor-a-group a-topic 7 244723248 after polling for 4096

Tried:

1) Increased spark.streaming.kafka.consumer.poll.ms

-- from 512 to 4096: fewer failures, but failures still occur even at 10s

2) Increased executor memory from 1G to 2G

-- partly worked: far fewer failures

3) Applied https://issues.apache.org/jira/browse/SPARK-19275 ("session.timeout.ms" -> "30000")

-- still failed even though all streaming batch durations are less than 8s

4) Tried Spark 2.1

-- the problem is still there


Environment: Scala 2.11.8, Kafka 0.10.0.0, Spark 2.0.2

Spark configs:

.config("spark.cores.max", "4")
.config("spark.default.parallelism", "2")
.config("spark.streaming.backpressure.enabled", "true")
.config("spark.streaming.receiver.maxRate", "1024")
.config("spark.streaming.kafka.maxRatePerPartition", "256")
.config("spark.streaming.kafka.consumer.poll.ms", "4096")
.config("spark.streaming.concurrentJobs", "2")

using spark-streaming-kafka-0-10-assembly_2.11-2.1.0.jar
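For context, a minimal sketch of the direct-stream setup these configs apply to, following the spark-streaming-kafka-0-10 API. The broker address, offset-reset policy, and the pre-existing StreamingContext `ssc` are illustrative assumptions; topic and group names are taken from the error message:

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Kafka consumer params; "broker1:9092" is a placeholder address.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "a-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// ssc is an existing StreamingContext
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Array("a-topic"), kafkaParams)
)
```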

Error stack trace:

at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:228)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:194)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:194)
...
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:109)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:108)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:142)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:108)
...
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Losing over 1% of the data blocks from Kafka to this failure :( please help!

asked Feb 16 '17 by mindon

2 Answers

Current solution:

  • Increase num.network.threads in kafka/config/server.properties (default is 3)
  • Increase spark.streaming.kafka.consumer.poll.ms to a large value; if spark.streaming.kafka.consumer.poll.ms is not set, Spark falls back to spark.network.timeout (120s), which causes problems of its own
  • Optional: decrease max.poll.records (default is 500)
  • Optional: use Future{} to run time-consuming tasks in parallel
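The steps above might be sketched as follows. The concrete values (30000 ms poll timeout, 250 records) are illustrative assumptions, not prescriptions from the answer, and the side task inside Future{} is hypothetical:

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Spark side: set the poll timeout explicitly, so the 120s
// spark.network.timeout fallback never applies.
// .config("spark.streaming.kafka.consumer.poll.ms", "30000")

// Kafka consumer side, added to kafkaParams: smaller batches per poll.
// "max.poll.records" -> "250"   // down from the default 500

// Run a time-consuming side task in parallel with the main processing:
stream.foreachRDD { rdd =>
  val slowWork = Future {
    // e.g. write metrics or feed a secondary sink here (hypothetical)
  }
  // the main per-partition processing continues without waiting on slowWork
  rdd.foreachPartition { records =>
    records.foreach(record => process(record)) // process() is a placeholder
  }
}
```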
answered Nov 19 '22 by mindon


I solved the issue with a simple configuration change. It was quite apparent in hindsight, but it took me some time to realize how such a default (mis)configuration could be left untreated.

The primary issue is that the Spark setting spark.streaming.kafka.consumer.poll.ms (default 512ms in KafkaRDD), or spark.network.timeout (default 120s, used when spark.streaming.kafka.consumer.poll.ms is not set), is always less than the Kafka consumer's request.timeout.ms (default 305000ms in the new Kafka consumer API). As a result, Spark's poll always times out before the Kafka consumer's request/poll does (whenever no records are available in the Kafka topic).

Simply increasing spark.streaming.kafka.consumer.poll.ms to a value greater than Kafka's request.timeout.ms should do the trick. Also adjust the Kafka consumer's max.poll.interval.ms so that it is always less than request.timeout.ms.
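Under that diagnosis, the relationship between the three timeouts might look like this. The 300000/310000 ms values are illustrative assumptions chosen only to satisfy the stated ordering:

```scala
// Kafka consumer side (merged into kafkaParams):
// request.timeout.ms keeps its new-consumer default of 305000 ms,
// and max.poll.interval.ms stays below it.
val timeoutParams = Map[String, Object](
  "request.timeout.ms"   -> "305000", // Kafka new-consumer default
  "max.poll.interval.ms" -> "300000"  // must stay below request.timeout.ms
)

// Spark side: the poll timeout exceeds request.timeout.ms,
// so Spark does not give up before the Kafka consumer does.
// .config("spark.streaming.kafka.consumer.poll.ms", "310000")
```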

Q.E.D and Good luck.

answered Nov 19 '22 by neel