I am building a microservice component that will, by default, consume Spring Cloud Stream (SCS) Kafka messages generated by other SCS components.
I also have a requirement to consume Kafka messages from other components that use the Confluent API.
I have an example repository that shows what I'm trying to do.
https://github.com/donalthurley/KafkaConsumeScsAndConfluent
The application configuration below defines the SCS input binding and the Confluent input binding.
spring:
  application:
    name: kafka
  kafka:
    consumer:
      properties.schema.registry.url: http://192.168.99.100:8081
  cloud:
    stream:
      kafka:
        binder:
          brokers: PLAINTEXT://192.168.99.100:9092
#          configuration:
#            specific:
#              avro:
#                reader: true
#            key:
#              deserializer: org.apache.kafka.common.serialization.StringDeserializer
#            value:
#              deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      bindings:
        inputConfluent:
          contentType: application/*+avro
          destination: confluent-destination
          group: input-confluent-group
        inputScs:
          contentType: application/*+avro
          destination: scs-destination
          group: input-scs-group
With the above configuration, both consumers are created with the SCS default configuration. For instance, org.apache.kafka.common.serialization.ByteArrayDeserializer is the value deserializer for both input bindings.
If I uncomment the commented lines in the above configuration, both consumers are created with the configuration intended for my Confluent client. For instance, io.confluent.kafka.serializers.KafkaAvroDeserializer is the value deserializer for both input bindings.
I understand that because the configuration is on the Kafka binder, it applies to all the consumers defined with that binder.
Is there any way to define those specific properties so that they apply only to the Confluent-specific consumer binding, while all the other input bindings use the default SCS configuration?
You can set binding-specific consumer and producer properties via the configuration property. See the reference manual.
spring.cloud.stream.kafka.bindings.<channelName>.consumer.configuration.foo.bar=baz
When using non-standard serializers/deserializers you must set useNativeEncoding and useNativeDecoding for producers and consumers respectively. Again, see the reference manual.
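Applied to the configuration in the question, the two points above might look roughly like the sketch below. It is untested and makes a few assumptions: the binding names, destinations, groups and addresses are copied from the question; the binder-level configuration block stays removed; and schema.registry.url is moved into the binding-specific configuration, which is a choice made here for illustration and not something the question or the reference manual requires.

```yaml
spring:
  cloud:
    stream:
      bindings:
        inputConfluent:
          contentType: application/*+avro
          destination: confluent-destination
          group: input-confluent-group
          consumer:
            # Hand deserialization to the Kafka deserializer configured below
            useNativeDecoding: true
        inputScs:
          # No consumer overrides here, so this binding keeps the SCS defaults
          contentType: application/*+avro
          destination: scs-destination
          group: input-scs-group
      kafka:
        binder:
          brokers: PLAINTEXT://192.168.99.100:9092
        bindings:
          inputConfluent:
            consumer:
              # Kafka client properties for this binding only
              configuration:
                # Assumption: registry URL placed here rather than under spring.kafka.consumer
                schema.registry.url: http://192.168.99.100:8081
                specific.avro.reader: true
                key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
                value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
```

With this layout only inputConfluent should pick up KafkaAvroDeserializer, while inputScs should keep ByteArrayDeserializer and the SCS Avro message conversion. Checking the consumer configuration logged at startup for each binding is a reasonable way to confirm that.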