Spark Streaming + Kafka: SparkException: Couldn't find leader offsets for Set

I'm trying to set up Spark Streaming to consume messages from a Kafka topic. I'm getting the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling o30.createDirectStream.
: org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for Set([test-topic,0])
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at scala.util.Either.fold(Either.scala:97)

Here is the code I'm executing (pyspark):

from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

ssc = StreamingContext(sc, 10)  # sc is the SparkContext from the pyspark shell; batch interval is a placeholder

directKafkaStream = KafkaUtils.createDirectStream(ssc, ["test-topic"], {"metadata.broker.list": "host.domain:9092"})
directKafkaStream.pprint()  # an output operation is required before start()

ssc.start()
ssc.awaitTermination()

There were a couple of similar posts with the same error. In all of them the cause was an empty Kafka topic, but there are messages in my "test-topic". I can read them with

kafka-console-consumer --zookeeper host.domain:2181 --topic test-topic --from-beginning --max-messages 100

Does anyone know what might be the problem?

I'm using:

  • Spark 1.5.2 (apache)
  • Kafka 0.8.2.0+kafka1.3.0 (CDH 5.4.7)
asked Dec 15 '15 by facha


4 Answers

You need to check two things:

  1. Check that the topic and partition exist; in your case the topic is test-topic and the partition is 0.

  2. Based on your code, you are trying to consume messages from offset 0, and it is possible that no messages are available at offset 0 any more. Check what your earliest offset is and consume from there (see the pyspark sketch after the command below).

Below is the command to check the earliest offset (--time -2 returns the earliest offset; --time -1 would return the latest):

sh kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "your broker list" --topic "topic name" --time -2 
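
A minimal pyspark sketch of point 2, following the question's setup (the app name and batch interval are placeholders, not the asker's values): setting "auto.offset.reset" to "smallest" in the Kafka params makes the direct stream start from the earliest offset the broker still has.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="kafka-offset-check")  # placeholder app name
ssc = StreamingContext(sc, 10)                   # placeholder 10-second batch interval

# Start reading from the earliest offset Kafka still has for each partition
stream = KafkaUtils.createDirectStream(
    ssc,
    ["test-topic"],
    {"metadata.broker.list": "host.domain:9092",
     "auto.offset.reset": "smallest"})

stream.pprint()
ssc.start()
ssc.awaitTermination()
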
answered Oct 24 '22 by Narendra Parmar


1) You have to make sure that you have already created the topic test-topic.

Run the following command to list the existing topics:

kafka-topics.sh --list --zookeeper [host or ip of zookeeper]:[port]
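
If the topic is listed, describing it (also with the stock kafka-topics.sh tool) shows whether each partition currently has a leader, which is what the "Couldn't find leader offsets" error is about. A sketch, assuming the same topic name as the question:

kafka-topics.sh --describe --zookeeper [host or ip of zookeeper]:[port] --topic test-topic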

2) After checking your topic, you have to configure the listener in the Socket Server Settings section of the Kafka broker configuration (server.properties):

listeners=PLAINTEXT://[host or ip of Kafka]:[port]
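
Note that the listeners property applies to newer Kafka brokers; for the Kafka 0.8.2 broker mentioned in the question, the corresponding server.properties settings are host.name/port and, when clients reach the broker through a different address, advertised.host.name/advertised.port. A sketch with placeholder values:

host.name=[host or ip of Kafka]
port=[port]
advertised.host.name=[host or ip that clients can resolve]
advertised.port=[port]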

answered Oct 24 '22 by Narongsak Mala


If you define short host names in /etc/hosts and use them in your Kafka servers' configuration, you should change those names to IP addresses, or register the same short host names in /etc/hosts on your local PC or client machine.

The error occurs because the Spark Streaming library cannot resolve the short hostname on the PC or client.
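
For example, a hypothetical /etc/hosts entry on the client machine, assuming the broker advertises itself under the short name kafka01 at 192.168.1.10:

192.168.1.10   kafka01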

answered Oct 24 '22 by user6399397


Another option is to force creating the topic if it doesn't exist. You can do this by setting the property "auto.create.topics.enable" to "true" in the kafkaParams map, like this:

val kafkaParams = Map[String, String](
  "bootstrap.servers" -> kafkaHost,        // broker address(es)
  "group.id" -> kafkaGroup,                // consumer group id
  "auto.create.topics.enable" -> "true")   // request auto-creation of missing topics

This uses Scala 2.11 and Kafka 0.10.

answered Oct 24 '22 by Peter T.