We are building a custom Kafka Connect sink that calls a remote REST API. How do I propagate backpressure to the Kafka Connect infrastructure, so that put() is called less often when the remote system is slower than the rate at which the internal consumer delivers messages to put()? The Kafka Connect documentation says that we should not block in put(), but should block in flush(). But not blocking in put() means that we have to buffer data, which will surely lead to OOM exceptions at some point if put() is called more often than flush(). I've seen that a Kafka consumer is allowed to call pause() or block in its poll loop. Is it possible to leverage this in a Kafka Connect sink?
A sink connector delivers data from Kafka topics into other systems, which might be indexes such as Elasticsearch, batch systems such as Hadoop, or any kind of database. Some connectors are maintained by the community, while others are supported by Confluent or its partners.
Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka® and other data systems. It makes it simple to quickly define connectors that move large data sets in and out of Kafka.
Kafka Streams is the Streams API for transforming, aggregating, and processing records from a stream and producing derivative streams. Kafka Connect is the connector API for creating reusable producers and consumers (e.g., a stream of changes from DynamoDB). The Kafka REST Proxy is used to produce and consume messages over REST (HTTP).
I've seen that a Kafka consumer is allowed to call pause() or block in its poll loop. Is it possible to leverage this in a Kafka Connect sink?
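The raw consumer is not exposed, so you cannot call it directly. However, the SinkTaskContext available to your task does offer pause() and resume() for its assigned topic partitions, which stops records for those partitions from being delivered to put() until you resume them. You could also pause the whole connector through the Connect REST API (PUT /connectors/{name}/pause), though I'm not sure what happens to un-flushed messages at that point.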
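As a rough sketch of how pausing could be driven by how full a buffer is: the class below keeps a bounded queue with a high and a low water mark. Everything in it is hypothetical and uses only the JDK. In particular, FlowControl is a stand-in interface, not a Kafka Connect type; in a real task its two methods would presumably delegate to context.pause(...) and context.resume(...) with the partitions from context.assignment(). The sketch is single-threaded and assumes add() and drain() are both called from the task thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;

/** Hypothetical stand-in for the pause/resume calls a sink task would make on its context. */
interface FlowControl {
    void pause();
    void resume();
}

/** Bounded buffer that requests a pause when nearly full and a resume once it has drained. */
class BackpressureBuffer<T> {
    private final Queue<T> queue = new ArrayDeque<>();
    private final int highWaterMark;
    private final int lowWaterMark;
    private final FlowControl control;
    private boolean paused = false;

    BackpressureBuffer(int highWaterMark, int lowWaterMark, FlowControl control) {
        if (lowWaterMark < 0 || lowWaterMark >= highWaterMark) {
            throw new IllegalArgumentException("need 0 <= lowWaterMark < highWaterMark");
        }
        this.highWaterMark = highWaterMark;
        this.lowWaterMark = lowWaterMark;
        this.control = control;
    }

    /** Intended to be called from put(): buffers without blocking, pauses at the high mark. */
    void add(Collection<T> records) {
        queue.addAll(records);
        if (!paused && queue.size() >= highWaterMark) {
            control.pause();
            paused = true;
        }
    }

    /** Removes up to max records for sending; resumes delivery once at or below the low mark. */
    List<T> drain(int max) {
        List<T> batch = new ArrayList<>();
        while (batch.size() < max && !queue.isEmpty()) {
            batch.add(queue.poll());
        }
        if (paused && queue.size() <= lowWaterMark) {
            control.resume();
            paused = false;
        }
        return batch;
    }

    int size() {
        return queue.size();
    }

    boolean isPaused() {
        return paused;
    }
}
```

The gap between the two marks is there to avoid flapping between paused and resumed on every batch. Note that the buffer can still overshoot the high mark by one put() batch, because the pause only takes effect for subsequent deliveries.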
But not blocking in put() means that we have to buffer data which surely leads to OOM exceptions at some point
It can, sure, but that is really the only viable option for holding onto data for longer than necessary. For instance, this is how the S3 and HDFS connectors work.
rotate.interval.ms
The time interval in milliseconds to invoke file commits...
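To illustrate the general idea of buffering in put() and committing on a size or time trigger, here is a minimal sketch. It is not how the S3 or HDFS connectors are actually implemented; the class name, the maxRecords and maxAgeMs parameters, and the trigger rule are all assumptions made for illustration. The current time is passed in explicitly so the logic is easy to test.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical buffer that reports when it is due for a flush, by size or by age. */
class FlushPolicyBuffer<T> {
    private final List<T> records = new ArrayList<>();
    private final int maxRecords;
    private final long maxAgeMs;
    private long lastFlushMs;

    FlushPolicyBuffer(int maxRecords, long maxAgeMs, long nowMs) {
        this.maxRecords = maxRecords;
        this.maxAgeMs = maxAgeMs;
        this.lastFlushMs = nowMs;
    }

    /** Intended to be called from put(): only appends, never blocks. */
    void add(T record) {
        records.add(record);
    }

    /** True when there is data and either the size limit or the time limit has been reached. */
    boolean shouldFlush(long nowMs) {
        if (records.isEmpty()) {
            return false;
        }
        return records.size() >= maxRecords || nowMs - lastFlushMs >= maxAgeMs;
    }

    /** Hands back the buffered records for sending and resets the timer. */
    List<T> flush(long nowMs) {
        List<T> batch = new ArrayList<>(records);
        records.clear();
        lastFlushMs = nowMs;
        return batch;
    }
}
```

Capping the record count is what keeps memory bounded here; the time limit only ensures that a slow trickle of records still gets written out.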
Your HTTP client connection is likely blocking anyway to make the request, is it not?
The alternative would be to have your HTTP server embed a Kafka consumer, so it can poll messages itself and act on them locally rather than having them pushed to it over HTTP.