 

Multiple Message Types in a Single Kafka Topic with Avro

I have an event-sourced application built on top of Kafka. Currently I have one topic that holds multiple message types, all serialized/deserialized with JSON.

Confluent's Schema Registry looks like a good approach to maintaining message types, and with Avro's full compatibility mode it also provides a mechanism for message versioning in my event-sourced app.

With a recent patch to Confluent 4.1.1 (described in a blog post), you can have multiple different types of messages in one topic with the Avro serializer/deserializer.

However, I haven't seen a single working example of this.

My question is: does the patch above really work without having to use Avro union types (putting all the different message types into one single schema and using a union)?
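
For clarity, this is the kind of top-level union schema I am hoping to avoid (a sketch with two hypothetical record types):

import org.apache.avro.Schema;

public class UnionSchemaSketch {
    public static void main(String[] args) {
        // One schema holding every event type as a top-level union.
        // Every new event type forces an edit to this single schema.
        Schema union = new Schema.Parser().parse(
            "[" +
            " {\"type\": \"record\", \"name\": \"Employee\", \"namespace\": \"com.phonebook\"," +
            "  \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}," +
            " {\"type\": \"record\", \"name\": \"Milestone\", \"namespace\": \"com.phonebook\"," +
            "  \"fields\": [{\"name\": \"title\", \"type\": \"string\"}]}" +
            "]");
        System.out.println(union.getType()); // UNION
    }
}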

And how would this approach work with a Kafka Streams app, where you need to specify a key and value Serde?

Should I just forget about Avro and go with Protobuf instead?

Asked Jul 19 '18 by J S

People also ask

Should you put several event types in the same Kafka topic?

If you are using a data encoding such as JSON, without a statically defined schema, you can easily put many different event types in the same topic. However, if you are using a schema-based encoding such as Avro, a bit more thought is needed to handle multiple event types in a single topic.

Can a Kafka topic have multiple schemas?

It is possible to set up multiple schemas per topic; this is a common use case when using a subject name strategy other than TopicNameStrategy. If using multiple schemas, as in the earlier example, the user must set the topic's subject naming strategy to one of the other options.
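
As a sketch, these are the built-in strategies and the subject each would register for a record com.phonebook.Employee written to a hypothetical topic "events":

import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.confluent.kafka.serializers.subject.RecordNameStrategy;

import java.util.Properties;

public class SubjectStrategySketch {
    public static void main(String[] args) {
        // TopicNameStrategy       -> "events-value"                    (default)
        // RecordNameStrategy      -> "com.phonebook.Employee"
        // TopicRecordNameStrategy -> "events-com.phonebook.Employee"
        Properties props = new Properties();
        props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY,
                RecordNameStrategy.class.getName());
    }
}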

How does an Avro schema work in Kafka?

Apache Avro is a binary serialization format. It relies on schemas (defined in JSON format) that define what fields are present and their types. Nested fields are supported, as are arrays. Avro supports schema evolution: you can have multiple versions of your schema by adding or removing fields.
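
A minimal sketch of such evolution, using a hypothetical Employee record; v2 adds an optional field with a default, which stays compatible in both directions:

import org.apache.avro.Schema;

public class EvolutionSketch {
    public static void main(String[] args) {
        Schema v1 = new Schema.Parser().parse(
            "{\"type\": \"record\", \"name\": \"Employee\", \"namespace\": \"com.phonebook\"," +
            " \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}");
        // Adding a nullable field with a default keeps both old readers and
        // old writers working, i.e. FULL compatibility.
        Schema v2 = new Schema.Parser().parse(
            "{\"type\": \"record\", \"name\": \"Employee\", \"namespace\": \"com.phonebook\"," +
            " \"fields\": [{\"name\": \"name\", \"type\": \"string\"}," +
            "  {\"name\": \"phone\", \"type\": [\"null\", \"string\"], \"default\": null}]}");
        System.out.println(v1.getFields().size() + " field -> " + v2.getFields().size() + " fields");
    }
}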

What is the format for an Avro message?

Avro defines both a binary serialization format and a JSON serialization format. This allows you to use JSON when human-readability is desired, and the more efficient binary format when storing data in topics.
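
A short sketch showing the same record written in both encodings, using the plain Avro API and a hypothetical one-field schema:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;

import java.io.ByteArrayOutputStream;

public class EncodingSketch {
    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(
            "{\"type\": \"record\", \"name\": \"Employee\", \"namespace\": \"com.phonebook\"," +
            " \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}");
        GenericRecord record = new GenericData.Record(schema);
        record.put("name", "Ada");

        GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);

        // Compact binary encoding, the form stored in Kafka topics.
        ByteArrayOutputStream binary = new ByteArrayOutputStream();
        BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(binary, null);
        writer.write(record, binaryEncoder);
        binaryEncoder.flush();

        // Human-readable JSON encoding of the same record.
        ByteArrayOutputStream json = new ByteArrayOutputStream();
        JsonEncoder jsonEncoder = EncoderFactory.get().jsonEncoder(schema, json);
        writer.write(record, jsonEncoder);
        jsonEncoder.flush();

        System.out.println(binary.size() + " bytes binary vs JSON: " + json);
    }
}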


1 Answer

This is an example of a consumer that reads from a topic where events of different types are published:

package com.kafka.schema;

// Avro-generated classes for the two event types (needed with the specific reader).
import com.phonebook.Employee;
import com.phonebook.Milestone;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.Properties;
import java.util.stream.IntStream;

public class AvroConsumer {

    private static Consumer<Long, GenericRecord> createConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Const.BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleAvroConsumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        // Use Kafka Avro Deserializer.
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Use Specific Record or else you get Avro GenericRecord.
        // props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");

        // Schema registry location.
        // Run Schema Registry on 8081
        props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, Const.SCHEMA_REGISTRY);
        // One subject per record type in the topic: "<topic>-<fully-qualified record name>".
        // The property key is the same string for serializer and deserializer configs.
        props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());
        return new KafkaConsumer<>(props);
    }

    public static void main(String... args) {
        final Consumer<Long, GenericRecord> consumer = createConsumer();
        consumer.subscribe(Collections.singletonList(Const.TOPIC));
        IntStream.range(1, 100).forEach(index -> {
            final ConsumerRecords<Long, GenericRecord> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
            if (records.count() == 0) {
                System.out.println("None found");
            } else {
                records.forEach(record -> {
                    GenericRecord recValue = record.value();
                    System.out.printf("%s %d %d %s \n", record.topic(), record.partition(), record.offset(), recValue);
                });
            }
        });
    }
}

The important part here is this:

props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());
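
For completeness, here is a matching producer sketch, reusing the same Const values; Employee and Milestone are the Avro-generated classes imported in the consumer, and their constructor arguments are purely illustrative:

package com.kafka.schema;

import com.phonebook.Employee;
import com.phonebook.Milestone;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;

import java.util.Properties;

public class AvroProducer {

    public static void main(String... args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Const.BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, Const.SCHEMA_REGISTRY);
        // Register one subject per record type within the topic, matching the consumer.
        props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());

        try (Producer<Long, Object> producer = new KafkaProducer<>(props)) {
            // Two different Avro record types published to the same topic.
            producer.send(new ProducerRecord<>(Const.TOPIC, 1L, new Employee("John", 1)));
            producer.send(new ProducerRecord<>(Const.TOPIC, 2L, new Milestone("Onboarded")));
        }
    }
}
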
Answered Sep 20 '22 by Evgeny Semionov