Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to use Consumer API of Kafka 0.8.2?

I'm getting start with the latest Kafka document http://kafka.apache.org/documentation.html. But I meet some problem when I try to use the new Consumer API. I've done the job with following steps:

1. Add a new dependency

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.1</version>
</dependency>

2. Add configurations

    Map<String, Object> config = new HashMap<String, Object>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "host:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");

3. Use KafkaConsumer API

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config);
consumer.subscribe("topic");

However, when I try to poll message from the broker, I got nothing but null:

Map<String, ConsumerRecords<String, String>> records = consumer.poll(0);
if (records != null)
    process(records);
else
    System.err.println("null");

And then I know what's wrong with the consumer after I checked the source code:

@Override
public Map<String, ConsumerRecords<K,V>> poll(long timeout) {
    // TODO Auto-generated method stub
    return null;
}

To make matters worse, I cannot find any other useful information about the 0.8.2 API, since all usages about Kafka are not compatible with the latest version. Could anybody help me? Thanks a lot.

like image 320
Yohn Avatar asked Jun 16 '15 01:06

Yohn


People also ask

How can I get data from Kafka consumer?

Consumers and consumer groups Since the data record is in some partition, we can say that consumers simply read from partitions. Kafka consumers use a pull model to consume data. This means that a consumer periodically sends a request to a Kafka broker in order to fetch data from it.

What is consumer API in Kafka?

In this article Learn how to use the Apache Kafka Producer and Consumer APIs with Kafka on HDInsight. The Kafka Producer API allows applications to send streams of data to the Kafka cluster. The Kafka Consumer API allows applications to read streams of data from the cluster.

How does Kafka API work?

The Kafka Streams API to implement stream processing applications and microservices. It provides higher-level functions to process event streams, including transformations, stateful operations like aggregations and joins, windowing, processing based on event-time, and more.


1 Answers

I am also trying to write a Consumer on top of Kafka 0.8.2.1 to read the messages produced by the new Producer.

So far what I have got is that the Producer API is ready and usable, while on the consumer side we have to wait 0.8.3, as @habsq noted and you already find out that there is some code included for the Consumer, but it is still not functional.

So the client to use (the current Client API) are the one found in the "core" project of your current Kafka version, i.e. 0.8.2.1 (better not downgrade the client to any other version).

So for now we need to import two jars: one for the "new" java clients and one for the core project, depending also on the scala version you are using (I use 2.11).

In my case I use graddle to manage dependencies so I just need to import

dependencies {
  compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.8.2.1'
  compile group: 'org.apache.kafka', name: 'kafka_2.11', version: '0.8.2.1'
}

When you update dependencies it will get all the needed libraries.

If you are using a different Scala version just change the version; anyway you can find all the different version or the full pom on maven central: http://search.maven.org/#search|ga|1|g%3A%22org.apache.kafka%22%20AND%20v%3A%220.8.2.1%22

If you use those Consumer implementation all the current examples should work as usual.

PS ref: Kafka-users ml thread http://grokbase.com/t/kafka/users/153bepxet5/high-level-consumer-example-in-0-8-2

like image 117
RobMcZag Avatar answered Sep 18 '22 14:09

RobMcZag