Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to add a failure callback for kafka-python kafka.KafkaProducer#send()?

Tags:

kafka-python

I would like to set a callback to be fired if a produced records fail. Initially, I would just like to log the failed record.

The Confluent Kafka python library provides a mechanism for adding a callback:

produce(topic[, value][, key][, partition][, on_delivery][, timestamp])
...
    on_delivery(err,msg) (func) – Delivery report callback to call (from poll() or flush()) on successful or failed delivery

How can I achieve similar behaviour with kafka-python kafka.KafkaProducer#send() without having to use the deprecated SimpleClient using kafka.SimpleClient#send_produce_request()

like image 472
Chris Snow Avatar asked Sep 24 '17 08:09

Chris Snow


1 Answers

Although it isn't documented, this is relatively straightforward. Whenever you send a message, you immediately get a Future back. You can append callbacks/errback's to that Future:

F = producer.send(topic=topic, value=message, key=key)
F.add_callback(callback, message=message, **kwargs_to_pass_to_callback_method)
F.add_errback(erback, message=message, **kwargs_to_pass_to_errback_method)

Relevant source code here: https://github.com/dpkp/kafka-python/blob/1937ce59b4706b44091bb536a9b810ae657c3225/kafka/future.py#L48-L64

We really should document this, I filed https://github.com/dpkp/kafka-python/issues/1256 to track it.

like image 82
Jeff Widman Avatar answered Oct 18 '22 22:10

Jeff Widman