I'm trying to set up Spark Streaming to read messages from a Kafka queue. I'm getting the following error:
py4j.protocol.Py4JJavaError: An error occurred while calling o30.createDirectStream.
: org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for Set([test-topic,0])
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at scala.util.Either.fold(Either.scala:97)
Here is the code I'm executing (pyspark):
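from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
# Context setup was not shown in the original post; app name and 10 s batch interval are assumed
sc = SparkContext(appName="KafkaDirectStream")
ssc = StreamingContext(sc, 10)
directKafkaStream = KafkaUtils.createDirectStream(ssc, ["test-topic"], {"metadata.broker.list": "host.domain:9092"})
directKafkaStream.pprint()  # at least one output operation is needed before start()
ssc.start()
ssc.awaitTermination()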
There are a couple of similar posts about the same error. In all of those cases the cause was an empty Kafka topic. My "test-topic" does contain messages, though. I can read them with
kafka-console-consumer --zookeeper host.domain:2181 --topic test-topic --from-beginning --max-messages 100
Does anyone know what might be the problem?
I'm using:
You need to check 2 things:
1) Check that the topic and partition exist. In your case the topic is test-topic and the partition is 0.
2) Based on your code, you are trying to consume messages from offset 0, and it is possible that no message is available at offset 0. Check what your earliest offset is and try to consume from there.
Below is the command to check the earliest offset (--time -2 returns the earliest offset; --time -1 returns the latest):
sh kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "your broker list" --topic "topic name" --time -2
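To compare offsets across partitions, the output of the GetOffsetShell command above can be parsed with a few lines of Python. This is only a sketch: it assumes each output line has the form topic:partition:offset, and the helper name parse_offsets and the sample string are made up for illustration.

```python
def parse_offsets(output):
    """Parse lines of the assumed form topic:partition:offset into a dict."""
    offsets = {}
    for line in output.splitlines():
        line = line.strip()
        if not line:
            continue
        # Split from the right so only the last two fields are treated as numbers
        topic, partition, offset = line.rsplit(":", 2)
        if not offset:
            # No offset reported for this partition; skip it
            continue
        offsets[(topic, int(partition))] = int(offset)
    return offsets


# Hypothetical sample output, not taken from a real cluster
sample = "test-topic:0:42\ntest-topic:1:7\n"
print(parse_offsets(sample))
```

If the earliest offset turns out to be greater than 0, the resulting values could be passed to createDirectStream through its fromOffsets argument, in Spark versions whose Python API exposes it.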
1) Make sure that you have already created the topic test-topic.
Run the following command to list the existing topics:
kafka-topics.sh --list --zookeeper [host or ip of zookeeper]:[port]
2) After checking your topic, set the listener address in the Socket Server Settings section of the Kafka broker configuration (server.properties):
listeners=PLAINTEXT://[host or ip of Kafka]:[port]
If you define short host names in /etc/hosts and use them in your Kafka servers' configuration, you should change those names to IP addresses, or register the same short host names in the /etc/hosts of your local PC or client.
The error occurs because the Spark Streaming library cannot resolve the short host name on the PC or client.
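A quick way to test this from the machine that runs the Spark driver is to try to resolve and connect to the broker address directly. The sketch below uses only the Python standard library; the function name broker_reachable is made up for illustration, and host.domain:9092 is the address from the question.

```python
import socket


def broker_reachable(host, port, timeout=3.0):
    """Return True if host resolves and accepts a TCP connection on port."""
    try:
        # create_connection performs the name lookup and the TCP connect
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers resolution failures, refused connections and timeouts
        return False


if __name__ == "__main__":
    print(broker_reachable("host.domain", 9092))
```

A False result only shows that this client cannot reach that address. Because the broker may advertise a different host name to its clients, the same check is worth repeating with the host name configured in listeners.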
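Another option is to force creation of the topic if it doesn't exist. You can do this by setting the property "auto.create.topics.enable" to "true" in the kafkaParams map, like this. Note that Kafka documents auto.create.topics.enable as a broker setting, so it may also need to be enabled in the broker's server.properties.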
val kafkaParams = Map[String, String](
"bootstrap.servers" -> kafkaHost,
"group.id" -> kafkaGroup,
"auto.create.topics.enable" -> "true")
This was done with Scala 2.11 and Kafka 0.10.