Here is the code I use to read data from Kafka with Spark Structured Streaming:
// ss: SparkSession is defined earlier.
import ss.implicits._

val df = ss
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafka_server)
  .option("subscribe", topic_input)
  .option("startingOffsets", "latest")
  .option("kafkaConsumer.pollTimeoutMs", "5000")
  .option("failOnDataLoss", "false")
  .load()
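The exception only appears once the streaming query is actually started; that part is not shown above, but for context, a minimal sketch of how the query might be run (the console sink and column casts here are assumed purely for illustration):

val query = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("console")       // illustrative sink only
  .outputMode("append")
  .start()

query.awaitTermination()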
Here is the error:
Caused by: java.util.concurrent.TimeoutException: Cannot fetch record xxxx for offset in 5000 milliseconds
If I increase the 5000 to 10000, the error still happens. I also searched for this on Google, but there does not seem to be much related information about this issue.
Here is the part of the sbt file related to this issue:
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.0" exclude ("org.apache.kafka", "kafka-clients")
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"
I got this error too.
I read through the source code of KafkaSourceRDD but found nothing helpful.
I suspected something was wrong with the Kafka connector, so I excluded kafka-clients from the "spark-sql-kafka-0-10_2.11" package and added a separate kafka-clients dependency, like this:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <version>2.3.0</version>
  <scope>compile</scope>
  <exclusions>
    <exclusion>
      <artifactId>kafka-clients</artifactId>
      <groupId>org.apache.kafka</groupId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.2.1</version>
</dependency>
Now it works. Hope it helps.
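If you build with sbt instead of Maven, a sketch of the equivalent change (versions copied from the Maven snippet above) would be:

libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.0" exclude("org.apache.kafka", "kafka-clients")
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.2.1"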
I created a jira issue to report this problem: https://issues.apache.org/jira/browse/SPARK-23829
Update on 12/17/2018: Spark 2.4 and Kafka 2.0 resolve the problem.
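For reference, upgrading in sbt would look roughly like this (the exact 2.4.x version is up to your cluster; the newer connector pulls in a Kafka 2.0 client, so the manual kafka-clients override above is no longer needed):

libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0"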