How to make kafka-python or pykafka work as an async producer with uwsgi and gevent?

My stack is uWSGI with gevent. I am trying to wrap my API endpoints with a decorator that pushes all request data (URL, method, body and response) to a Kafka topic, but it's not working. My theory is that because I am running under gevent and pushing to Kafka in async mode, the background thread that actually does the push never gets to run. If I make the call synchronous instead, it dies inside the produce worker: after produce() the call never returns. Both approaches work fine in a Python shell, and also under uWSGI if I run it with threads instead of gevent.
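For context, a uWSGI setup like the one described might be configured along these lines. This is a sketch, not the asker's actual config: the module name is hypothetical, and note that uWSGI does not enable Python threads by default, which can stop background threads such as kafka-python's sender from ever running unless `enable-threads` is set:

```ini
[uwsgi]
module = myapp:app        # hypothetical WSGI entry point
gevent = 100              # handle requests on gevent greenlets
enable-threads = true     # allow real Python threads (e.g. kafka-python's
                          # background sender) to run alongside gevent
```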

Here is the sample code:

  1. With kafka-python (async):

    import json
    import logging
    import time

    from kafka import KafkaProducer
    from kafka.errors import NoBrokersAvailable, KafkaTimeoutError

    logger = logging.getLogger(__name__)

    # KAFKAHOST comes from settings, e.g. 'host1:9092,host2:9092'
    try:
        kafka_producer = KafkaProducer(bootstrap_servers=KAFKAHOST.split(','))
    except NoBrokersAvailable:
        logger.info(u'Kafka Host not available: {}'.format(KAFKAHOST))
        kafka_producer = None


    def send_message_to_kafka(topic, key, message):
        """
        :param topic: topic name
        :param key: key to decide partition
        :param message: json serializable object to send
        :return:
        """
        if not kafka_producer:
            logger.info(u'Kafka Host not available: {}'.format(KAFKAHOST))
            return
        data = json.dumps(message)
        try:
            start = time.time()
            kafka_producer.send(topic, key=str(key), value=data)
            logger.info(u'Time taken to push to Kafka: {}'.format(time.time() - start))
        except KafkaTimeoutError as e:
            logger.info(u'Message not sent: {}'.format(KAFKAHOST))
            logger.info(e)
        except Exception as e:
            logger.info(u'Message not sent: {}'.format(KAFKAHOST))
            logger.exception(e)
  2. With pykafka (sync):

    from pykafka import KafkaClient

    try:
        client = KafkaClient(hosts=KAFKAHOST)
    except Exception as e:
        logger.info(u'Kafka Host Not Found: {}'.format(KAFKAHOST))
        client = None


    def send_message_to_kafka(topic, key, message):
        """
        :param topic: topic name
        :param key: key to decide partition
        :param message: json serializable object to send
        :return:
        """
        if not client:
            logger.info(u'Kafka Host is None')
            return
        data = json.dumps(message)
        try:
            start = time.time()
            topic = client.topics[topic]
            with topic.get_sync_producer() as producer:
                producer.produce(data, partition_key='{}'.format(key))
            logger.info(u'Time taken to push to Kafka: {}'.format(time.time() - start))
        except Exception as e:
            logger.exception(e)
            pass
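
Both snippets hand `str(key)` and a JSON string straight to the producer. On the wire Kafka keys and values are bytes, so it can help to make the encoding explicit with a pair of small helpers (the helper names below are my own, not from either library); kafka-python can also accept such functions via its `key_serializer`/`value_serializer` arguments:

```python
import json


def encode_key(key):
    """Encode a partition key as UTF-8 bytes."""
    return str(key).encode("utf-8")


def encode_value(message):
    """Serialize a JSON-serializable object to UTF-8 bytes."""
    return json.dumps(message, sort_keys=True).encode("utf-8")


# e.g. producer.send(topic, key=encode_key(42), value=encode_value({"ok": True}))
```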
    
asked Jun 08 '16 by Harsh M



1 Answer

I have more experience with pykafka, so I can answer that part. pykafka uses a pluggable thread handler, and gevent support is built in: you just need to instantiate the KafkaClient with use_greenlets=True. See the pykafka documentation for details.

Other thoughts on your approach: creating a new topic object and a new producer for every message is extremely expensive. It's better to create them once and reuse them.

# setup once
client = KafkaClient(hosts=KAFKAHOST, use_greenlets=True)
topic = client.topics[topic_name]  # topic_name: the name of the topic to produce to
producer = topic.get_sync_producer()

def send_message_to_kafka(producer, key, message):
    """
    :param producer: pykafka producer
    :param key: key to decide partition
    :param message: json serializable object to send
    :return:
    """

    data = json.dumps(message)
    try:
        start = time.time()
        producer.produce(data, partition_key='{}'.format(key))
        logger.info(u'Time taken to push to Kafka: {}'.format(time.time() - start))
    except Exception as e:
        logger.exception(e)
        # for at-least-once delivery you will need to catch network errors and retry

Finally, Kafka gets all of its speed from batching and compression. Using a sync producer keeps the client from exploiting those features. It will work, but it is slower and uses more space. Some applications genuinely require sync delivery, but if you are hitting performance bottlenecks it may make sense to rethink your application and batch messages.
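To illustrate the create-once-and-reuse point in a form that can be exercised without a broker, here is a sketch where the producer is just any object with a `produce()` method. The `FakeProducer` is purely hypothetical test scaffolding, not part of pykafka:

```python
import json


def send_message_to_kafka(producer, key, message):
    """Push a JSON-serializable message through an already-created producer."""
    data = json.dumps(message)
    producer.produce(data, partition_key='{}'.format(key))


class FakeProducer(object):
    """Hypothetical stand-in for pykafka's sync producer, for local testing."""

    def __init__(self):
        self.sent = []

    def produce(self, data, partition_key=None):
        self.sent.append((partition_key, data))


producer = FakeProducer()
send_message_to_kafka(producer, 7, {"url": "/ping", "method": "GET"})
```

In production the producer would be created once at module load (as in the setup snippet above) and passed in, so no per-message topic lookup or producer construction happens on the request path.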

answered Oct 06 '22 by jdennison