 

Kafka Connect sink task ignores file offset storage property

I'm experiencing quite weird behavior while working with the Confluent JDBC connector. I'm pretty sure it's not related to the Confluent stack, but to the Kafka Connect framework itself.

So, I set the offset.storage.file.filename property to the default /tmp/connect.offsets and run my sink connector. Obviously, I expect the connector to persist offsets in the given file (it doesn't exist on the file system, but it should be created automatically, right?). The documentation says:

offset.storage.file.filename The file to store connector offsets in. By storing offsets on disk, a standalone process can be stopped and started on a single node and resume where it previously left off.
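For context, my standalone worker config looks roughly like this (the broker address and converters below are illustrative placeholders, not my exact values):

    # connect-standalone.properties -- minimal sketch, values are placeholders
    bootstrap.servers=localhost:9092
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    # the property in question: where the standalone worker keeps its offsets
    offset.storage.file.filename=/tmp/connect.offsets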

But Kafka behaves in a completely different manner.

  1. It checks if the given file exists.
  2. If it doesn't, Kafka just ignores it and persists offsets in a Kafka topic.
  3. If I create the given file manually, reading fails anyway (EOFException) and offsets are persisted in the topic again.

Is it a bug or, more likely, do I not understand how to work with this configuration? I understand the difference between the two approaches to persisting offsets, and file storage is more convenient for my needs.

asked Feb 06 '17 by bsiamionau

People also ask

Where does Kafka Connect store offsets?

offset.storage.file.filename: The storage file name for connector offsets. This file is stored on the local filesystem in standalone mode. Using the same file name for two workers will cause offset data to be deleted or overwritten with different values.

What is a sink in Kafka connect?

The Kafka Connect JDBC Sink connector allows you to export data from Apache Kafka® topics to any relational database with a JDBC driver. This connector can support a wide variety of databases. The connector polls data from Kafka to write to the database based on the topics subscription.
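For illustration, a minimal JDBC Sink connector configuration might look like the following sketch (the connection URL, topic name, and credentials are hypothetical placeholders):

    # jdbc-sink.properties -- illustrative sketch, values are placeholders
    name=my-jdbc-sink
    connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
    tasks.max=1
    # topic(s) to read from
    topics=orders
    # target database connection
    connection.url=jdbc:postgresql://localhost:5432/mydb
    connection.user=postgres
    connection.password=secret
    # create the destination table if it does not exist
    auto.create=true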

What is Kafka connect offset?

Kafka Connect in distributed mode uses Kafka itself to persist the offsets of any source connectors. This is a great way to do things, as it means that you can easily add more workers, rebuild existing ones, and so on, without having to worry about where the state is persisted.

Does Kafka Connect store data?

Source connectors can also collect metrics from all your application servers and store the data in Kafka topics, making the data available for stream processing with low latency.


2 Answers

The offset.storage.file.filename property is only used by source connectors. It is used to place a bookmark on the input data source and remember where it stopped reading it. The created file contains something like the file line number (for a file source) or the table row number (for a JDBC source, or databases in general).

When running Kafka Connect in distributed mode, this file is replaced by a Kafka topic, named connect-offsets by default, which should be replicated in order to tolerate failures.
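For example, a distributed worker declares its offset topic along these lines (a sketch; the group id and replication factor are illustrative):

    # connect-distributed.properties -- illustrative excerpt
    bootstrap.servers=localhost:9092
    group.id=connect-cluster
    # offsets are stored in a replicated Kafka topic instead of a local file
    offset.storage.topic=connect-offsets
    offset.storage.replication.factor=3
    config.storage.topic=connect-configs
    status.storage.topic=connect-status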

As far as sink connectors are concerned, no matter which plugin or mode (standalone/distributed) is used, they all store where they last stopped reading their input topic in an internal topic named __consumer_offsets, like any Kafka consumer. This allows you to use traditional tools like the kafka-consumer-groups.sh command-line tool to see how much the sink connector is lagging.
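For instance, checking a sink connector's lag could look like this (sink connectors join a consumer group named connect-<connector name>; the connector name my-jdbc-sink below is hypothetical):

    # describe the consumer group of a hypothetical sink connector
    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
      --describe --group connect-my-jdbc-sink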

The Confluent Kafka Replicator, despite being a source connector, is probably an exception, because it reads from a remote Kafka cluster and may use a Kafka consumer.

I agree that the documentation is not clear: this setting is required whatever the connector type is (source or sink), but it is only used by source connectors. The reason behind this design decision is that a single Kafka Connect worker (meaning a single JVM process) can run multiple connectors, potentially both source and sink connectors. Said differently, this is a worker-level setting, not a connector setting.

answered Sep 23 '22 by G Quintana


The offset.storage.file.filename property only applies to workers of source connectors running in standalone mode. If you are seeing Kafka persist offsets in a Kafka topic for a source, you are running in distributed mode. You should be launching your connector with the provided connect-standalone script. There's a description of the different modes here. Instructions on running in the different modes are here.
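For reference, starting a standalone worker typically looks like this (the property file names are placeholders):

    # start a standalone worker with a worker config and one connector config
    bin/connect-standalone.sh config/connect-standalone.properties \
      config/my-sink.properties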

answered Sep 21 '22 by dawsaw