
Spark Streaming Kafka stream

I'm having trouble reading from Kafka with Spark Streaming.

My code is:

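import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Local mode with two threads and a 2-second batch interval.
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaIngestor")
val ssc = new StreamingContext(sparkConf, Seconds(2))

val kafkaParams = Map[String, String](
  "zookeeper.connect" -> "localhost:2181",
  "group.id" -> "consumergroup",
  "metadata.broker.list" -> "localhost:9092",
  "zookeeper.connection.timeout.ms" -> "10000"
  //"auto.offset.reset" -> "smallest"
)

val topics = Set("test")
// Direct (receiver-less) stream with String keys and String values.
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)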

I had already started ZooKeeper on port 2181 and a Kafka 0.9.0.0 server on port 9092, but I get the following error in the Spark driver:

Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
at scala.Option.map(Option.scala:145)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)

ZooKeeper log:

[2015-12-08 00:32:08,226] INFO Got user-level KeeperException when processing sessionid:0x1517ec89dfd0000 type:create cxid:0x34 zxid:0x1d3 txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor)

Any hint?

Thank you very much

asked Dec 07 '15 23:12 by besil




1 Answer

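The problem was a version mismatch: my project pulled in the Kafka 0.9.0.0 library, which the spark-streaming-kafka version I was using does not support. The ClassCastException suggests that the connector was compiled against an older Kafka API that returns kafka.cluster.Broker, whereas Kafka 0.9.0.0 returns kafka.cluster.BrokerEndPoint.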

As described in the documentation:

Kafka: Spark Streaming 1.5.2 is compatible with Kafka 0.8.2.1

So, including

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.2</version>
</dependency>

in my pom.xml (instead of version 0.9.0.0) solved the issue.
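To confirm which Kafka jar actually ends up on the classpath after changing the dependency, a small check like the one below may help. It is only a sketch: it assumes that kafka.cluster.BrokerEndPoint (the class named in the exception) exists only in Kafka 0.9.0.0 and later, and the object and method names are made up for illustration.

```scala
// Hypothetical helper for illustration; not part of Spark or Kafka.
object KafkaClasspathCheck {

  /** Returns true if the named class can be loaded from the current
    * classpath. The class is looked up without being initialized. */
  def classPresent(name: String): Boolean =
    try {
      Class.forName(name, false, getClass.getClassLoader)
      true
    } catch {
      case _: ClassNotFoundException => false
      case _: LinkageError           => false
    }

  def main(args: Array[String]): Unit = {
    // Assumption: BrokerEndPoint was introduced in Kafka 0.9.0.0, so finding
    // it suggests a 0.9+ Kafka jar is still on the classpath.
    if (classPresent("kafka.cluster.BrokerEndPoint"))
      println("kafka.cluster.BrokerEndPoint found: a Kafka 0.9+ jar is probably on the classpath")
    else
      println("kafka.cluster.BrokerEndPoint not found: no Kafka 0.9+ jar detected")
  }
}
```

Running it with the same classpath as the Spark driver (for example from the same Maven module) shows whether the 0.9.0.0 jar is still being pulled in, possibly as a transitive dependency.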

Hope this helps

answered Sep 28 '22 08:09 by besil