I'm getting started with the latest Kafka documentation (http://kafka.apache.org/documentation.html), but I've run into a problem when trying to use the new Consumer API. These are the steps I've followed:
1. Add a new dependency
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.1</version>
</dependency>
2. Add configurations
Map<String, Object> config = new HashMap<String, Object>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"host:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");
3. Use KafkaConsumer API
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config);
consumer.subscribe("topic");
However, when I try to poll messages from the broker, I get nothing but null:
Map<String, ConsumerRecords<String, String>> records = consumer.poll(0);
if (records != null)
process(records);
else
System.err.println("null");
Then I found out what was wrong with the consumer after checking the source code:
@Override
public Map<String, ConsumerRecords<K,V>> poll(long timeout) {
// TODO Auto-generated method stub
return null;
}
To make matters worse, I cannot find any other useful information about the 0.8.2 API, since all the existing Kafka examples are incompatible with the latest version. Could anybody help me? Thanks a lot.
I am also trying to write a Consumer on top of Kafka 0.8.2.1 to read the messages produced by the new Producer.
So far what I have found is that the Producer API is ready and usable, while on the consumer side we have to wait for 0.8.3. As @habsq noted, and as you already found out, there is some code included for the new Consumer, but it is not functional yet.
So the client to use for now is the high-level consumer found in the "core" project of your current Kafka version, i.e. 0.8.2.1 (better not to downgrade the client to any other version).
So for now we need to import two jars: one for the "new" Java clients and one for the core project, whose artifact name also depends on the Scala version you are using (I use 2.11).
In my case I use Gradle to manage dependencies, so I just need to declare:
dependencies {
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.8.2.1'
    compile group: 'org.apache.kafka', name: 'kafka_2.11', version: '0.8.2.1'
}
When you refresh the dependencies, Gradle will pull in all the required libraries.
If you are using a different Scala version, just change the artifact name accordingly; you can find all the available versions and the full POM on Maven Central: http://search.maven.org/#search|ga|1|g%3A%22org.apache.kafka%22%20AND%20v%3A%220.8.2.1%22
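If you are on Maven like in the question, the equivalent of the Gradle block above should look roughly like this (same two artifacts; swap kafka_2.11 for your Scala version):
<!-- "new" Java clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.1</version>
</dependency>
<!-- core project, which contains the working high-level consumer -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.1</version>
</dependency>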
If you use that consumer implementation, all the existing high-level consumer examples should work as usual.
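For reference, here is a minimal sketch of the old high-level consumer (my own example; host:2181 and "topic" are placeholders for your ZooKeeper address and topic name). Note that it uses kafka.consumer.ConsumerConfig from the core jar, not the org.apache.kafka.clients.consumer.ConsumerConfig you used above:
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig; // the old consumer's config, not the new clients' one
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class HighLevelConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "host:2181"); // the old consumer talks to ZooKeeper, not to the brokers
        props.put("group.id", "test");
        props.put("auto.commit.interval.ms", "1000");

        ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // Ask for a single stream (thread) for the topic.
        Map<String, Integer> topicCountMap = Collections.singletonMap("topic", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                consumer.createMessageStreams(topicCountMap);

        // Iterate over incoming messages; hasNext() blocks until a message arrives.
        ConsumerIterator<byte[], byte[]> it = streams.get("topic").get(0).iterator();
        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
        // In real code, call consumer.shutdown() when you are done (e.g. from a shutdown hook).
    }
}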
PS ref: Kafka-users mailing list thread http://grokbase.com/t/kafka/users/153bepxet5/high-level-consumer-example-in-0-8-2