
Kafka Streams with Avro in Java: Missing required configuration "schema.registry.url" which has no default value

I have the following configuration for my Kafka Streams application:

    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, this.applicaionId);
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, svrConfig.getBootstrapServers());
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod
    config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, svrConfig.getCacheMaxBytesBufferingConfig());

    // Exactly once processing!!
    config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
    config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

And I got the following error:

    Exception in thread "main" io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
        at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:243)
        at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
        at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:100)
        at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
        at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48)
        at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.configure(SpecificAvroSerializer.java:58)
        at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde.configure(SpecificAvroSerde.java:107)

I tried replacing the line

    config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,"http://localhost:8081");

with

    config.put("schema.registry.url","http://localhost:8081");

but I get the same error.

I followed the instructions from this url when preparing my Streams application.

Any suggestions?

Benny Chan asked May 15 '18 11:05



1 Answer

If both your keys and values are in Avro format, the following lines should do the trick:

    config.put("key.converter.schema.registry.url", "http://localhost:8081");
    config.put("value.converter.schema.registry.url", "http://localhost:8081");

If that doesn't work, you can configure the serdes explicitly and pass them to the stream yourself. For example, with Avro keys and values:

    // Both serdes share the same Schema Registry URL.
    final Map<String, String> serdeConfig =
        Collections.singletonMap("schema.registry.url", "http://localhost:8081");

    final Serde<GenericRecord> keyGenericAvroSerde = new GenericAvroSerde();
    keyGenericAvroSerde.configure(serdeConfig, true); // true for record keys
    final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
    valueGenericAvroSerde.configure(serdeConfig, false); // false for record values

    // StreamsBuilder takes the topic name first and the serdes via Consumed.with(...)
    StreamsBuilder builder = new StreamsBuilder();
    KStream<GenericRecord, GenericRecord> textLines =
        builder.stream("my-avro-topic", Consumed.with(keyGenericAvroSerde, valueGenericAvroSerde));
    // Do whatever you like
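
When you configure serdes by hand like this, it can help to check that the map you pass to `configure` really contains `schema.registry.url` before the serde sees it. Below is a minimal sketch using only `java.util`, so it runs without the Kafka or Confluent jars. The class `SerdeConfigSketch`, its two helper methods, and the `application.id` value are hypothetical names made up for illustration; they are not part of any library.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper for illustration; not part of Kafka or the Confluent libraries.
final class SerdeConfigSketch {
    static final String SCHEMA_REGISTRY_URL = "schema.registry.url";

    // Copy the entries set directly on the Properties object into a plain map.
    // Entries that exist only as Properties *defaults* are not visible through
    // entrySet() and are therefore not copied.
    static Map<String, Object> toSerdeConfig(Properties props) {
        Map<String, Object> config = new HashMap<>();
        for (Map.Entry<Object, Object> entry : props.entrySet()) {
            config.put(String.valueOf(entry.getKey()), entry.getValue());
        }
        return config;
    }

    // Fail early with a readable message if the URL is absent or blank.
    static String requireSchemaRegistryUrl(Map<String, ?> config) {
        Object url = config.get(SCHEMA_REGISTRY_URL);
        if (url == null || url.toString().trim().isEmpty()) {
            throw new IllegalStateException(
                "Missing \"" + SCHEMA_REGISTRY_URL + "\" in serde config");
        }
        return url.toString();
    }

    public static void main(String[] args) {
        Properties streamsProps = new Properties();
        streamsProps.put("application.id", "avro-sketch"); // assumed value
        streamsProps.put(SCHEMA_REGISTRY_URL, "http://localhost:8081");

        Map<String, Object> serdeConfig = toSerdeConfig(streamsProps);
        System.out.println(requireSchemaRegistryUrl(serdeConfig));
        // The same map could then be handed to serde.configure(serdeConfig, isKey).
    }
}
```

The check only confirms that the key is present in the map you built; it does not contact the Schema Registry or validate the URL.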

Giorgos Myrianthous answered Nov 02 '22 07:11