I have the following configuration for my Kafka Stream application
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG,this.applicaionId);
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,svrConfig.getBootstrapServers());
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, svrConfig.getCacheMaxBytesBufferingConfig());
// Exactly once processing!!
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,SpecificAvroSerde.class);
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,SpecificAvroSerde.class);
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,"http://localhost:8081");
And I got the following error:
Exception in thread "main" io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:243)
at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:100)
at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.configure(SpecificAvroSerializer.java:58)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde.configure(SpecificAvroSerde.java:107)
I have tried to replace the line
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,"http://localhost:8081");
with
config.put("schema.registry.url","http://localhost:8081");
but with the same error
I have followed the instruction from this url when preparing my Stream application.
Any suggestion?
You can test whether your application has received data from the topic. But if your application uses Apache Avro for serialization, and you do not use Confluent Schema Registry (which is the case when you want to use spring-kafka-test for testing) then you have a problem.
It is an application that resides outside of your Kafka cluster and handles the distribution of schemas to the producer and consumer by storing a copy of schema in its local cache. Schema Registry Architecture.
Spring framework has great support for testing your Spring application with Apache Kafka.
Apache Avro is a binary serialization format. It relies on schemas (defined in JSON format) that define what fields are present and their type. Nested fields are supported as well as arrays. Avro supports schema evolutivity: you can have multiple versions of your schema, by adding or removing fields.
If you have keys and values in Avro format the following lines should do the trick for you,
config.put("key.converter.schema.registry.url", "http://localhost:8081");
config.put("value.converter.schema.registry.url", "http://localhost:8081");
If this doesn't seem to work you can override Serdes
explicitly. For example, if you have Avro keys:
final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
"http://localhost:8081");
final Serde<GenericRecord> keyGenericAvroSerde = new GenericAvroSerde();
keyGenericAvroSerde.configure(serdeConfig, true); // true for record keys
final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
valueGenericAvroSerde.configure(serdeConfig, false); // false for record values
StreamsBuilder builder = new StreamsBuilder();
KStream<GenericRecord, GenericRecord> textLines =
builder.stream(keyGenericAvroSerde, valueGenericAvroSerde, "my-avro-topic");
// Do whatever you like
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With