I am working with a dockerized Django system which processes buffers for users. Let's say I have 3 users and one of them decides to queue thousands of buffers. I do not want the other 2 to have to wait until all of those are completed.
Would it be possible to dynamically start a worker that only processes tasks in the queue assigned to that user? I know you can dynamically decide the queue in which a task should be placed when calling the task.
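For context, this is roughly what I mean by per-call routing; a minimal sketch where the task name and the "Queue<id>" naming scheme are just placeholders:

# tasks.py -- a hypothetical task; the name and queue scheme are placeholders
from celery import shared_task

@shared_task
def process_buffer(buffer_id):
    ...

# At call time, route the task to the caller's own queue:
process_buffer.apply_async(
    args=[buffer_id],
    queue="Queue" + str(customer.id),  # per-user queue, matching the naming below
)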
I tried starting workers from within my Django project on startup, but I cannot seem to start more than one. My first attempt was in my Celery config:
import os

from celery import Celery
from celery.bin import worker
from django.conf import settings

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")

app = Celery('project1')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

# celery-once settings
app.conf.ONCE = {
    'settings': {
        'url': 'redis://redis',
        'blocking': True,
        'default_timeout': 60 * 60,
        'blocking_timeout': 86400,
    }
}

print("Starting worker...")
for e in Customer.objects.all():  # Customer must be imported from its app's models
    # Bind to a name that does not shadow the imported `worker` module,
    # otherwise the second iteration raises an AttributeError.
    w = worker.worker(app=app)
    # Note: run() blocks, which is why only the first worker ever starts.
    w.run(queues=["Queue" + str(e.id)])
Is this even a legitimate way of doing this? If so, how can I create workers from my Python source code?
If this is not the way to go, how can I dynamically create or delete workers for my users?
The code you have is a legitimate way to create a new worker. You would probably also want to investigate detaching the worker when you start it. That said, if you have lots of users, this implementation is limited by two facts: (1) all of the workers run on the same machine, and (2) the caller does not control how or when the worker will stop.
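As a sketch of what detaching could look like, one option is to spawn each worker as its own detached process and stop it later with Celery's remote control commands. The project name, queue scheme, node-name pattern, and config module below are assumptions:

import subprocess

from celery_config import app  # the Celery app defined above; module name is an assumption

def start_worker_for(customer_id):
    """Spawn a detached worker that only consumes that customer's queue."""
    subprocess.Popen([
        "celery",
        "-A", "project1",                          # assumed project/app name
        "worker",
        "-Q", "Queue" + str(customer_id),          # the per-user queue
        "-n", "worker{0}@%h".format(customer_id),  # unique node name so it can be addressed later
        "--detach",
    ])

def stop_worker_for(customer_id, hostname):
    """Ask that specific worker to shut down via the remote control API."""
    app.control.shutdown(
        destination=["worker{0}@{1}".format(customer_id, hostname)]
    )

Another option that avoids spawning processes entirely is app.control.add_consumer("Queue" + str(customer_id), reply=True), which tells already-running workers to start consuming an additional queue instead of creating a new worker per user.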