We've built a Kafka Streams pipeline that reads from a topic and groups the records by a field other than the record key.
input
    .groupBy(
        (key, value) -> value.getFieldA(),
        Grouped.with("TopicName", Serdes.String(), Serdes.Integer()))
    .windowedBy(SessionWindows.with(ofMinutes(5)).grace(Duration.ZERO))
This step creates an intermediate app-TopicName-repartition topic. However, Kafka Streams is constantly sending Delete requests to Kafka for it. We can see log entries like this on the Kafka side:
INFO [DENY] Auth request Delete on Topic:app-TopicName-repartition by User test_user (cached) (io.aiven.kafka.auth.AivenAclAuthorizer)
There is no streams.cleanUp() call or manual deletion via the admin API in our code. The delete requests target only the repartition topic, not the other intermediate topics. The app works perfectly fine; it just keeps sending delete requests in the background because I've set retries to Integer.MAX_VALUE. I have not been able to debug the issue. Why is Kafka Streams issuing delete requests for a repartition topic?
[UPDATE]
As far as I can trace in the Kafka Streams source code, it calls KafkaAdminClient.deleteRecords() in TaskManager. Is this the reason I see Delete in the log files? There is no other call in the Kafka Streams source code that explicitly deletes a topic.
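That is correct. Kafka Streams never tries to delete a topic. However, you need to allow it to purge data from repartition topics: it calls deleteRecords() to remove records it has already processed, and the broker authorizes that request as a Delete operation on the topic, which is what your log shows. Note that repartition topics are by default configured with infinite retention time and would grow unbounded if Kafka Streams cannot purge them.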
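To make the difference between purging records and deleting a topic concrete, here is a minimal sketch of the same kind of admin call issued by hand. It is not the Kafka Streams implementation; the bootstrap address, partition number, and offset are illustrative assumptions, and it needs the kafka-clients library plus a reachable broker to run.

```java
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicAuthorizationException;

public class PurgeSketch {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        // Assumed broker address; replace with your cluster's bootstrap servers.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // Ask the broker to drop all records below offset 100 in partition 0.
            // Partition and offset are illustrative values, not taken from the question.
            Map<TopicPartition, RecordsToDelete> purge = Map.of(
                new TopicPartition("app-TopicName-repartition", 0),
                RecordsToDelete.beforeOffset(100L));
            admin.deleteRecords(purge).all().get();
            System.out.println("Records purged; the topic itself still exists");
        } catch (ExecutionException e) {
            if (e.getCause() instanceof TopicAuthorizationException) {
                // This is the client-side view of the [DENY] line in the broker log.
                System.out.println("Principal lacks Delete permission on the topic");
            } else {
                throw new RuntimeException(e);
            }
        }
    }
}
```

If the principal is not allowed the Delete operation on the topic, the call fails with an authorization error and the records stay in place, which matches the behaviour described in the question.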
For more details on which ACLs you need, check out the docs: https://docs.confluent.io/current/streams/developer-guide/security.html#required-acl-setting-for-secure-ak-clusters
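When setting up those ACLs it helps to know which topic names they must cover. The sketch below rebuilds the repartition topic name from the question and checks it against a prefix-style pattern. It assumes the application.id is "app" (inferred from the topic name in the log line); the class and helper names are hypothetical, and the prefix check is only a simplified model, not the matching logic of any particular authorizer.

```java
public class RepartitionTopicName {
    // Builds "<application.id>-<name>-repartition", the pattern visible in the
    // topic name from the question (app-TopicName-repartition).
    public static String repartitionTopic(String applicationId, String groupedName) {
        return applicationId + "-" + groupedName + "-repartition";
    }

    // Simplified model of a prefix-style topic pattern: true when the topic
    // name starts with the given prefix.
    public static boolean coveredByPrefix(String prefix, String topic) {
        return topic.startsWith(prefix);
    }

    public static void main(String[] args) {
        String applicationId = "app"; // assumption inferred from the log line
        String topic = repartitionTopic(applicationId, "TopicName");
        System.out.println(topic);
        System.out.println(coveredByPrefix(applicationId + "-", topic));
    }
}
```

Granting the Delete operation on a pattern keyed to the application.id prefix, rather than on one topic name, would also cover repartition topics that later changes to the topology create.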