I am using Kafka Connect to get messages from a Kafka broker (v0.10.2) and then sync them to a downstream service.
Currently, I have code in SinkTask#put that processes each SinkRecord and then persists it to the downstream service.
A couple of key requirements,
So we thought we could rely on SinkTask#flush to effectively back out of committing offsets for that particular poll/cycle of received messages, by throwing an exception or otherwise telling Connect not to commit the offsets but to retry in the next poll.
But as we found out, flush is actually time-based and more or less independent of the polls: it commits the offsets once a certain time threshold is reached.
In 0.10.2, SinkTask#preCommit was introduced, so we thought we could use it for our purposes. But nowhere does the documentation mention a 1:1 relationship between SinkTask#put and SinkTask#preCommit.
Essentially, we want to commit offsets as soon as a single put succeeds and, similarly, not commit the offsets if that particular put failed.
How can we accomplish this, if not via SinkTask#preCommit?
Getting data into and out of Kafka correctly can be challenging, and Kafka Connect makes this easier since it uses best practices and hides many of the complexities. For sink connectors, Kafka Connect reads messages from a topic, sends them to your connector, and then periodically commits the largest offsets for the various topic partitions that have been read and processed.
Note that "sending them to your connector" corresponds to the put(Collection<SinkRecord>) method, and this may be called many times before Kafka Connect commits the offsets. You can control how frequently Kafka Connect commits offsets, but Kafka Connect ensures that it will only commit an offset for a message when that message was successfully processed by the connector.
When the connector is operating normally, everything is great and your connector sees each message once, even though the offsets are committed only periodically. However, should the connector fail, then when it restarts it will start at the last committed offset. That might mean your connector sees some of the same messages that it processed just before the crash. This usually is not a problem if you carefully write your connector to have at-least-once semantics.
Why does Kafka Connect commit offsets periodically rather than with every record? Because it saves a lot of work and doesn't really matter when things are going normally. It's only when things go wrong that the offset lag matters. And even then, if you're having Kafka Connect handle offsets, your connector needs to be ready to handle messages at least once. Exactly-once is possible, but your connector has to do more work (see below).
Writing Records
You have a lot of flexibility in writing a connector, and that's good because a lot will depend on the capabilities of the external system to which it's writing. Let's look at different ways of implementing put and flush.
If the system supports transactions or can handle a batch of updates, your connector's put(Collection<SinkRecord>) could write all of the records in that collection using a single transaction / batch, retrying as many times as needed until the transaction / batch completes, or finally throwing an error. In this case, put does all the work and will either succeed or fail. If it succeeds, then Kafka Connect knows all of the records were handled properly and can thus (at some point) commit the offsets. If your put call fails, then Kafka Connect doesn't know whether any of the records were processed, so it doesn't update its offsets and it stops your connector. Your connector's flush(...) would need to do nothing, since Kafka Connect is handling all the offsets.
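The batch approach can be sketched roughly as follows. This is a minimal illustration and not Kafka Connect code: BatchClient, BatchPutSketch and maxAttempts are hypothetical names, and plain strings stand in for SinkRecord so the example compiles without the Connect libraries. In a real task the final exception would be thrown out of put so that Kafka Connect does not advance the offsets.

```java
import java.util.List;

/** Hypothetical stand-in for a downstream client that accepts an all-or-nothing batch. */
interface BatchClient {
    void writeBatch(List<String> records) throws Exception;
}

/** Sketch of the work a put(Collection<SinkRecord>) could do for one batch. */
class BatchPutSketch {
    private final BatchClient client;
    private final int maxAttempts; // assumed retry limit, not a Kafka Connect setting

    BatchPutSketch(BatchClient client, int maxAttempts) {
        this.client = client;
        this.maxAttempts = maxAttempts;
    }

    /** Writes the whole batch, retrying on failure; returns the number of attempts used. */
    int put(List<String> records) {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                client.writeBatch(records); // succeeds or fails as a unit
                return attempt;
            } catch (Exception e) {
                last = e; // remember the cause and try again
            }
        }
        // Out of attempts: fail the call so the caller knows nothing was confirmed.
        throw new RuntimeException("batch failed after " + maxAttempts + " attempts", last);
    }
}
```

Because the batch either completes or the call throws, nothing is left for flush to do in this design.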
If the system doesn't support transactions and instead you can only submit items one at a time, you might have your connector's put(Collection<SinkRecord>) attempt to write out each record individually, blocking until it succeeds and retrying each as needed before throwing an error. Again, put does all the work, and the flush method might not need to do anything.
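A rough sketch of the one-record-at-a-time variant is shown here, again with hypothetical names (RecordClient, PerRecordPutSketch, maxAttempts) and strings in place of SinkRecord. It also makes one consequence visible: if a record in the middle of the collection fails for good, the records before it have already been written, which is why the connector still has to tolerate seeing messages more than once.

```java
import java.util.List;

/** Hypothetical stand-in for a downstream client that accepts one item per call. */
interface RecordClient {
    void write(String record) throws Exception;
}

/** Sketch of a put(...) that writes records individually, retrying each one. */
class PerRecordPutSketch {
    private final RecordClient client;
    private final int maxAttempts; // assumed per-record retry limit

    PerRecordPutSketch(RecordClient client, int maxAttempts) {
        this.client = client;
        this.maxAttempts = maxAttempts;
    }

    /** Writes records in order; throws as soon as one record exhausts its attempts. */
    void put(List<String> records) {
        for (String record : records) {
            writeWithRetry(record);
        }
    }

    private void writeWithRetry(String record) {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                client.write(record);
                return; // this record is done, move on to the next
            } catch (Exception e) {
                last = e;
            }
        }
        throw new RuntimeException("could not write record: " + record, last);
    }
}
```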
So far, my examples do all the work in put. You always have the option of having put simply buffer the records and instead doing all the work of writing to the external service in flush or preCommit. One reason you might do this is so that your writes are time-based, just like flush and preCommit. If you don't want your writes to be time-based, you probably don't want to do the writes in flush or preCommit.
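The buffering variant might look something like this minimal sketch. BufferingSinkSketch is a hypothetical class, the downstream list is only a stand-in for the external service, and the methods mirror put and flush in shape only (the real SinkTask methods take SinkRecord collections and offset maps).

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: put only buffers; the actual write happens when flush is called. */
class BufferingSinkSketch {
    private final List<String> buffer = new ArrayList<>();
    private final List<String> downstream = new ArrayList<>(); // stand-in for the external service

    /** Called many times between commits; does no I/O. */
    void put(List<String> records) {
        buffer.addAll(records);
    }

    /** Called on the commit schedule; writes everything buffered so far. */
    void flush() {
        downstream.addAll(buffer);
        buffer.clear();
    }

    int buffered() {
        return buffer.size();
    }

    List<String> written() {
        return downstream;
    }
}
```

With this shape nothing reaches the external service until the next commit cycle, so the delay before a record is written is governed by the commit schedule rather than by how often put is called.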
To Record Offsets or Not To Record
As mentioned above, by default Kafka Connect will periodically record the offsets so that upon restart the connector can begin where it last left off.
However, sometimes it is desirable for a connector to record the offsets in the external system, especially when that can be done atomically. When such a connector starts up, it can look in the external system to find out the offset that was last written, and can then tell Kafka Connect where it wants to start reading. With this approach your connector may be able to do exactly once processing of messages.
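As a rough illustration of that idea, the sketch below models an external store that saves each value together with its offset in one atomic step and can report where to resume. Everything here is hypothetical (ExternalStoreSketch, the "topic-partition" string keys, the synchronized methods standing in for a real transaction). In an actual sink task, the resume position would be handed to Kafka Connect through the task context's offset(...) method when partitions are assigned.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch of an external system that stores data and offsets atomically. */
class ExternalStoreSketch {
    private final List<String> rows = new ArrayList<>();
    private final Map<String, Long> lastOffsets = new HashMap<>(); // key: "topic-partition"

    /**
     * Stores the value and its offset together (synchronized models one transaction).
     * Returns false and stores nothing if this offset was already applied.
     */
    synchronized boolean write(String topicPartition, long offset, String value) {
        Long last = lastOffsets.get(topicPartition);
        if (last != null && offset <= last) {
            return false; // redelivered after a restart, skip it
        }
        rows.add(value);
        lastOffsets.put(topicPartition, offset);
        return true;
    }

    /** One past the last stored offset; 0 is an arbitrary choice for "nothing stored yet". */
    synchronized long resumeOffset(String topicPartition) {
        Long last = lastOffsets.get(topicPartition);
        return last == null ? 0L : last + 1;
    }

    synchronized int rowCount() {
        return rows.size();
    }
}
```

Because the offset travels with the data, a record replayed after a crash is recognized and skipped, which is what makes exactly-once behavior achievable in this design.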
When sink connectors do this, they actually don't need Kafka Connect to commit any offsets at all. The flush method is simply an opportunity for your connector to learn which offsets Kafka Connect is committing for you, and since it doesn't return anything, it can't modify those offsets or tell Kafka Connect which offsets the connector is handling.
This is where the preCommit method comes in. It really is a replacement for flush (it actually takes the same parameters as flush), except that it is expected to return the offsets that Kafka Connect should commit. By default, preCommit just calls flush and then returns the same offsets that were passed to preCommit, which means Kafka Connect should commit all the offsets it passed to the connector via preCommit. But if your preCommit returns an empty set of offsets, then Kafka Connect will record no offsets at all.
So, if your connector is going to handle all offsets in the external system and doesn't need Kafka Connect to record anything, then you should override the preCommit method instead of flush, and return an empty set of offsets.
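The difference between the default behavior and the override can be shown with a simplified model. SinkTaskModel below is a hypothetical stand-in, not the real SinkTask: it keeps only the two commit hooks and uses String and Long where the real API uses TopicPartition and OffsetAndMetadata, so that the sketch compiles on its own.

```java
import java.util.Collections;
import java.util.Map;

/** Simplified stand-in for SinkTask with only the two commit hooks. */
abstract class SinkTaskModel {
    /** Notification only: returns nothing, so it cannot change what gets committed. */
    void flush(Map<String, Long> currentOffsets) {
    }

    /** Default described above: call flush, then hand back the same offsets. */
    Map<String, Long> preCommit(Map<String, Long> currentOffsets) {
        flush(currentOffsets);
        return currentOffsets;
    }
}

/** Uses the defaults, so the framework commits everything it offered. */
class DefaultOffsetsTask extends SinkTaskModel {
}

/** Keeps offsets in the external system, so it asks for nothing to be committed. */
class SelfManagedOffsetsTask extends SinkTaskModel {
    @Override
    Map<String, Long> preCommit(Map<String, Long> currentOffsets) {
        return Collections.emptyMap();
    }
}
```

In a real connector the override would have the same shape: the same method name, with the real offset map types in place of the simplified ones used here.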