I am trying to use the Kafka Connect JDBC Source Connector in BULK mode with the following properties.
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
timestamp.column.name=timestamp
connection.password=XXXXX
validate.non.null=false
tasks.max=1
producer.buffer.memory=2097152
batch.size=1000
producer.enable.idempotence=true
offset.flush.timeout.ms=300000
table.types=TABLE,VIEW
table.whitelist=materials
offset.flush.interval.ms=5000
mode=bulk
topic.prefix=mysql-
connection.user=kafka_connect_user
poll.interval.ms=200000
connection.url=jdbc:mysql://<DBNAME>
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
I get the following error about committing offsets; changing various parameters seems to have little effect.
[2019-04-04 12:42:14,886] INFO WorkerSourceTask{id=SapMaterialsConnector-0} flushing 4064 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2019-04-04 12:42:19,886] ERROR WorkerSourceTask{id=SapMaterialsConnector-0} Failed to flush, timed out while waiting for producer to flush outstanding 712 messages (org.apache.kafka.connect.runtime.WorkerSourceTask)
The error indicates that many messages are buffered in the producer and cannot be flushed before the offset flush timeout is reached. For context, the producer's buffer.memory setting (32 MB by default) is the total memory the producer can use to buffer records that are waiting to be sent to the brokers; if records are produced faster than they can be delivered, this buffer fills up, and the flush() performed on every offset commit has to wait for all of it to drain.
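To illustrate the interaction between buffer.memory and flush(), here is a minimal standalone producer sketch (plain Java client, not Connect code; the broker address, topic name, and record count are assumptions for demonstration only):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BufferFlushSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("buffer.memory", "2097152"); // small buffer, mirroring the connector config above

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10_000; i++) {
                // send() is asynchronous: records accumulate in buffer.memory until the brokers acknowledge them
                producer.send(new ProducerRecord<>("demo-topic", Integer.toString(i), "payload-" + i));
            }
            // flush() blocks until every buffered record has completed; this is what the
            // Connect worker does on each offset commit, and what times out in the error above
            producer.flush();
        }
    }
}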
To address this issue you can either:
- increase the offset.flush.timeout.ms configuration parameter in your Kafka Connect worker configs, or
- reduce the amount of data being buffered by decreasing producer.buffer.memory in your Kafka Connect worker configs. This turns out to be the best option when you have fairly large messages. An example worker config snippet is shown below.
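For example, in the worker's properties file (e.g. connect-distributed.properties; the values below are only illustrative starting points, not recommendations, and should be tuned to your message sizes and throughput):
# give the producer more time to drain its buffer before the offset commit gives up
offset.flush.timeout.ms=60000
# cap the producer buffer so fewer records can pile up between offset commits
producer.buffer.memory=4194304
Note that these are worker-level settings, so they belong in the worker configuration rather than in the connector's own properties.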
When security.protocol=SSL is enabled, make sure that there are separate SSL parameters for the Connect workers and the Connect producers; if the producer is missing its own SSL settings it may be unable to reach the brokers, so records never leave its buffer and the flush times out as above.
Provide SSL settings for both:
# Authentication settings for Connect workers
ssl.keystore.location=/var/private/ssl/kafka.worker.keystore.jks
ssl.keystore.password=worker1234
ssl.key.password=worker1234
# Authentication settings for Connect producers used with source connectors
producer.ssl.keystore.location=/var/private/ssl/kafka.source.keystore.jks
producer.ssl.keystore.password=connector1234
producer.ssl.key.password=connector1234
See https://docs.confluent.io/5.2.3/connect/security.html#separate-principals for details.
If you're trying to connect to Confluent Cloud, this error is probably caused by a missing configuration in the worker properties; make sure you have added both the consumer and the producer configuration:
consumer.ssl.endpoint.identification.algorithm=https
consumer.sasl.mechanism=PLAIN
consumer.request.timeout.ms=20000
consumer.retry.backoff.ms=500
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="API_KEY" password="SECRET";
consumer.security.protocol=SASL_SSL
producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.request.timeout.ms=20000
producer.retry.backoff.ms=500
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="API_KEY" password="SECRET";
producer.security.protocol=SASL_SSL