
Is kafka consumer 0.9 backward compatible?

Is the upcoming Kafka 0.9.x consumer going to be compatible with a 0.8 broker?

In other words, is it possible to switch only to the new consumer implementation, without touching anything else?

mabn asked Nov 19 '15


4 Answers

According to the documentation of Kafka 0.9.0, you cannot use the new consumer to read data from 0.8.x brokers. The reason is the following:

0.9.0.0 has an inter-broker protocol change from previous versions.

Robert Metzger answered Oct 23 '22


No. In general it's recommended to upgrade brokers before clients, since brokers target backward compatibility. The 0.9 broker will work with both the 0.8 and 0.9 consumer APIs, but not the other way around: the 0.9 consumer cannot talk to a 0.8 broker.
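So, to consume from a 0.8 broker you stay on the old 0.8 high-level consumer, which bootstraps through ZooKeeper rather than a broker list. A minimal sketch of that configuration (the ZooKeeper address and group id below are placeholder values, and the Kafka 0.8 jar is assumed to be on the classpath for the commented-out connector call):

```java
import java.util.Properties;

public class OldConsumerConfigSketch {

    // Builds the Properties the 0.8 high-level consumer expects.
    // "zk-host:2181" and "my-group" are placeholders for illustration.
    static Properties buildOldConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk-host:2181"); // old consumer bootstraps via ZooKeeper, not brokers
        props.put("group.id", "my-group");
        props.put("zookeeper.session.timeout.ms", "6000");
        props.put("auto.commit.interval.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildOldConsumerConfig();
        // With the 0.8 client jar on the classpath, this config would be used as:
        //   kafka.javaapi.consumer.ConsumerConnector cc = kafka.consumer.Consumer
        //       .createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(props));
        System.out.println(props.getProperty("zookeeper.connect"));
    }
}
```

Note the key difference from the new consumer: there is no `bootstrap.servers`; offsets and group membership go through ZooKeeper.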

Lundahl answered Oct 23 '22


Based on this Consumer Client Re-design wiki page, which states:

This would involve some significant changes to the consumer APIs*, so we would like to collect feedback on the proposal from our community. Since the list of changes is not small, we would like to understand if some features are preferred over others, and more importantly, if some features are not required at all.

*Emphasis mine.

I didn't find anything specifically stating that there is no compatibility. But given that quote, and the fact that the 0.8 producer was not compatible with the 0.7 producer, I'm assuming they are not compatible.

Morgan Kenyon answered Oct 23 '22


I recently faced a similar issue: in my application I had to read from Kafka 0.9 and then write back to Kafka 0.8. I used the 0.9 consumer client together with the old 0.8 producer client, as follows.

Consumer Config

    // New 0.9 consumer API (org.apache.kafka.clients.consumer)
    Properties props = new Properties();
    props.put("bootstrap.servers", "broker ips as comma separated values");
    props.put("group.id", "your group id");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", 1000);
    props.put("session.timeout.ms", 30000);
    consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe(Arrays.asList("topic1", "topic2")); // subscribe takes a list of topics, not a String

Producer Config

        // Old 0.8 producer API (kafka.javaapi.producer), needed because the
        // destination broker is 0.8 and the new 0.9 producer cannot talk to it.
        // New-producer keys (bootstrap.servers, acks, batch.size, linger.ms, ...)
        // are ignored by this API, so only the old config keys are used here.
        Properties props = new Properties();
        props.put("metadata.broker.list", "list of broker ips");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "-1"); // old-producer equivalent of acks=all
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        String message = "hello world";
        KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic_name, message);
        producer.send(data);
        producer.close();

Hope this helps.

abhinav answered Oct 23 '22