I have a simple class to consume messages from a kafka server. The majority of codes are copied from the comments of org.apache.kafka.clients.consumer.KafkaConsumer.java.
public class Demo {
public static void main(String[] args) {
Properties props = new Properties();
props.put("metadata.broker.list", "192.168.144.10:29092");
props.put("group.id", "test");
props.put("session.timeout.ms", "1000");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "10000");
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
consumer.subscribe("voltdbexportAUDIT", "voltdbexportTEST");
boolean isRunning = true;
while (isRunning) {
Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
process(records);
}
consumer.close();
}
private static Map<TopicPartition, Long> process(Map<String, ConsumerRecords<byte[], byte[]>> records) {
Map<TopicPartition, Long> processedOffsets = new HashMap<>();
for (Map.Entry<String, ConsumerRecords<byte[], byte[]>> recordMetadata : records.entrySet()) {
List<ConsumerRecord<byte[], byte[]>> recordsPerTopic = recordMetadata.getValue().records();
for (int i = 0; i < recordsPerTopic.size(); i++) {
ConsumerRecord<byte[], byte[]> record = recordsPerTopic.get(i);
// process record
try {
processedOffsets.put(record.topicAndPartition(), record.offset());
} catch (Exception e) {
e.printStackTrace();
}
}
}
return processedOffsets;
}
}
I am using 'org.apache.kafka:kafka-clients:0.8.2.0'. it throws exception
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value.
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)
at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:430)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:413)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:400)
at kafka.integration.Demo.main(Demo.java:26)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
How should I config the key.deserializer?
This works out of the box without implementing your own serializers
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("partition.assignment.strategy", "range");
For keys use one of the following
String Key
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
JSON Key
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
Avro Key
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
ByteArray Key
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
Likewise, use one of the following for your value deserializer:
String Value
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
JSON Value
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
Avro Value
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
ByteArray Value
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
Note that for Avro deserialisers, you will need the following dependencies:
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
You need to set the properties:
props.put("serializer.class","my.own.serializer.StringSupport");
props.put("key.serializer.class","my.own.serializer.LongSupport");
in your main method so that you pass them to the producer's constructor. Of course, you'd have to specify the right encoders. The serializer class converts the message into a byte array and the key.serializer class turn the key object into a byte array. Generally you'd also have them be able to reverse the process.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With