I have an event-sourced application built on top of Kafka. Currently I have one topic that contains multiple message types, all serialized/deserialized with JSON.
Confluent's Schema Registry looks like a good approach to maintaining message types, and with Avro's full compatibility mode it also provides a mechanism for message versioning in my event-sourced app.
With a recent patch to Confluent 4.1.1 (described in a blog post), you can have multiple different types of messages in one topic with the Avro serializer/deserializer.
However, I haven't seen any working example of this. Not even a single one.
My question is: does the above patch really work without having to use Avro union types (putting all the different message types into one single schema and using a union)?
And how would this approach work with a Kafka Streams app, where you need to specify a key and value Serde?
Should I just forget about Avro and go with Protobuf instead?
If you are using a data encoding such as JSON, without a statically defined schema, you can easily put many different event types in the same topic. However, if you are using a schema-based encoding such as Avro, a bit more thought is needed to handle multiple event types in a single topic.
It is possible to set up multiple schemas per topic; this is a common use case when using a subject name strategy other than TopicNameStrategy. When using multiple schemas, as the consumer example below shows, you must set the subject name strategy for the topic to one of the other options (RecordNameStrategy or TopicRecordNameStrategy).
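For illustration, here is a minimal sketch (not from the original answer; the broker and registry addresses are placeholders) of a producer configuration that selects a non-default subject name strategy:

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongSerializer;
import java.util.Properties;

public class ProducerConfigSketch {

    static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        // The default TopicNameStrategy registers everything under "<topic>-value", which
        // effectively allows only one record type per topic. RecordNameStrategy uses the fully
        // qualified record name as the subject; TopicRecordNameStrategy uses
        // "<topic>-<record name>", so each event type gets its own subject and its own
        // compatibility checks scoped to this topic.
        props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());
        return props;
    }
}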
Apache Avro is a binary serialization format. It relies on schemas (defined in JSON format) that specify which fields are present and their types. Nested fields and arrays are supported. Avro supports schema evolution: you can have multiple versions of your schema, obtained by adding or removing fields.
Avro defines both a binary serialization format and a JSON serialization format. This allows you to use JSON when human-readability is desired, and the more efficient binary format when storing data in topics.
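As a small illustration (the Employee fields here are made up, not taken from the post), an Avro schema is just a JSON document, and evolution works by adding or removing fields with defaults:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class AvroSchemaSketch {
    public static void main(String[] args) {
        // Version 1 of a hypothetical Employee schema (fields are illustrative only).
        Schema v1 = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Employee\",\"namespace\":\"com.phonebook\","
              + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");

        // Version 2 adds an optional field with a default, so readers and writers on
        // either version can still handle each other's records (full compatibility).
        Schema v2 = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Employee\",\"namespace\":\"com.phonebook\","
              + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"},"
              + "{\"name\":\"age\",\"type\":[\"null\",\"int\"],\"default\":null}]}");

        GenericRecord employee = new GenericData.Record(v2);
        employee.put("name", "Alice");
        System.out.println(employee); // {"name": "Alice", "age": null}
    }
}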
Here is an example of a consumer that reads data from a topic where events of different types are published:
package com.kafka.schema;
import com.phonebook.Employee;
import com.phonebook.Milestone;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.Properties;
import java.util.stream.IntStream;
public class AvroConsumer {

    private static Consumer<Long, GenericRecord> createConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Const.BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleAvroConsumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        // Use the Kafka Avro deserializer for record values.
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Enable Specific Record, or else you get Avro GenericRecord.
        // props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
        // Schema Registry location (running on port 8081).
        props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, Const.SCHEMA_REGISTRY);
        // Subject name strategy that allows multiple record types per topic.
        props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());
        return new KafkaConsumer<>(props);
    }

    public static void main(String... args) {
        final Consumer<Long, GenericRecord> consumer = createConsumer();
        consumer.subscribe(Collections.singletonList(Const.TOPIC));
        IntStream.range(1, 100).forEach(index -> {
            final ConsumerRecords<Long, GenericRecord> records =
                    consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
            if (records.count() == 0) {
                System.out.println("None found");
            } else {
                records.forEach(record -> {
                    GenericRecord recValue = record.value();
                    System.out.printf("%s %d %d %s%n", record.topic(), record.partition(), record.offset(), recValue);
                });
            }
        });
        consumer.close();
    }
}
The important part here is this:
props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());
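As for the Kafka Streams part of the question, here is a hedged sketch (the registry URL and topic name are placeholders, not from the post) of how the same subject name strategy might be wired into a value Serde using GenericAvroSerde from Confluent's kafka-streams-avro-serde artifact:

import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import java.util.HashMap;
import java.util.Map;

public class StreamsSerdeSketch {

    static KStream<Long, GenericRecord> buildStream(StreamsBuilder builder) {
        // Configure the value Serde against the Schema Registry; "false" means
        // it is used for record values, not keys.
        Map<String, String> serdeConfig = new HashMap<>();
        serdeConfig.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        serdeConfig.put(KafkaAvroDeserializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());
        GenericAvroSerde valueSerde = new GenericAvroSerde();
        valueSerde.configure(serdeConfig, false);

        // Every value comes back as a GenericRecord; downstream logic can
        // dispatch on the record's own schema name.
        return builder.stream("events", Consumed.with(Serdes.Long(), valueSerde))
                .peek((key, value) -> System.out.println(value.getSchema().getName() + ": " + value));
    }
}

Because each record carries its own schema ID, a single stream can carry several event types, and you can branch or filter on value.getSchema().getName() to route them.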