Using Spark Structured Streaming to Read Data from Kafka: a TimeoutException Always Occurs

Here is the code I use to read data from Kafka with Spark Structured Streaming:

// ss: SparkSession is defined earlier.
import ss.implicits._

val df = ss
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafka_server) // Kafka broker list
  .option("subscribe", topic_input)                // topic to consume
  .option("startingOffsets", "latest")             // start from the newest offsets
  .option("kafkaConsumer.pollTimeoutMs", "5000")   // per-poll timeout on the executors
  .option("failOnDataLoss", "false")
  .load()
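
For completeness, the stream is then started downstream; a minimal sketch of doing so, using a console sink purely as a stand-in for the real sink:

// Sketch only: the console sink stands in for the job's actual sink.
val query = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("console")
  .start()

query.awaitTermination()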

Here is the error:

  Caused by: java.util.concurrent.TimeoutException: Cannot fetch record xxxx for offset in 5000 milliseconds

Increasing the timeout from 5000 to 10000 ms does not help; the error still occurs. I have also googled this question, but there seems to be little related information about this issue.
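
For reference, kafkaConsumer.pollTimeoutMs reportedly falls back to spark.network.timeout when it is not set, so the timeout can also be raised globally when building the session. A minimal sketch of that configuration (assuming Spark 2.3 behavior; the app name is a placeholder):

import org.apache.spark.sql.SparkSession

// Sketch: raise the global timeout that the Kafka source falls back to.
// Assumption: in Spark 2.3, kafkaConsumer.pollTimeoutMs defaults to
// spark.network.timeout (120s by default).
val ss = SparkSession.builder()
  .appName("kafka-reader")                 // placeholder name
  .config("spark.network.timeout", "300s")
  .getOrCreate()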

Here is the part of the sbt build file related to this issue:

libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.0" exclude ("org.apache.kafka", "kafka-clients")
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"
GaoYuan asked Mar 28 '18

1 Answer

I got this error too.

I looked through the source code of KafkaSourceRDD, but found nothing.

I guessed something was wrong with the Kafka connector, so I excluded the kafka-clients dependency from the "spark-sql-kafka-0-10_2.11" package and added an explicit dependency on an older version, like this:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.3.0</version>
    <scope>compile</scope>
    <exclusions>
        <exclusion>
            <artifactId>kafka-clients</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.1</version>
</dependency>

Now it works. Hope it helps.
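
Since the question uses sbt rather than Maven, the equivalent change there should just be the question's own two lines with kafka-clients pinned to 0.10.2.1 instead of 0.11.0.0 (untested sketch):

libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.0" exclude ("org.apache.kafka", "kafka-clients")
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.2.1"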

I created a jira issue to report this problem: https://issues.apache.org/jira/browse/SPARK-23829

Update on 12/17/2018: Spark 2.4 and Kafka 2.0 resolve the problem.
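
If upgrading is an option, the corresponding sbt dependency would presumably just be the 2.4 artifact, with no need to exclude or pin kafka-clients (Scala 2.11 assumed):

libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0"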

Norman Bai answered Sep 22 '22