import logging
from confluent_kafka import Producer
import os

logger = logging.getLogger("main")

BOOTSTRAP_SERVERS = os.environ['BOOTSTRAP_SERVERS']
APPLICATION_ID = os.getenv('APPLICATION_ID', default="nke-data-source")
RECONNECT_BACKOFF_MS = os.getenv('RECONNECT_BACKOFF_MS', default=1000)
REQUEST_TIMEOUT_MS = os.getenv('REQUEST_TIMEOUT_MS', default=40000)
ACKS = os.getenv('ACKS', default="all")
RETRIES = os.getenv('RETRIES', default=15)
RETRY_BACK_OFF = os.getenv('RETRY_BACK_OFF', default=1000)
MAX_IN_FLIGHT_REQUESTS = os.getenv('MAX_IN_FLIGHT_REQUESTS', default=1)
topic = os.getenv('OUTBOUND_TOPIC', default="tti-nke-raw")

p = Producer({'bootstrap.servers': BOOTSTRAP_SERVERS,
              'client.id': APPLICATION_ID,
              'reconnect.backoff.ms': RECONNECT_BACKOFF_MS,
              'request.timeout.ms': REQUEST_TIMEOUT_MS,
              'acks': ACKS,
              'retries': RETRIES,
              'retry.backoff.ms': RETRY_BACK_OFF,
              'max.in.flight.requests.per.connection': MAX_IN_FLIGHT_REQUESTS,
              'compression.type': "lz4"})


def send(key, event):
    try:
        logger.info("Sending key: [{0}] value: [{1}]".format(key, event))
        p.produce(topic=topic, value=event.encode('utf-8'), key=key)
    except Exception:
        logger.error("error sending events to kafka", exc_info=True)
Error:
Traceback (most recent call last):
  File "/app/sender.py", line 30, in send
    p.produce(topic=topic, value=event.encode('utf-8'), key=key)
BufferError: Local: Queue full
Can anyone help me with this? I'm new to Python.
This queue is implemented in the librdkafka library, which confluent_kafka is a binding for.
The producer has an internal queue that holds the delivery reports and waits for your application to serve them (in most cases, serving them amounts to doing nothing). Until they are served, the queue keeps filling up, so you need to trigger the mechanism that goes through it, which is done simply by calling poll().
You should call producer.poll(0) after every produce() call. So change:
p.produce(topic=topic, value=event.encode('utf-8'), key=key)
into:
p.produce(topic=topic, value=event.encode('utf-8'), key=key)
p.poll(0)
This triggers the queue cleanup. Don't worry about performance: poll(0) does very little and returns immediately, as the author of librdkafka wrote:
poll() is cheap to call, it will not have a performance impact, so please add it to your producer loop.
Basically, what it does:
call poll() at regular intervals to serve the producer's delivery report callbacks.
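A minimal sketch of such a delivery report callback, assuming it is passed per message with p.produce(..., on_delivery=delivery_report). The stats dict is a hypothetical addition for illustration; the callback only runs when poll() (or flush()) serves it.

```python
import logging

logger = logging.getLogger("main")

# Hypothetical counters, only to show what the callback can do.
stats = {"delivered": 0, "failed": 0}


def delivery_report(err, msg):
    """Invoked from producer.poll() or producer.flush(), once per message.

    err is None on success, otherwise the delivery error; msg is the message.
    """
    if err is not None:
        stats["failed"] += 1
        logger.error("Delivery failed for key %s: %s", msg.key(), err)
    else:
        stats["delivered"] += 1
        logger.info("Delivered to %s [%d] at offset %d",
                    msg.topic(), msg.partition(), msg.offset())
```

If no callback is registered, serving the reports simply discards them, which is the "doing nothing" case described earlier; either way poll() is what frees them from the queue.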
Consider reading this issue as well.
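Putting it together, a sketch of the question's send() with poll(0) after each produce(). Two things here go beyond the answer and should be treated as assumptions: the producer is passed in as a parameter, and there is a retry when produce() still raises BufferError (poll(1) blocks for up to one second serving delivery reports, which frees room in the queue, before produce() is tried again). The names send_event and max_attempts are hypothetical.

```python
import logging

logger = logging.getLogger("main")


def send_event(producer, topic, key, event, max_attempts=3):
    """Produce one event, then serve delivery reports without blocking.

    producer is expected to be a confluent_kafka.Producer (or any object
    with the same produce()/poll() methods). Returns True if the message
    was accepted into the local queue, False if the queue stayed full.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            producer.produce(topic=topic, value=event.encode("utf-8"), key=key)
        except BufferError:
            # Local queue is full: wait up to 1 s while serving delivery
            # reports so that slots are released, then try again.
            logger.warning("Local queue full (attempt %d), polling", attempt)
            producer.poll(1)
            continue
        # Non-blocking: serve whatever delivery reports are already available.
        producer.poll(0)
        return True
    logger.error("Giving up on key %s: local queue still full", key)
    return False
```

In the question's code this would be called as send_event(p, topic, key, event). Before the process exits, call p.flush() so messages still waiting in the local queue are delivered instead of being lost.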