I'm having trouble emitting messages from RabbitMQ to the user via SocketIO.
I have a Flask application with SocketIO integration. The current user flow looks like this:
The problem is that I'm not able to set up a RabbitMQ listener that forwards messages to the browser via SocketIO. I get a different error every time, mostly either that the connection is closed or that I'm working outside of the application context.
I have tried many approaches; here is my latest one.
# callback
def mq_listen(uid):
    rabbit = RabbitMQ()

    def cb(ch, method, properties, body, mq=rabbit):
        to_return = [0]  # mutable
        message = Message.load(body)
        to_return[0] = message.get_message()
        emit('report_part', {"data": to_return[0]})

    rabbit.listen('results', callback=cb, id=uid)
# this is the page the user reaches
@blueprint.route('/report_result/<uid>', methods=['GET'])
def report_result(uid):
    thread = threading.Thread(target=mq_listen, args=(uid,))
    thread.start()
    return render_template("property/report_result.html", socket_id=uid)
where the rabbit.listen method is an abstraction like this:
def listen(self, queue_name, callback=None, id=None):
    if callback is not None:
        callback_function = callback
    else:
        callback_function = self.__callback
    if id is None:
        self.channel.queue_declare(queue=queue_name, durable=True)
        self.channel.basic_qos(prefetch_count=1)
        self.consumer_tag = self.channel.basic_consume(callback_function, queue=queue_name)
        self.channel.start_consuming()
    else:
        self.channel.exchange_declare(exchange=queue_name, type='direct')
        result = self.channel.queue_declare(exclusive=True)
        exchange_name = result.method.queue
        self.channel.queue_bind(exchange=queue_name, queue=exchange_name, routing_key=id)
        self.channel.basic_consume(callback_function, queue=exchange_name, no_ack=True)
        self.channel.start_consuming()
which resulted in
RuntimeError: working outside of request context
I would be grateful for any tips or usage examples.
Thanks a lot
I had a similar issue. At the end of the day, it happens because Flask only provides the request context while it is handling a request, so code running in a separate thread does not have it. But the solution is NOT to add with app.app_context(). That is hacky and will definitely cause errors, since you're not natively passing the request context.
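To illustrate why the listener thread hits this error, here is a minimal stdlib-only sketch. It is an analogy, not Flask's actual implementation: `request_ctx`, `emit_like`, and `run_in_thread` are hypothetical names, and `threading.local` stands in for Flask's per-request storage. The point is only that state bound to the request-handling thread is not visible in a thread started from the view.

```python
import threading

# Hypothetical stand-in for per-request state (Flask uses its own context-local storage).
request_ctx = threading.local()


def emit_like(event):
    """Mimic a context-bound emit(): fail when no request state is visible."""
    if not hasattr(request_ctx, "sid"):
        raise RuntimeError("working outside of request context")
    return f"{event} -> {request_ctx.sid}"


def run_in_thread(func, *args):
    """Run func in a new thread and return its result or the error it raised."""
    outcome = {}

    def target():
        try:
            outcome["value"] = func(*args)
        except RuntimeError as exc:
            outcome["error"] = str(exc)

    worker = threading.Thread(target=target)
    worker.start()
    worker.join()
    return outcome


# Inside the "request" thread the state is visible...
request_ctx.sid = "abc123"
in_request = emit_like("report_part")

# ...but a thread started from that code sees none of it.
in_worker = run_in_thread(emit_like, "report_part")
```

The same call succeeds in the thread that owns the state and fails in the worker, which matches the situation in the question where emit is called from the RabbitMQ callback thread.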
My solution was to create a redirect so that the request context is maintained, like this:
import requests


def sendToRedisFeed(eventPerson, type):
    eventPerson['type'] = type
    requests.get('http://localhost:5012/zmq-redirect', json=eventPerson)
This is my redirect function: whenever there is an event I'd like to push to my PubSub, it goes through this function, which sends it to that localhost endpoint.
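from flask import Response, request
from flask_sse import sse

# Flask-SSE publishes through Redis, so app.config["REDIS_URL"] must be set
app.register_blueprint(sse, url_prefix='/stream')


@app.route('/zmq-redirect', methods=['GET'])
def send_message():
    try:
        sse.publish(request.get_json(), type='greeting')
        return Response('Sent!', mimetype="text/event-stream")
    except Exception as e:
        print(e)
        # return an error response instead of falling through with None
        return Response('Failed to publish', status=500)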
Now, whenever an event is pushed to my /zmq-redirect endpoint, it is redirected and published via SSE.
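The flow described above can be sketched end to end with only the standard library. This is an illustration under assumptions, not the code from this answer: `RedirectHandler`, `forward_event`, and the `published` list are hypothetical names, `published.append` stands in for sse.publish, and the sketch uses POST rather than a GET with a JSON body. It shows that the consumer thread never touches request-bound code; it only makes an HTTP call, and the publish step runs inside a request handler.

```python
import json
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer

published = []  # hypothetical stand-in for sse.publish(); collects forwarded events


class RedirectHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        event = json.loads(self.rfile.read(length))
        published.append(event)  # this runs while a request is being handled
        self.send_response(200)
        self.send_header("Content-Type", "text/plain")
        self.end_headers()
        self.wfile.write(b"Sent!")

    def log_message(self, *args):
        pass  # silence per-request logging


def forward_event(base_url, event, event_type):
    """Send an event to the redirect endpoint; safe to call from any thread."""
    payload = json.dumps({**event, "type": event_type}).encode()
    req = urllib.request.Request(
        base_url + "/zmq-redirect",
        data=payload,  # supplying data makes this a POST
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req, timeout=5) as resp:
        return resp.read().decode()


server = HTTPServer(("127.0.0.1", 0), RedirectHandler)  # port 0 picks a free port
threading.Thread(target=server.serve_forever, daemon=True).start()
base_url = f"http://127.0.0.1:{server.server_port}"

# A consumer thread (for example a message-queue callback) only calls forward_event().
worker = threading.Thread(
    target=forward_event, args=(base_url, {"name": "demo"}, "greeting")
)
worker.start()
worker.join()
server.shutdown()
server.server_close()
```

The trade-off of this design is an extra local HTTP hop per event, in exchange for keeping all publishing code inside normal request handling.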
And now finally, just to wrap everything up, the client:
var source = new EventSource("/stream");
source.addEventListener(
    "greeting",
    function(event) {
        console.log(event);
    }
);
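The listener is registered under "greeting" because that is the type used when publishing. On the wire, a Server-Sent Events message carries that name in an event: field, followed by one or more data: fields and a blank line; messages without an event: field are delivered as plain "message" events instead. The sketch below shows that framing. `format_sse` is a hypothetical helper written for illustration, not Flask-SSE's internal code, and the JSON payload is an assumed example.

```python
import json


def format_sse(data, event_type=None):
    """Serialize one Server-Sent Events message as it appears on the wire."""
    lines = []
    if event_type is not None:
        lines.append(f"event: {event_type}")
    # A multi-line payload needs one "data:" field per line.
    for line in json.dumps(data).splitlines():
        lines.append(f"data: {line}")
    return "\n".join(lines) + "\n\n"  # a blank line terminates the message


frame = format_sse({"name": "demo"}, event_type="greeting")
```

If the type used on the server and the name passed to addEventListener differ, the browser receives the message but the handler never fires.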