
Kafka consumer in Spark Streaming

Trying to write a Spark Streaming job that consumes messages from Kafka. Here’s what I’ve done so far:

  1. Started Zookeeper
  2. Started Kafka Server
  3. Sent a few messages to the server. I can see them when I run the following:

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning
    
  4. Now I'm trying to write a program that counts the number of messages coming in within a 5-minute window.

The code looks something like this:

    Map<String, Integer> map = new HashMap<String, Integer>();
    map.put("mytopic", new Integer(1));
    JavaStreamingContext ssc = new JavaStreamingContext(
            sparkUrl, " Spark Streaming", new Duration(60 * 5 * 1000), sparkHome, new String[]{jarFile});
    JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "localhost:2181", "1", map);

I'm not sure what value to use for the 3rd argument (the consumer group). When I run this I get "Unable to connect to zookeeper server". But ZooKeeper is running on port 2181; otherwise step 3 would not have worked.

Seems like I am not using KafkaUtils.createStream properly. Any ideas?

asked Nov 03 '14 by DilTeam
1 Answer

There is no such thing as a default consumer group; you can use an arbitrary non-empty string there. If you have only one consumer, its consumer group doesn't really matter. If there are two or more consumers, they can either be part of the same consumer group or belong to different consumer groups.

From http://kafka.apache.org/documentation.html:

Consumers

...

If all the consumer instances have the same consumer group, then this works just like a traditional queue balancing load over the consumers.

If all the consumer instances have different consumer groups, then this works like publish-subscribe and all messages are broadcast to all consumers.
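The two behaviors quoted above can be sketched with a small, self-contained simulation (this is an illustration of the semantics, not Kafka code; the round-robin balancing within a group is a simplifying assumption, since real Kafka assigns whole partitions to members):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupSemantics {
    // Deliver a stream of records to consumers identified by index, where
    // groupIds.get(i) is consumer i's group. Every group receives the full
    // stream; within a group, records are balanced across its members
    // (round-robin here, for illustration).
    static Map<String, List<String>> deliver(List<String> records,
                                             List<String> groupIds) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        Map<String, List<Integer>> members = new LinkedHashMap<>();
        for (int i = 0; i < groupIds.size(); i++) {
            members.computeIfAbsent(groupIds.get(i), g -> new ArrayList<>()).add(i);
            received.put("consumer-" + i, new ArrayList<>());
        }
        for (List<Integer> groupMembers : members.values()) {
            for (int r = 0; r < records.size(); r++) {
                int member = groupMembers.get(r % groupMembers.size());
                received.get("consumer-" + member).add(records.get(r));
            }
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("m0", "m1", "m2", "m3");
        // Same group: queue semantics, records are split between consumers.
        System.out.println(deliver(records, Arrays.asList("g1", "g1")));
        // Different groups: pub-sub semantics, every consumer sees every record.
        System.out.println(deliver(records, Arrays.asList("g1", "g2")));
    }
}
```

With a shared group the two consumers split the four records; with distinct groups each consumer receives all four.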

I think the problem may be in the 'topics' parameter. From the Spark docs:

Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread

You only specified a single partition for your topic, namely '1'. Depending on the broker's settings (num.partitions), there may be more than one partition, and your messages may be sent to partitions that aren't read by your program.

Besides, I believe partition ids are 0-based. So if you have only one partition, its id will be 0.
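Putting those points together, here is a minimal sketch of building the topics map that `createStream` takes as its last argument. The partition count of 3 and the group name "my-consumer-group" are assumptions for illustration; check your actual partition count with `bin/kafka-topics.sh --describe`:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TopicsMap {
    // topic name -> number of consumer threads; a common choice is one
    // thread per partition.
    static Map<String, Integer> topicsFor(String topic, int numPartitions) {
        Map<String, Integer> topics = new HashMap<>();
        topics.put(topic, numPartitions);
        return topics;
    }

    // Partition ids are 0-based: a 3-partition topic has partitions 0, 1, 2.
    static List<String> partitionIds(String topic, int numPartitions) {
        List<String> ids = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            ids.add(topic + "-" + p);
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(topicsFor("mytopic", 3));    // {mytopic=3}
        System.out.println(partitionIds("mytopic", 3)); // [mytopic-0, mytopic-1, mytopic-2]
        // The map would then be passed as (group name is an arbitrary choice):
        // KafkaUtils.createStream(ssc, "localhost:2181", "my-consumer-group",
        //                         topicsFor("mytopic", 3));
    }
}
```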

answered Oct 05 '22 by Denis Makarenko