Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Kafka Stream Exception: GroupAuthorizationException

I'm developing a Kafka-Stream application, which will read the message from input Kafka topic and filter unwanted data and push to output Kafka topic.

Kafka Stream Configuration:

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {

    Map<String, Object> streamsConfiguration = new HashMap<>();
    streamsConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "abcd");
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "QC-NormalizedEventProcessor-v1.0.0");
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test:9072");
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
    streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), -1);
    streamsConfiguration.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaConsumerProperties.getConsumerJKSFileLocation());
    streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaConsumerProperties.getConsumerJKSPwd());
    streamsConfiguration.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    streamsConfiguration.put(SASL_MECHANISM, "PLAIN");

    return new KafkaStreamsConfiguration(streamsConfiguration);
}

KStream Filter Logic:

@Bean
public KStream<String, String> kStreamJson(StreamsBuilder builder) {

    KStream<String, String> stream = builder.stream(kafkaConsumerProperties.getConsumerTopic(), Consumed.with(Serdes.String(), Serdes.String()));
    /** Printing the source message */
    stream.foreach((key, value) -> LOGGER.info(THREAD_NO + Thread.currentThread().getId() + " *****Message From Input Topic: " + key + ": " + value));
    KStream<String, String> filteredDocument = stream.filter((k, v) -> filterCondition.test(k, v));

    filteredDocument.to(kafkaConsumerProperties.getProducerTopic(), Produced.with(Serdes.String(), Serdes.String()));
    /** After filtering printing the same message */
    filteredDocument.foreach((key, value) -> LOGGER.info(THREAD_NO + Thread.currentThread().getId() + " #####Filtered Document: " + key + ": " + value));
    return stream;
}

While starting above spring based Kafka Stream application, i was getting below exception.

2019-05-27T07:58:36.018-0500 ERROR stream-thread [QC-NormalizedEventProcessor-v1.0.0-e9cb1bed-3d90-41f1-957a-4fc7efc12a02-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: QC-NormalizedEventProcessor-v1.0.0

Our Kafka Infra team given necessary permission to "group.id", using this same "group id" i can consume the message using other Kafka Consumer applications and I was using name as per my wish in "application.id". We are not adding/updating "application.id" in Kafka Access Control List.

Am really not sure we need to give any permission for "application.id" or am missing something in the Kafka Stream Configuration. Please advice.

Please Note: I have tried with using with "group.id" and without "group.id" in Kafka Stream Configuration, all the time i am getting same exception.

Thanks! Bharathiraja Shanmugam

like image 567
Bharathiraja S Avatar asked May 27 '19 17:05

Bharathiraja S


1 Answers

I am not at my desk but I think Streams sets the group.id to application.id.

like image 77
Gary Russell Avatar answered Oct 03 '22 04:10

Gary Russell