I am writing a Python Kafka consumer (trying to use kafka.consumer.SimpleConsumer or kafka.consumer.simple.SimpleConsumer from http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html). When I run the following code, it keeps running forever, even after all messages have been consumed. I would like the consumer to stop once it has consumed all the messages. How can I do that? I also have no idea how to use the stop() function (defined in the base class kafka.consumer.base.Consumer).
UPDATE
I used a signal handler to call consumer.stop(). Some error messages were printed to the screen, but the program was still stuck in the for-loop. When new messages came in, the consumer consumed and printed them. I also tried client.close(), with the same result.
I need a way to stop the for-loop gracefully.
        client = KafkaClient("localhost:9092")
        consumer = SimpleConsumer(client, "test-group", "test")
        consumer.seek(0, 2)  # tried both (0, 2) and (0, 0)
        for message in consumer:
            print "Offset:", message.offset
            print "Value:", message.message.value
Any help is welcome. Thanks.
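One approach is to first look up the end offset of the topic partition, then break out of the loop once the message just before that offset has been read. Note that this uses KafkaConsumer rather than SimpleConsumer, and reads only partition 0 of the topic.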
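    from kafka import KafkaConsumer, TopicPartition

    # Positional arguments of KafkaConsumer are topic names,
    # so the broker address has to be passed as bootstrap_servers.
    consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
    topic = 'test'
    tp = TopicPartition(topic, 0)
    # manually assign partition 0 of the topic
    consumer.assign([tp])
    # obtain the end offset (the offset the next new message would get)
    consumer.seek_to_end(tp)
    lastOffset = consumer.position(tp)
    consumer.seek_to_beginning(tp)
    # skip the loop if the partition is empty; otherwise it would block forever
    if consumer.position(tp) < lastOffset:
        for message in consumer:
            print "Offset:", message.offset
            # KafkaConsumer yields records that expose .value directly
            print "Value:", message.value
            if message.offset >= lastOffset - 1:
                break
    consumer.close()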
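The stopping rule can also be separated from Kafka so it can be tried without a broker. The following is only a sketch under assumptions: `Record`, `consume_until` and `endless_stream` are hypothetical names made up for illustration, not part of kafka-python, and `Record` merely imitates the `.offset` and `.value` attributes of consumed messages.

```python
from collections import namedtuple

# Hypothetical stand-in for the records a Kafka consumer yields;
# only the .offset and .value attributes are used here.
Record = namedtuple("Record", ["offset", "value"])


def consume_until(records, end_offset, start_offset=0):
    """Yield records until the one just before end_offset has been seen.

    end_offset is the position reported after seeking to the end,
    i.e. the offset the next new message would receive.
    start_offset is the position reported after seeking to the beginning.
    """
    if start_offset >= end_offset:
        return  # empty partition: iterating would block forever
    for record in records:
        yield record
        if record.offset >= end_offset - 1:
            break


def endless_stream():
    """Simulate a consumer iterator that never ends on its own."""
    offset = 0
    while True:
        yield Record(offset, "message-%d" % offset)
        offset += 1


if __name__ == "__main__":
    for record in consume_until(endless_stream(), 3):
        print("Offset: %s, Value: %s" % (record.offset, record.value))
```

With a real consumer, `records` would be the consumer object itself and the two offsets would come from `consumer.position(tp)` after seeking to the end and to the beginning. If stopping after a period of silence is acceptable instead of stopping at a fixed offset, KafkaConsumer also accepts a `consumer_timeout_ms` setting, which ends the iteration once no message has arrived for that many milliseconds.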