Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Python Flask Pika Consumer (RabbitMQ)

I have two little

Python Flask

apps

  1. Appone --> Producer
  2. Apptwo --> Consumer

Both are in different docker-container and orchestrated by docker-compose

I dont get the Data from the Producer to the Consumer...Even when I start in apptwo the start.consuming() the Producer cant send any Data to the RabbitMQ Broker Maybe someone can help me. Thank you very much

docker-compose:

version: '3'
services:

  appone:
    container_name: appone
    restart: always
    build:
      context: ./appone
      dockerfile: Dockerfile
    environment:
      FLASK_APP: ./app.py        
    volumes:
      - './appone:/code/:cached'
    ports:
      - "5001:5001"

  apptwo:
    container_name: apptwo
    restart: always
    build:
      context: ./apptwo
      dockerfile: Dockerfile
    environment:
      FLASK_DEBUG: 1
      FLASK_APP: ./app.py         
    volumes:
      - ./apptwo:/code:cached 
    ports:
      - "5002:5002"     

  rabbitmq:
    image: "rabbitmq:3-management"
    hostname: "rabbit"
    ports:
      - "15672:15672"
      - "5672:5672"
    labels:
      NAME: "rabbitmq"
    volumes:
      - ./rabbitmq/rabbitmq-isolated.conf:/etc/rabbitmq/rabbitmq.config

appone (Producer)

from flask import Flask
from flask_restful import Resource, Api
import pika

app = Flask(__name__)
api = Api(app)

app.config['DEBUG'] = True

message = "Hello World, its me appone"


class HelloWorld(Resource):
    def get(self):
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='rabbitmq'))
        channel = connection.channel()

        channel.queue_declare(queue='hello', durable=True)

        channel.basic_publish(exchange='', routing_key='hello', body='Hello World!', properties=pika.BasicProperties(delivery_mode=2))

        connection.close()

        return {'message': message}


api.add_resource(HelloWorld, '/api/appone/post')

if __name__ == '__main__':
    # Development
    app.run(host="0.0.0.0", port=5001)

apptwo (Consumer)

from flask import Flask
from flask_restful import Resource, Api
import pika
from threading import Thread

app = Flask(__name__)
api = Api(app)

app.config['DEBUG'] = True

data = []

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='rabbitmq'))

channel = connection.channel()

channel.queue_declare(queue='hello', durable=True)

def callback(ch, method, properties, body):
    data.append(body)
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(queue='hello', on_message_callback=callback)

thread = Thread(channel.start_consuming())
thread.start()

class HelloWorld(Resource):
    def get(self):
        return {'message': data}

api.add_resource(HelloWorld, '/api/apptwo/get')

if __name__ == '__main__':
    app.run(debug=True, host="0.0.0.0", port=5002)

Goal In this easy example I just want to receice the data in apptwo and store it in the data list...

Thanks again!!

like image 911
meai2312 Avatar asked Jan 26 '23 22:01

meai2312


1 Answers

In apptwo (Consumer):

thread = Thread(channel.start_consuming())
thread.start()

Here the constructor call of Thread is never called, since channel.start_consuming is called before, which is blocking. Changing your code to the following might help.

thread = Thread(target = channel.start_consuming)
thread.start()
like image 135
user2847878 Avatar answered Jan 28 '23 15:01

user2847878