Our use case is to delete stale/unused topics from Kafka, i.e. if a topic has had no new messages on any of its partitions in the last 7 days, we consider it stale/unused and delete it.
Many Google results suggest adding a timestamp to each message and then parsing it. That solution would work for new topics and messages, but our existing topics and messages don't have any timestamp in them.
How can I get this working?
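Passing kafka.api.OffsetRequest.LatestTime() in an offset request returns the latest offset of a partition, i.e. the position just past the most recently added message, rather than the message itself. You can use the Simple Consumer API to determine which offset to read from.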
For more details, take a look at the wiki page.
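One way to apply this without message timestamps is to record the latest offset of every partition, record it again 7 days later, and treat the topic as stale only if no partition's offset moved. The sketch below shows just that comparison step in plain Java. The class name `StaleTopicCheck` and the method `isStale` are hypothetical and not part of Kafka, and the snapshot maps are assumed to have been filled from offset requests made elsewhere.

```java
import java.util.Map;

// Hypothetical helper: the names here are illustrative and not part of any Kafka API.
final class StaleTopicCheck {

    /**
     * Returns true when no partition's latest offset changed between two snapshots.
     * Keys are partition ids; values are the latest offsets, assumed to come from
     * offset requests made with OffsetRequest.LatestTime() at two points in time.
     */
    static boolean isStale(Map<Integer, Long> earlier, Map<Integer, Long> later) {
        // No partition data at all: do not treat the topic as stale.
        if (earlier.isEmpty() || later.isEmpty()) {
            return false;
        }
        // Partition set changed (for example partitions were added): err on the safe side.
        if (!earlier.keySet().equals(later.keySet())) {
            return false;
        }
        // Any offset that moved means at least one message was written.
        for (Map.Entry<Integer, Long> entry : earlier.entrySet()) {
            if (!entry.getValue().equals(later.get(entry.getKey()))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<Integer, Long> dayZero = Map.of(0, 120L, 1, 45L);
        Map<Integer, Long> daySevenIdle = Map.of(0, 120L, 1, 45L);
        Map<Integer, Long> daySevenBusy = Map.of(0, 120L, 1, 46L);

        System.out.println(isStale(dayZero, daySevenIdle)); // prints true
        System.out.println(isStale(dayZero, daySevenBusy)); // prints false
    }
}
```

This requires keeping the first snapshot somewhere for 7 days, so it only starts giving answers a week after the first run. A topic is reported as stale only when every partition is unchanged, which matches the "on all partitions" condition in the question.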