I would like to set a callback to be fired if a produced records fail. Initially, I would just like to log the failed record.
The Confluent Kafka python library provides a mechanism for adding a callback:
produce(topic[, value][, key][, partition][, on_delivery][, timestamp])
...
on_delivery(err,msg) (func) – Delivery report callback to call (from poll() or flush()) on successful or failed delivery
How can I achieve similar behaviour with kafka-python kafka.KafkaProducer#send() without having to use the deprecated SimpleClient using kafka.SimpleClient#send_produce_request()
Although it isn't documented, this is relatively straightforward. Whenever you send a message, you immediately get a Future
back. You can append callbacks/errback's to that Future
:
F = producer.send(topic=topic, value=message, key=key)
F.add_callback(callback, message=message, **kwargs_to_pass_to_callback_method)
F.add_errback(erback, message=message, **kwargs_to_pass_to_errback_method)
Relevant source code here: https://github.com/dpkp/kafka-python/blob/1937ce59b4706b44091bb536a9b810ae657c3225/kafka/future.py#L48-L64
We really should document this, I filed https://github.com/dpkp/kafka-python/issues/1256 to track it.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With