
Celery as networked pub/sub events

I want to set up a networked pub/sub event system that can also run tasks asynchronously. I have tried getting Celery to do the heavy lifting, but I feel like I am shimming in a whole bunch of things just to get it working.

I have two machines (input and output), and both have access to RabbitMQ. I would like a main program to kick off a loop that waits for input (movement detected by a webcam). Currently, input_machine starts main.py, which starts a Celery task that is picked up by a worker on input_machine subscribed to the "input" queue. This task just runs a while True loop until some input is detected, at which point it sends another Celery task by name ('project.entered_room', which does nothing on this side) to the "output" queue.

Meanwhile, on output_machine, I have a Celery instance watching the "output" queue with a task also named 'project.entered_room', which responds to someone entering the room.

So when input is detected on input_machine, a task runs on output_machine. I can get this to work, but I run into lots of import issues and other headaches. Is there an easier way to accomplish this? Am I going about it all wrong? Am I using the wrong tools?

I have looked into a number of different frameworks including circuits and twisted. Twisted is very complex and I feel like I would be hitting a nail with a jackhammer.

Sam Churchill asked Jun 26 '15 01:06




1 Answer

I would suggest skipping Celery and using Redis directly with its pub/sub functionality. You can spin up Redis by, for example, running its Docker image. Then, on your input machine, when something is detected, you publish a message to a channel. On your output machine, you subscribe to that channel and act on the events.

For example, your input machine could use something like this:

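import redis

# Create the client once and reuse it; replace "redis" with the
# hostname or address of your Redis server.
r = redis.Redis(host="redis")

def publish(message):
    # Deliver the message to every current subscriber of "test-channel"
    r.publish("test-channel", message)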

And then on the output side:

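import time
import redis

def main():
    # decode_responses=True returns str instead of bytes
    r = redis.Redis(host="redis", decode_responses=True)
    # Skip the subscribe/unsubscribe confirmation messages
    p = r.pubsub(ignore_subscribe_messages=True)
    p.subscribe("test-channel")

    while True:
        message = p.get_message()
        if message:
            print(message.get("data", ""))
            # Do more things...
        time.sleep(0.001)  # Avoid a busy loop while idle

if __name__ == "__main__":
    main()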

In this way you can send plain text or JSON data between the input and output machines.
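To make the JSON option concrete, here is a minimal sketch of how events could be encoded before publishing and decoded after receiving. The helper names (encode_event, decode_event, publish_event) and the payload fields ("event", "ts") are hypothetical choices for illustration, not part of Redis or of the sample repository. The helpers only assume a client object with a publish() method, such as redis.Redis.

```python
import json
import time


def encode_event(name, **fields):
    """Serialize an event to a JSON string that can be passed to publish().

    The "event" and "ts" keys are an illustrative convention, not a standard.
    """
    return json.dumps({"event": name, "ts": time.time(), **fields})


def decode_event(message):
    """Turn a pub/sub message dict (as returned by get_message()) into a dict.

    Returns None when the payload is not a JSON object.
    """
    data = message.get("data", "")
    if isinstance(data, bytes):  # payloads are bytes without decode_responses=True
        data = data.decode("utf-8")
    try:
        event = json.loads(data)
    except (TypeError, ValueError):
        # TypeError: non-text payload; ValueError: text that is not valid JSON
        return None
    return event if isinstance(event, dict) else None


def publish_event(client, channel, name, **fields):
    """Publish an encoded event; `client` is assumed to expose publish()."""
    return client.publish(channel, encode_event(name, **fields))
```

On the input machine this might be called as publish_event(redis.Redis(host="redis"), "test-channel", "entered_room", camera="webcam0"), and on the output machine decode_event(message) could replace the print call, so malformed messages are skipped instead of crashing the loop.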

Find a sample implementation here: https://github.com/moritz-biersack/simple-async-pub-sub

Moritz answered Oct 02 '22 23:10