
Python Pika - Consumer into Thread

I'm working on a Python app with a background thread that consumes messages from a RabbitMQ queue (topic exchange scenario).

I start the thread in the on_click event of a button. Here is my code; please pay attention to the commented-out line "#self.receive_command()".

def on_click_start_call(self,widget):


    t_msg = threading.Thread(target=self.receive_command)
    t_msg.start()
    t_msg.join(0)
    #self.receive_command()


def receive_command(self):

    syslog.syslog("ENTERED")

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    syslog.syslog("1")

    channel = connection.channel()
    syslog.syslog("2")

    channel.exchange_declare(exchange='STORE_CMD', type='topic')
    syslog.syslog("3")

    result = channel.queue_declare(exclusive=True)
    syslog.syslog("4")

    queue_name = result.method.queue
    syslog.syslog("5")

    def callback_rabbit(ch,method,properties,body):
        syslog.syslog("RICEVUTO MSG: RKEY:"+method.routing_key+" MSG: "+body+"\n")

    syslog.syslog("6")

    channel.queue_bind(exchange='STORE_CMD', queue=queue_name , routing_key='test.routing.key')
    syslog.syslog("7")

    channel.basic_consume(callback_rabbit,queue=queue_name,no_ack=True)
    syslog.syslog("8")

    channel.start_consuming()

If I run this code, none of the numbered messages (1 to 8) show up in syslog; I only see "ENTERED". So the code is stuck at pika.BlockingConnection.

If I run the same code without the thread (commenting out the thread instructions and uncommenting the direct call to the function), everything works as expected and messages are received correctly.

Is there any way to run a consumer in a thread?

Thanks in Advance

Davide

asked Dec 27 '22 04:12 by user2056899


1 Answer

I have tested the code on my machine with the latest version of Pika, and it works fine. Pika does have threading issues (its connections are not thread-safe), but as long as each thread creates its own connection it shouldn't be a problem.
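A minimal sketch of the one-connection-per-thread rule, assuming the pika 1.x API (`exchange_type=`, `on_message_callback=`, `auto_ack=`), which differs from the 0.9.x-style calls used elsewhere in this thread. The names `pika_connection`, `consume` and `consume_in_thread` are hypothetical; the factory parameter is there only so that the connection is always opened inside the worker thread.

```python
import threading


def pika_connection():
    """Open a new blocking connection (lazy import; requires pika to be installed)."""
    import pika
    return pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))


def consume(connection_factory, on_message):
    # The connection is created here, inside the worker thread, and is
    # never shared with any other thread.
    connection = connection_factory()
    channel = connection.channel()
    channel.exchange_declare(exchange="STORE_CMD", exchange_type="topic")
    # An empty queue name asks the broker to generate one.
    result = channel.queue_declare(queue="", exclusive=True)
    queue_name = result.method.queue
    channel.queue_bind(exchange="STORE_CMD", queue=queue_name,
                       routing_key="test.routing.key")
    channel.basic_consume(queue=queue_name, on_message_callback=on_message,
                          auto_ack=True)
    channel.start_consuming()  # blocks this worker thread only


def consume_in_thread(connection_factory, on_message):
    # daemon=True so a still-running consumer does not keep the app alive on exit.
    worker = threading.Thread(target=consume,
                              args=(connection_factory, on_message),
                              daemon=True)
    worker.start()
    return worker
```

With a broker running on localhost, this would be started as `consume_in_thread(pika_connection, callback)`, where `callback` takes `(ch, method, properties, body)`.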

If you are experiencing issues, it is most likely because of a bug in an older version of Pika, or an unrelated problem in your threading code.
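One threading detail in the question's code that is easy to misread: `t_msg.join(0)` passes a timeout of zero, so it returns immediately instead of waiting for the consumer. That keeps the button handler from blocking, but it is effectively a no-op. A small standard-library sketch, with `slow_worker` as a hypothetical stand-in for the blocking consumer:

```python
import threading
import time


def slow_worker(stop_event):
    # Stands in for a blocking consumer loop such as start_consuming().
    stop_event.wait()


stop = threading.Event()
t = threading.Thread(target=slow_worker, args=(stop,))
t.start()

started = time.monotonic()
t.join(0)                      # timeout of 0 seconds: returns at once
waited = time.monotonic() - started

still_running = t.is_alive()   # the worker is still running after join(0)

stop.set()                     # let the worker finish
t.join()                       # no timeout: waits until the thread exits
finished = not t.is_alive()
```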

I would recommend avoiding 0.9.13, as it has multiple bugs; 0.9.14 / 0.10.0 should be released very soon™.

[Edit] Pika 0.9.14 has been released.
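To see which Pika release is actually installed, something like the following could be used. It is only a sketch: it assumes Python 3.8+ (for `importlib.metadata`), and `KNOWN_BAD`, `installed_pika_version` and `is_known_bad` are hypothetical names. The only version listed is the one this answer advises against.

```python
from importlib import metadata

KNOWN_BAD = {"0.9.13"}  # the release the answer recommends avoiding


def installed_pika_version():
    """Return the installed pika version string, or None if pika is not installed."""
    try:
        return metadata.version("pika")
    except metadata.PackageNotFoundError:
        return None


def is_known_bad(version):
    # Plain set membership; no attempt is made to order or compare versions.
    return version in KNOWN_BAD
```

For example, `is_known_bad(installed_pika_version())` would flag an environment that is still on 0.9.13.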

This is the code I used.

import threading

import pika


def receive_command():
    print("ENTERED")
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    print("1")
    channel = connection.channel()
    print("2")
    channel.exchange_declare(exchange='STORE_CMD', type='topic')
    print("3")
    result = channel.queue_declare(exclusive=True)
    print("4")
    queue_name = result.method.queue
    print("5")
    def callback_rabbit(ch,method,properties,body):
        print("RICEVUTO MSG: RKEY:"+method.routing_key+" MSG: "+body+"\n")
    print("6")
    channel.queue_bind(exchange='STORE_CMD', queue=queue_name , routing_key='test.routing.key')
    print("7")
    channel.basic_consume(callback_rabbit,queue=queue_name,no_ack=True)
    print("8")
    channel.start_consuming()

def start():
    t_msg = threading.Thread(target=receive_command)
    t_msg.start()
    t_msg.join(0)  # timeout of 0: returns immediately, the consumer keeps running

start()
answered Jan 12 '23 10:01 by eandersson