
pika, stop_consuming does not work

I'm new to RabbitMQ and pika, and I'm having trouble stopping a consumer.

Channel and queue setup:

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue=new_task_id, durable=True, auto_delete=True)

Basically, the consumer and producer look like this:

consumer:

def task(task_id):
    def callback(channel, method, properties, body):
        if body != "quit":
            print(body)
        else:
            print(body)
            channel.stop_consuming(task_id)

    channel.basic_consume(callback, queue=task_id, no_ack=True)
    channel.start_consuming()
    print("finish")
    return "finish"

producer:

proc = Popen(['app/sample.sh'], shell=True, stdout=PIPE)
while proc.returncode is None:  # running
    line = proc.stdout.readline()
    if line:
        channel.basic_publish(
            exchange='',
            routing_key=self.request.id,
            body=line
        )
    else:
        channel.basic_publish(
            exchange='',
            routing_key=self.request.id,
            body="quit"
        )
        break

The consumer task gave me this output:

# ... output from sample.sh, as expected

quit
�}q(UstatusqUSUCCESSqU  tracebackqNUresultqNUtask_idqU
1419350416qUchildrenq]u.

However, "finish" didn't get printed, so I'm guessing that channel.stop_consuming(task_id) didn't stop the consumer. If so, what is the correct way to do it? Thank you.

laike9m asked Dec 23 '14 16:12


1 Answer

I have a class with the channel and connection as member variables. These are initialized by a separate thread. The user of the MyClient class calls the close() method, and the consumer and connection are stopped.

import threading

import pika


class MyClient:

    def __init__(self, unique_client_code):
        self.Channel = None
        self.Conn: pika.BlockingConnection = None  
        self.ClientThread = self.init_client_driver()

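    def _close_callback(self):
        # Scheduled via add_callback_threadsafe, so it runs on the thread
        # that services the connection rather than on the caller's thread.
        self.Channel.stop_consuming()
        self.Channel.close()
        self.Conn.close()

    def _client_driver_thread(self, tmout=None):
        print("Starting Driver Thread...")
        self.Conn = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
        self.Channel = self.Conn.channel()
        # The consumer setup and the blocking start_consuming() call belong
        # here (not shown); without them this thread exits immediately.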

    def init_client_driver(self, tmout=None):
        kwargs = {'tmout': tmout}
        t = threading.Thread(target=self._client_driver_thread, kwargs=kwargs)
        t.daemon = True
        t.start()
        return t

    def close(self):
        self.Conn.add_callback_threadsafe(self._close_callback)
        self.ClientThread.join()
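
For the callback in the question itself, one likely cause (a guess, not a confirmed diagnosis) is that the optional argument of BlockingChannel.stop_consuming() is a consumer tag, not a queue name. stop_consuming(task_id) therefore asks to cancel a consumer whose tag equals the task id, while the consumer registered by basic_consume has its own generated tag and keeps running. Calling stop_consuming() with no argument cancels every consumer on the channel, which lets start_consuming() return. Separately, under Python 3 the body arrives as bytes, so comparing it to the str "quit" would never match. A minimal sketch along those lines follows; make_callback and quit_marker are hypothetical names used for illustration, not part of pika.

```python
def make_callback(quit_marker=b"quit"):
    """Build a pika-style consumer callback that stops on a sentinel body.

    make_callback and quit_marker are illustrative names, not pika API.
    """
    received = []

    def callback(channel, method, properties, body):
        # Normalise a str body to bytes so the comparison with the
        # bytes sentinel behaves the same whichever type is delivered.
        if isinstance(body, str):
            body = body.encode()
        received.append(body)
        if body == quit_marker:
            # No argument: cancel all consumers on this channel so the
            # blocking start_consuming() loop can return.
            channel.stop_consuming()

    return callback, received
```

With a real channel, the returned callback would be registered through basic_consume before start_consuming() is called. The argument order differs between pika versions: callback first with no_ack in the 0.x style used in the question, queue first with on_message_callback and auto_ack in pika 1.x.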

bnhede answered Sep 25 '22 19:09