I'm new to rabbitmq and pika, and is having trouble with stopping consuming.
channel and queue setting:
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue=new_task_id, durable=True, auto_delete=True)
Basically, consumer and producer are like this:
consumer:
def task(task_id):
def callback(channel, method, properties, body):
if body != "quit":
print(body)
else:
print(body)
channel.stop_consuming(task_id)
channel.basic_consume(callback, queue=task_id, no_ack=True)
channel.start_consuming()
print("finish")
return "finish"
producer:
proc = Popen(['app/sample.sh'], shell=True, stdout=PIPE)
while proc.returncode is None: # running
line = proc.stdout.readline()
if line:
channel.basic_publish(
exchange='',
routing_key=self.request.id,
body=line
)
else:
channel.basic_publish(
exchange='',
routing_key=self.request.id,
body="quit"
)
break
consumer task
gave me output:
# ... output from sample.sh, as expected
quit
�}q(UstatusqUSUCCESSqU tracebackqNUresultqNUtask_idqU
1419350416qUchildrenq]u.
However, "finish"
didn't get printed, so I'm guessing it's because channel.stop_consuming(task_id)
didn't stop consuming. If so, what is the correct way to do? Thank you.
I have a class defined with member variables of channel and connection. These are initialized by a seperate thread. The consumer of MyClient Class uses the close() method and the the connection and consumer is stopped!
class MyClient:
def __init__(self, unique_client_code):
self.Channel = None
self.Conn: pika.BlockingConnection = None
self.ClientThread = self.init_client_driver()
def _close_callback(self):
self.Channel.stop_consuming()
self.Channel.close()
self.Conn.close()
def _client_driver_thread(self, tmout=None):
print("Starting Driver Thread...")
self.Conn = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
self.Channel = self.Conn.channel()
def init_client_driver(self, tmout=None):
kwargs = {'tmout': tmout}
t = threading.Thread(target=self._client_driver_thread, kwargs=kwargs)
t.daemon = True
t.start()
return t
def close(self):
self.Conn.add_callback_threadsafe(self._close_callback)
self.ClientThread.join()
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With