 

Kafka - Simplest Way to Get Latest Offset

Tags:

apache-kafka

I'm building an application that allows subscriptions to Kafka topics to be added and removed dynamically. When a topic subscription is added, I want to run a batch job every hour that gets all of the new messages and pushes them into another datastore.

What I want to understand is how to get the current offset of a topic. As soon as a subscription is added, I want the next batch job to get all messages since the approximate time of the subscription.

As an example, imagine I have a topic called "TopicA" which is constantly receiving messages. If I add a subscription at 7.15pm, then when the batch job runs at 8pm I want all messages since 7.15pm to be batched up. I'm happy for the time to be approximate (7.10, 7.20, etc.); being 5 or 10 minutes either side causes me no concern.

So my intended solution is to get the current offset of a topic the moment a subscription is added. I've looked at the simple consumer, but I don't want to get involved in all of the cluster management aspects for this basic use case.

I've also looked at the high-level consumer. I could do something like this:

consumer.createMessageStreamsByFilter(new Whitelist(topicName)).head.head.offset

What concerns me with this approach is that the second call to "head" is made on a stream, so I believe it will block waiting for the next message. Blocking is problematic because it may cause other subscriptions to be queued up until that message arrives.

I'm happy to spend some time implementing the latter approach, but if there is a simpler way that doesn't require me to write error-prone concurrent code, then I'd rather not waste my time.

I'll also need a way to get all logs since that offset.

asked Nov 20 '14 at 16:11 by user2668128



1 Answer

Every response to a fetch request includes a "HighWaterMark" for each partition, which is the offset at the end of that partition's log. So in theory you could fetch the earliest message, or indeed any message (assuming one exists), from each partition of a given topic and pull the HighWaterMark from the response. There's more detail on the HighWaterMark here: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchResponse

Of course, being able to pull the HighWaterMarkOffset from the response depends on your client making that data available through its own Kafka API.
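Combining the question's plan with the HighWaterMark idea (note each partition's end offset when the subscription is added, then have the batch job read from there up to the current end) can be sketched in plain Scala. This is a toy model under stated assumptions, not Kafka client code: `ToyTopic`, `OffsetSnapshots` and `Demo` are hypothetical names, and `highWatermark` only imitates the HighwaterMarkOffset semantics described in the protocol guide (the offset at the end of a partition's log). Because offsets are per partition, the "offset of a topic" is kept as a map from partition to offset.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical in-memory stand-in for one topic (NOT a Kafka client).
// Each partition is an append-only log; offsets are positions starting at 0.
final class ToyTopic(val name: String, numPartitions: Int) {
  private val logs: Vector[ArrayBuffer[String]] =
    Vector.fill(numPartitions)(ArrayBuffer.empty[String])

  def partitions: Range = 0 until numPartitions

  // Appends a message and returns the offset it was assigned.
  def append(partition: Int, message: String): Long = {
    val log = logs(partition)
    log += message
    (log.size - 1).toLong
  }

  // Imitates HighwaterMarkOffset: the offset at the end of the partition's
  // log, i.e. the offset the next appended message will receive.
  def highWatermark(partition: Int): Long = logs(partition).size.toLong

  // Messages whose offsets fall in the half-open range [from, until).
  def read(partition: Int, from: Long, until: Long): Seq[String] =
    logs(partition).slice(from.toInt, until.toInt).toSeq
}

object OffsetSnapshots {
  // Offsets are per partition, so a "topic offset" is really a map.
  type Offsets = Map[Int, Long]

  // Run when a subscription is added: remember where each partition ends now.
  def snapshot(topic: ToyTopic): Offsets =
    topic.partitions.map(p => p -> topic.highWatermark(p)).toMap

  // Run by the hourly batch job: collect everything appended since `from`,
  // and return the current end offsets so the next run resumes from there.
  def nextBatch(topic: ToyTopic, from: Offsets): (Seq[String], Offsets) = {
    val until = snapshot(topic)
    val messages = topic.partitions.flatMap(p => topic.read(p, from(p), until(p)))
    (messages, until)
  }
}

object Demo {
  def main(args: Array[String]): Unit = {
    val topicA = new ToyTopic("TopicA", numPartitions = 2)
    topicA.append(0, "before-1")
    topicA.append(1, "before-2")

    // "7.15pm": subscription added, so record the current end offsets.
    val atSubscribe = OffsetSnapshots.snapshot(topicA) // partition 0 -> 1, 1 -> 1

    topicA.append(0, "after-1")
    topicA.append(1, "after-2")
    topicA.append(1, "after-3")

    // "8pm": the batch job sees only what arrived after the snapshot.
    val (batch, resumeFrom) = OffsetSnapshots.nextBatch(topicA, atSubscribe)
    println(batch)      // contains after-1, after-2, after-3
    println(resumeFrom) // partition 0 -> 2, partition 1 -> 3
  }
}
```

Reading the half-open range [stored offset, current end) means each run of the toy job starts exactly where the previous one stopped, so nothing is skipped or read twice. With a real client, `snapshot` would be backed by whatever end-offset lookup that client exposes (later versions of the Java consumer, for example, provide `KafkaConsumer.endOffsets`); wiring that in is left as an assumption here.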

answered Nov 24 '22 at 09:11 by David Corley
