I'm developing a Kafka-Stream application, which will read the message from input Kafka topic and filter unwanted data and push to output Kafka topic.
Kafka Stream Configuration:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> streamsConfiguration = new HashMap<>();
streamsConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "abcd");
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "QC-NormalizedEventProcessor-v1.0.0");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test:9072");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), -1);
streamsConfiguration.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaConsumerProperties.getConsumerJKSFileLocation());
streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaConsumerProperties.getConsumerJKSPwd());
streamsConfiguration.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
streamsConfiguration.put(SASL_MECHANISM, "PLAIN");
return new KafkaStreamsConfiguration(streamsConfiguration);
}
KStream Filter Logic:
@Bean
public KStream<String, String> kStreamJson(StreamsBuilder builder) {
KStream<String, String> stream = builder.stream(kafkaConsumerProperties.getConsumerTopic(), Consumed.with(Serdes.String(), Serdes.String()));
/** Printing the source message */
stream.foreach((key, value) -> LOGGER.info(THREAD_NO + Thread.currentThread().getId() + " *****Message From Input Topic: " + key + ": " + value));
KStream<String, String> filteredDocument = stream.filter((k, v) -> filterCondition.test(k, v));
filteredDocument.to(kafkaConsumerProperties.getProducerTopic(), Produced.with(Serdes.String(), Serdes.String()));
/** After filtering printing the same message */
filteredDocument.foreach((key, value) -> LOGGER.info(THREAD_NO + Thread.currentThread().getId() + " #####Filtered Document: " + key + ": " + value));
return stream;
}
While starting above spring based Kafka Stream application, i was getting below exception.
2019-05-27T07:58:36.018-0500 ERROR stream-thread [QC-NormalizedEventProcessor-v1.0.0-e9cb1bed-3d90-41f1-957a-4fc7efc12a02-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: QC-NormalizedEventProcessor-v1.0.0
Our Kafka Infra team given necessary permission to "group.id", using this same "group id" i can consume the message using other Kafka Consumer applications and I was using name as per my wish in "application.id". We are not adding/updating "application.id" in Kafka Access Control List.
Am really not sure we need to give any permission for "application.id" or am missing something in the Kafka Stream Configuration. Please advice.
Please Note: I have tried with using with "group.id" and without "group.id" in Kafka Stream Configuration, all the time i am getting same exception.
Thanks! Bharathiraja Shanmugam
I am not at my desk but I think Streams sets the group.id to application.id.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With