
Flask + RabbitMQ + SocketIO - forwarding messages

I'm having trouble emitting messages from RabbitMQ to the user via SocketIO.

I have a Flask application with SocketIO integration. The current user flow looks like this

The problem is that I'm not able to set up a RabbitMQ listener that forwards messages to the browser via SocketIO. Every time I get a different error, mostly either that the connection is closed or that I'm working outside of the application context.

I tried many approaches, here is my last one.

# callback 
def mq_listen(uid):
    rabbit = RabbitMQ()
    def cb(ch, method, properties, body, mq=rabbit):
        to_return = [0]  # mutable
        message = Message.load(body)
        to_return[0] = message.get_message()

        emit('report_part', {"data": to_return[0]})

    rabbit.listen('results', callback=cb, id=uid)

# this is the page, which user reach
@blueprint.route('/report_result/<uid>', methods=['GET'])
def report_result(uid):

    thread = threading.Thread(target=mq_listen, args=(uid,))
    thread.start()

    return render_template("property/report_result.html", socket_id=uid)

where the rabbit.listen method is an abstraction like this:

def listen(self, queue_name, callback=None, id=None):
    if callback is not None:
        callback_function = callback
    else:
        callback_function = self.__callback
    if id is None:
        self.channel.queue_declare(queue=queue_name, durable=True)
        self.channel.basic_qos(prefetch_count=1)
        self.consumer_tag = self.channel.basic_consume(callback_function, queue=queue_name)
        self.channel.start_consuming()
    else:
        self.channel.exchange_declare(exchange=queue_name, type='direct')
        result = self.channel.queue_declare(exclusive=True)
        exchange_name = result.method.queue
        self.channel.queue_bind(exchange=queue_name, queue=exchange_name, routing_key=id)
        self.channel.basic_consume(callback_function, queue=exchange_name, no_ack=True)
        self.channel.start_consuming()

which resulted in

RuntimeError: working outside of request context

I'd be grateful for any tips or usage examples.

Thanks a lot

Asked by Michal Hatak on Apr 05 '15 at 18:04



1 Answer

I had a similar issue. At the end of the day, it happens because Flask only sets up the request context while it is handling a request, and emit depends on that context; a background thread started from the view has none. But the solution is NOT to add with app.app_context(). That is hacky and will definitely cause errors, because you're not natively sending the request context.
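To see why the listener thread fails, here is a minimal stdlib-only sketch. It uses threading.local as a stand-in for Flask's request-bound state (an analogy, not Flask's actual implementation), and the names _ctx, current_sid and handle_request are made up for illustration. The value set in the handler thread is simply not visible from the thread it spawns.

```python
import threading

# Stand-in for per-request state; an analogy only, NOT Flask's real machinery.
_ctx = threading.local()


def current_sid():
    """Return the session id bound to the calling thread, or fail like a context proxy."""
    try:
        return _ctx.sid
    except AttributeError:
        raise RuntimeError("working outside of request context") from None


def handle_request(sid):
    """Simulate a view that binds state to its thread, then spawns a listener thread."""
    _ctx.sid = sid
    seen = {"handler": current_sid()}  # works: same thread that set the value

    def listener():
        try:
            seen["thread"] = current_sid()
        except RuntimeError as exc:
            # The new thread has its own empty thread-local storage.
            seen["thread"] = f"error: {exc}"

    worker = threading.Thread(target=listener)
    worker.start()
    worker.join()
    return seen


result = handle_request("abc123")
```

Here result["handler"] holds the id, while result["thread"] holds the error text, which mirrors what happens when emit is called from mq_listen.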

My solution was to create a redirect so that the request context is maintained, like this:

import requests

def sendToRedisFeed(eventPerson, type):
    # Tag the event, then hand it to the Flask app over HTTP.
    eventPerson['type'] = type
    requests.get('http://localhost:5012/zmq-redirect', json=eventPerson)

This is my redirect function: whenever there is an event I'd like to push to my PubSub, it goes through this function, which then sends it to that localhost endpoint.

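from flask import Response, request
from flask_sse import sse

# app is the existing Flask application object
app.register_blueprint(sse, url_prefix='/stream')

@app.route('/zmq-redirect', methods=['GET'])
def send_message():
    try:
        # This runs while Flask is handling a request, so the context exists.
        sse.publish(request.get_json(), type='greeting')
        return Response('Sent!', mimetype="text/event-stream")
    except Exception as e:
        print(e)
        # A view must return a response, so report the failure to the caller.
        return Response('Publish failed', status=500)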

Now, whenever an event is pushed to my /zmq-redirect endpoint, it is redirected and published via SSE.
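The same hand-off can be sketched with only the standard library, assuming a tiny http.server endpoint in place of the Flask route and urllib in place of requests. ForwardHandler, forward_event and the sample payload are hypothetical, and the sketch uses POST rather than the GET shown above, since a JSON body fits POST more naturally.

```python
import json
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer

received = []  # events that reached the "web app" side


class ForwardHandler(BaseHTTPRequestHandler):
    """Hypothetical stand-in for the /zmq-redirect route."""

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        received.append(json.loads(self.rfile.read(length)))
        body = b"Sent!"
        self.send_response(200)
        self.send_header("Content-Type", "text/plain")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):
        pass  # keep the demo quiet


def forward_event(base_url, event, event_type):
    """Called from the worker: tag the event and push it over HTTP."""
    payload = dict(event, type=event_type)
    req = urllib.request.Request(
        base_url + "/zmq-redirect",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    # Empty ProxyHandler: talk to localhost directly, ignoring proxy settings.
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    with opener.open(req, timeout=5) as resp:
        return resp.read().decode("utf-8")


# Port 0 lets the OS pick a free port for the demo server.
server = HTTPServer(("127.0.0.1", 0), ForwardHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()
base_url = f"http://127.0.0.1:{server.server_port}"

replies = []
worker = threading.Thread(
    target=lambda: replies.append(forward_event(base_url, {"name": "Ada"}, "greeting"))
)
worker.start()
worker.join()
server.shutdown()
server.server_close()
```

The worker thread never touches the web framework directly. It only makes an HTTP call, and the handler on the other side does the publishing inside a normal request.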

And now finally, just to wrap everything up, the client:

var source = new EventSource("/stream");

source.addEventListener(
  "greeting",
  function(event) {
    console.log(event)
  }
)
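For reference, each event reaches the browser as a small text frame. Below is a rough sketch of that wire format as described by the Server-Sent Events standard. format_sse is a hypothetical helper, and the exact bytes flask_sse produces may differ in details such as spacing.

```python
import json


def format_sse(data, event=None):
    """Build one Server-Sent Events frame with a JSON payload."""
    lines = []
    if event is not None:
        # The event name is what addEventListener("greeting", ...) matches on.
        lines.append(f"event: {event}")
    lines.append(f"data: {json.dumps(data)}")
    # A blank line terminates the frame.
    return "\n".join(lines) + "\n\n"


frame = format_sse({"message": "hello"}, event="greeting")
```

On the client, event.data would then carry the JSON string, which can be decoded with JSON.parse.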
Answered by esteininger on Sep 28 '22 at 01:09
