Spark Streaming Problem with Kafka DirectStream:
spark streaming assertion failed: Failed to get records for spark-executor-a-group a-topic 7 244723248 after polling for 4096
Tried:
1) Adjust increasing spark.streaming.kafka.consumer.poll.ms
-- from 512 to 4096, less failed, but even 10s the failed still exists
2) Adjust executor memory from 1G to 2G
-- partly work, much less failed
3) https://issues.apache.org/jira/browse/SPARK-19275
-- still got failed when streaming durations all less than 8s ("session.timeout.ms" -> "30000")
4) Try Spark 2.1
-- problem still there
with Scala 2.11.8, Kafka version : 0.10.0.0, Spark version : 2.0.2
Spark configs
.config("spark.cores.max", "4")
.config("spark.default.parallelism", "2")
.config("spark.streaming.backpressure.enabled", "true")
.config("spark.streaming.receiver.maxRate", "1024")
.config("spark.streaming.kafka.maxRatePerPartition", "256")
.config("spark.streaming.kafka.consumer.poll.ms", "4096")
.config("spark.streaming.concurrentJobs", "2")
using spark-streaming-kafka-0-10-assembly_2.11-2.1.0.jar
Error stacks:
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:228)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:194)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:194)
...
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:109)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:108)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:142)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:108)
...
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Losing 1%+ blocks datum from Kafka with this failure :( pls help!
Current solution:
num.network.threads
in kafka/config/server.properties, default is 3spark.streaming.kafka.consumer.poll.ms
value ~! a large one ...
without config spark.streaming.kafka.consumer.poll.ms, it's using the spark.network.timeout, which is 120s -- causing some problem
I solved the issue using a simple configuration change which was quite apparent but it took me sometime to realize how such a default (mis)configuration could be left untreated.
The primary issue is Spark config spark.streaming.kafka.consumer.poll.ms
(default 512ms in KafkaRDD) or spark.network.timeout
(default 120sec, if spark.streaming.kafka.consumer.poll.ms
is not set) is always less than Kafka consumer request.timeout.ms
(default 305000ms in Kafka newconsumerapi) ... hence spark polling always times out before timeout happens at Kafka consumer request/poll (when there are no records available in Kafka topic).
Simply increasing spark.streaming.kafka.consumer.poll.ms
to a value greater than Kafka request.timeout.ms
should do the trick. Also adjust Kafka consumer max.poll.interval.ms
to be always less than request.timeout.ms
.
Q.E.D and Good luck.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With