I want to set up a networked pub/sub event system that also needs to be able to run tasks asynchronously. I have tried getting Celery to do the heavy lifting, but I feel like I am shimming a whole bunch of things together just to get it working.
I have two machines (input and output), and both have access to RabbitMQ. I would like a main program to kick off a loop that waits for input (movement detected by a webcam). I have it set up so that input_machine starts main.py, which starts a Celery task handled by a worker on input_machine subscribed to the "input" queue. That task runs a while True loop until some input is detected, at which point it calls another Celery task (named 'project.entered_room', which does nothing) on the "output" queue.
Meanwhile, on output_machine, I have a Celery worker watching the "output" queue with a task named 'project.entered_room' that responds to someone entering the room.
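Roughly, the setup looks like this (just a sketch to show the shape of it; the broker URL, module layout, and the watch_for_motion name are placeholders rather than my exact code):

from celery import Celery

app = Celery("project", broker="amqp://guest@rabbitmq//")

# Route the watcher task to the "input" queue and the reaction task to "output".
app.conf.task_routes = {
    "project.watch_for_motion": {"queue": "input"},
    "project.entered_room": {"queue": "output"},
}

@app.task(name="project.watch_for_motion")
def watch_for_motion():
    # Poll until the webcam reports movement, then fire the reaction task.
    while True:
        if motion_detected():        # stand-in for the real webcam check
            entered_room.delay()     # lands on the "output" queue via task_routes
            return

@app.task(name="project.entered_room")
def entered_room():
    pass  # the worker on output_machine provides the real response

def motion_detected():
    return False  # placeholder for webcam movement detection

input_machine runs a worker with "celery -A project worker -Q input" and output_machine runs one with "celery -A project worker -Q output".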
So when input is detected on input_machine, a task runs on output_machine. I can get this to work, but I run into lots of import issues and other headaches. Is there an easier way to accomplish this? Am I going about it all wrong? Am I using the wrong tools?
I have looked into a number of different frameworks, including circuits and Twisted. Twisted is very complex, and I feel like I would be hitting a nail with a jackhammer.
From my understanding, Celery is a distributed task queue, which means the only thing it should do is dispatch tasks/jobs to other servers and get the results back. RabbitMQ is a message queue, and nothing more. However, a worker could simply listen to the message queue and execute the task when a message is received.
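To illustrate that last point, here is a minimal sketch of a plain RabbitMQ consumer using the pika library, with no Celery involved (the host and queue name are assumptions):

import pika

# Connect to RabbitMQ and consume from the "output" queue.
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()
channel.queue_declare(queue="output")

def on_message(ch, method, properties, body):
    # React to the event, e.g. someone entering the room.
    print("received:", body)

channel.basic_consume(queue="output", on_message_callback=on_message, auto_ack=True)
channel.start_consuming()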
Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task, the client adds a message to the queue, and the broker then delivers that message to a worker.
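As a minimal sketch of that flow (the add task and broker URL below are assumptions for illustration):

from celery import Celery

app = Celery("tasks", broker="amqp://guest@localhost//")

@app.task
def add(x, y):
    return x + y

# Client side: delay() only puts a message on the queue; a worker process
# (started with "celery -A tasks worker") picks it up and executes add().
add.delay(2, 3)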
Celery is a task queue implementation for Python web applications, used to asynchronously execute work outside the HTTP request-response cycle.
celery beat is a scheduler; it kicks off tasks at regular intervals, which are then executed by available worker nodes in the cluster. By default the entries are taken from the beat_schedule setting, but custom stores can also be used, such as storing the entries in a SQL database.
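For example, a beat_schedule entry could look like this (a sketch; the task name and interval are assumptions):

from celery import Celery

app = Celery("project", broker="amqp://guest@rabbitmq//")

# Run the (assumed) project.check_room task every 30 seconds.
app.conf.beat_schedule = {
    "check-room-every-30s": {
        "task": "project.check_room",
        "schedule": 30.0,
    },
}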
I would suggest skipping Celery and using Redis directly with its pub/sub functionality. You can spin up Redis, for example, by running its Docker image. Then on your input machine, when something is detected, you publish a message to a channel; on your output machine you subscribe to that channel and act on the events.
For example, your input machine could use something like this:
import redis

def publish(message):
    r = redis.Redis(host="redis")
    r.publish("test-channel", message)
And then on the output side:
import time
import redis

def main():
    # "redis" must resolve to the Redis server (e.g. the Docker container's hostname).
    r = redis.Redis(host="redis", decode_responses=True)
    p = r.pubsub(ignore_subscribe_messages=True)
    p.subscribe("test-channel")

    # Poll for new messages and handle them as they arrive.
    while True:
        message = p.get_message()
        if message:
            print(message.get("data", ""))
            # Do more things...
        time.sleep(0.001)

if __name__ == "__main__":
    main()
In this way you can send plain text or JSON data between the input and output machines.
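For instance, structured data could be serialized as JSON before publishing and parsed again on the subscriber side (a sketch; the event fields are made up):

import json
import redis

r = redis.Redis(host="redis")

# Publish a structured event as JSON.
event = {"type": "entered_room", "camera": "front-door"}
r.publish("test-channel", json.dumps(event))

On the subscriber side, json.loads(message["data"]) turns the payload back into a dict.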
Find a sample implementation here: https://github.com/moritz-biersack/simple-async-pub-sub