I'm having some issues while trying to read from Kafka with Spark Streaming.
My code is:
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaIngestor")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val kafkaParams = Map[String, String](
  "zookeeper.connect" -> "localhost:2181",
  "group.id" -> "consumergroup",
  "metadata.broker.list" -> "localhost:9092",
  "zookeeper.connection.timeout.ms" -> "10000"
  // to enable, add a comma above and use: "auto.offset.reset" -> "smallest"
)
val topics = Set("test")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
I had previously started ZooKeeper on port 2181 and a Kafka 0.9.0.0 server on port 9092, but I get the following error in the Spark driver:
Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
at scala.Option.map(Option.scala:145)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)
Zookeeper log:
[2015-12-08 00:32:08,226] INFO Got user-level KeeperException when processing sessionid:0x1517ec89dfd0000 type:create cxid:0x34 zxid:0x1d3 txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor)
Any hint?
Thank you very much
Spark Streaming's Kafka integration provides parallelism between Kafka partitions and Spark partitions, along with access to Kafka metadata and offsets. A direct stream can also be created to pull messages from Kafka directly, without a receiver.
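As a sketch of the direct approach with the Spark 1.5-era `spark-streaming-kafka` (Kafka 0.8) integration, the following reads the `test` topic from the question and prints the Kafka offset range behind each batch via `HasOffsetRanges`. The broker address, topic, and the `describe` helper are illustrative assumptions, and it needs a compatible Kafka 0.8.2.x client on the classpath.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

object DirectStreamOffsets {
  // Hypothetical helper: formats one Kafka partition's offset range for logging.
  def describe(o: OffsetRange): String =
    s"${o.topic} partition ${o.partition}: offsets ${o.fromOffset} until ${o.untilOffset}"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaIngestor")
    val ssc = new StreamingContext(conf, Seconds(2))

    // The direct approach reads from the brokers listed here and tracks
    // offsets itself rather than in ZooKeeper. Broker and topic are assumptions.
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
    val topics = Set("test")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream.foreachRDD { rdd =>
      // Each RDD partition corresponds 1:1 to a Kafka topic partition, and the
      // RDD produced by the direct stream exposes the offsets it covers.
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach(o => println(describe(o)))
      // Messages arrive as (key, value) pairs; print a few values.
      rdd.map(_._2).take(10).foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The cast to `HasOffsetRanges` only works on the RDD handed to `foreachRDD` directly by the direct stream, before any transformation that reshuffles partitions.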
Kafka Streams processes events as they arrive, so it uses a continuous (event-at-a-time) processing model. Spark Streaming, on the other hand, uses a micro-batch approach: it divides the incoming stream into small batches and processes each batch as a unit.
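To make the contrast concrete, here is a toy, pure-Scala sketch of the two models (no Spark or Kafka involved). `Event`, `processEach`, and `microBatches` are hypothetical names, and the 2000 ms interval mirrors `Seconds(2)` in the question. Unlike Spark Streaming, which emits a (possibly empty) batch every interval, this sketch simply skips intervals that contain no events.

```scala
// Hypothetical event type: an arrival timestamp plus a payload.
final case class Event(timestampMs: Long, value: String)

object ProcessingModels {
  // Event-at-a-time: the handler runs once per event, in arrival order.
  def processEach(events: Seq[Event])(handle: Event => String): Seq[String] =
    events.map(handle)

  // Micro-batch: events are bucketed into fixed intervals of batchMs by
  // timestamp, and each bucket is handed on as one unit of work.
  def microBatches(events: Seq[Event], batchMs: Long): Seq[Seq[Event]] = {
    require(batchMs > 0, "batch interval must be positive")
    events
      .groupBy(_.timestampMs / batchMs)          // interval index for each event
      .toSeq
      .sortBy(_._1)                              // keep batches in time order
      .map { case (_, batch) => batch.sortBy(_.timestampMs) }
  }
}

val events = Seq(Event(100, "a"), Event(1500, "b"), Event(2100, "c"), Event(3900, "d"))
// One result per event.
println(ProcessingModels.processEach(events)(_.value.toUpperCase))
// Events grouped into 2-second batches.
println(ProcessingModels.microBatches(events, 2000).map(_.map(_.value)))
```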
The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 Direct Stream approach. It provides simple parallelism, 1:1 correspondence between Kafka partitions and Spark partitions, and access to offsets and metadata.
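For comparison, a minimal sketch of the same consumer on the Kafka 0.10 integration (`spark-streaming-kafka-0-10`, available from Spark 2.x), following the pattern in the Spark documentation. The new consumer takes `bootstrap.servers` and deserializer classes instead of `metadata.broker.list` and decoders. The broker, topic, and group names are carried over from the question as assumptions, and the `kafkaParams` helper is a hypothetical name.

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}

object Kafka010Ingestor {
  // Hypothetical helper: consumer settings using the 0.10+ consumer's property names.
  def kafkaParams(brokers: String, groupId: String): Map[String, Object] = Map[String, Object](
    "bootstrap.servers" -> brokers,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> groupId,
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaIngestor")
    val ssc = new StreamingContext(conf, Seconds(2))

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent, // spread partitions evenly over executors
      ConsumerStrategies.Subscribe[String, String](Set("test"), kafkaParams("localhost:9092", "consumergroup"))
    )

    stream.foreachRDD { rdd =>
      // Still a 1:1 mapping between Kafka partitions and RDD partitions.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach(o => println(s"${o.topic}/${o.partition}: ${o.fromOffset}-${o.untilOffset}"))
      // ConsumerRecord is not serializable, so extract the value before collecting.
      rdd.map(_.value).take(10).foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```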
Spark Streaming is better suited to processing groups of rows (group-by, ML, window functions, etc.). Kafka Streams provides true record-at-a-time processing, which makes it better for tasks such as row parsing and data cleansing. Spark Streaming is a standalone framework, whereas Kafka Streams is a library embedded in your application.
The problem was a Kafka version that is incompatible with spark-streaming-kafka.
As described in the documentation:
Kafka: Spark Streaming 1.5.2 is compatible with Kafka 0.8.2.1
So, including
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.2</version>
</dependency>
in my pom.xml (instead of version 0.9.0.0) solved the issue.
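If you build with sbt rather than Maven, a roughly equivalent `build.sbt` fragment might look like the following. This is a sketch under assumptions: the Scala version is chosen to match the `_2.10` artifact above, and the explicit `kafka` line pins the 0.8.2.x client so that a separately declared 0.9.0.0 cannot replace the version spark-streaming-kafka 1.5.2 was built against.

```scala
// build.sbt (sketch): sbt counterpart of the pom.xml change above.
scalaVersion := "2.10.6" // matches the _2.10 suffix of the Maven artifacts

libraryDependencies ++= Seq(
  // Mark Spark as "provided" when submitting to a cluster instead of running locally.
  "org.apache.spark" %% "spark-streaming"       % "1.5.2",
  // Kafka 0.8 integration that provides KafkaUtils.createDirectStream.
  "org.apache.spark" %% "spark-streaming-kafka" % "1.5.2",
  // Keep the Kafka client on the 0.8.2.x line; 0.9.0.0 triggers the ClassCastException.
  "org.apache.kafka" %% "kafka"                 % "0.8.2.2"
)
```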
Hope this helps