
BufferError: Local: Queue full in Python

import logging
from confluent_kafka import Producer
import os

logger = logging.getLogger("main")

BOOTSTRAP_SERVERS = os.environ['BOOTSTRAP_SERVERS']
APPLICATION_ID = os.getenv('APPLICATION_ID', default = "nke-data-source")
RECONNECT_BACKOFF_MS = os.getenv('RECONNECT_BACKOFF_MS', default = 1000)
REQUEST_TIMEOUT_MS = os.getenv('REQUEST_TIMEOUT_MS', default = 40000)
ACKS = os.getenv('ACKS', default = "all")
RETRIES = os.getenv('RETRIES', default = 15)
RETRY_BACK_OFF = os.getenv('RETRY_BACK_OFF', default = 1000)
MAX_IN_FLIGHT_REQUESTS = os.getenv('MAX_IN_FLIGHT_REQUESTS', default = 1)
topic = os.getenv('OUTBOUND_TOPIC', default = "tti-nke-raw")

p = Producer({'bootstrap.servers': BOOTSTRAP_SERVERS, 
    'client.id': APPLICATION_ID, 
    'reconnect.backoff.ms': RECONNECT_BACKOFF_MS,
    'request.timeout.ms': REQUEST_TIMEOUT_MS,
    'acks': ACKS,
    'retries': RETRIES,
    'retry.backoff.ms': RETRY_BACK_OFF,
    'max.in.flight.requests.per.connection': MAX_IN_FLIGHT_REQUESTS,
    'compression.type': "lz4"})

def send(key, event):
    try:
        logger.info("Sending key: [{0}] value: [{1}]".format(key, event))
        p.produce(topic=topic, value=event.encode('utf-8'), key=key)
    except Exception:
        logger.error("error sending events to kafka", exc_info=True)

Error:

Traceback (most recent call last):
  File "/app/sender.py", line 30, in send
    p.produce(topic=topic, value=event.encode('utf-8'), key=key)
BufferError: Local: Queue full

Can anyone help me with this, as I'm new to Python?

asked Mar 03 '23 by Arpit Jain


1 Answer

This queue is implemented in the librdkafka library (which confluent_kafka binds to).

The producer keeps an internal queue of delivery reports waiting for your code to handle them (even if handling them means doing nothing). You have to trigger the mechanism that walks through that queue, which is done simply by calling poll().

You should call p.poll(0) after every produce() call, so change:

p.produce(topic=topic, value=event.encode('utf-8'), key=key)

into:

p.produce(topic=topic, value=event.encode('utf-8'), key=key)
p.poll(0)

This triggers servicing of the queue. Don't worry about performance: it is a very cheap call that does very little work. As the author of librdkafka wrote:

poll() is cheap to call, it will not have a performance impact, so please add it to your producer loop.

Basically, what it does is:

call poll() at regular intervals to serve the producer's delivery report callbacks.
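
To illustrate what poll() serves, here is a minimal sketch of a delivery report callback (the function name delivery_report and the log messages are my own, not from the question or answer), passed via produce()'s on_delivery argument:

def delivery_report(err, msg):
    # Invoked from poll() or flush(), once per produced message.
    if err is not None:
        logger.error("Delivery failed for key [%s]: %s", msg.key(), err)
    else:
        logger.info("Delivered to %s [%d] at offset %d",
                    msg.topic(), msg.partition(), msg.offset())

p.produce(topic=topic, value=event.encode('utf-8'), key=key,
          on_delivery=delivery_report)
p.poll(0)  # serves delivery_report for any messages that have completed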

Consider reading about this in this issue too.
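
If the local queue can still fill up during bursts, one common pattern (my sketch, not part of the original answer) is to catch BufferError, poll for a moment so librdkafka can drain the queue, retry once, and call flush() before the process exits:

def send(key, event):
    try:
        logger.info("Sending key: [{0}] value: [{1}]".format(key, event))
        p.produce(topic=topic, value=event.encode('utf-8'), key=key)
        p.poll(0)
    except BufferError:
        # Local queue is full: serve delivery reports for up to 1 second, then retry once.
        p.poll(1)
        p.produce(topic=topic, value=event.encode('utf-8'), key=key)
    except Exception:
        logger.error("error sending events to kafka", exc_info=True)

# On shutdown, block until all outstanding messages are delivered or the timeout expires.
# p.flush(10)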

answered Mar 15 '23 by Reznik