At first glance I very much liked the "Batches" feature in Celery, because I need to group a number of IDs before calling an API (otherwise I may get kicked out).
Unfortunately, after a bit of testing, batch tasks don't seem to play well with the rest of the Canvas primitives, in this case chains. For example:
from celery.contrib.batches import Batches

# "a" is the Celery app instance (connected to the Redis broker shown in the log below)

@a.task(base=Batches, flush_every=10, flush_interval=5)
def get_price(requests):
    for request in requests:
        a.backend.mark_as_done(request.id, 42, request=request)
    print("filter_by_price " + str([r.args[0] for r in requests]))

@a.task
def completed():
    print("complete")
So, with this simple workflow:
chain(get_price.s("ID_1"), completed.si()).delay()
I see this output:
[2015-07-11 16:16:20,348: INFO/MainProcess] Connected to redis://localhost:6379/0
[2015-07-11 16:16:20,376: INFO/MainProcess] mingle: searching for neighbors
[2015-07-11 16:16:21,406: INFO/MainProcess] mingle: all alone
[2015-07-11 16:16:21,449: WARNING/MainProcess] celery@ultra ready.
[2015-07-11 16:16:34,093: WARNING/Worker-4] filter_by_price ['ID_1']
After 5 seconds, get_price() is triggered just as expected (the "filter_by_price ['ID_1']" line above). The problem is that completed() never gets invoked.
Any idea what could be going on here? If batches are not the way to go, what would be a decent approach to this problem?
PS: I have set CELERYD_PREFETCH_MULTIPLIER=0, as the docs say.
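For reference, that setting can also be applied on the app object itself; this is the Celery 3.x-era name, newer releases spell it worker_prefetch_multiplier:

a.conf.CELERYD_PREFETCH_MULTIPLIER = 0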
If you look at the Celery docs on tasks, you see that to call a task synchronously you use the apply() method as opposed to the apply_async() method. The docs also note that if the CELERY_ALWAYS_EAGER setting is set, apply_async() is replaced by a local apply() call instead.
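A minimal sketch of the difference, reusing the completed task and the a = Celery(...) app from the question:

# Runs the task body right here, synchronously, without a worker:
completed.apply()

# Sends a message to the broker; a worker picks it up later:
completed.apply_async()

# With this setting (e.g. in a test configuration), apply_async()/delay()
# calls are executed eagerly as local apply() calls instead:
a.conf.CELERY_ALWAYS_EAGER = True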
Celery supports two thread-based execution pools: eventlet and gevent. With these pools, the execution pool runs in the same process as the Celery worker itself; to be precise, both eventlet and gevent use greenlets, not threads.
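A sketch of selecting one of these pools through configuration on the question's app (both libraries need to be pip-installed; the usual alternative is the command line, e.g. celery -A <module> worker -P eventlet):

a.conf.CELERYD_POOL = 'eventlet'   # or 'gevent'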
Celery can talk to many different brokers (message transports) and store results in many different backends (result stores).
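For instance, an app wired to the same Redis instance that appears in the log above, used as both broker and result backend, might look like this (the module name tasks is just an illustration):

from celery import Celery

a = Celery('tasks',
           broker='redis://localhost:6379/0',
           backend='redis://localhost:6379/0')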
Another way of chaining tasks is the pipe operator (" | "): it builds exactly the same chain as chain(...), so you can use whichever spelling you prefer. The Celery documentation also says that groups are used to execute tasks in parallel.
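Using the tasks from the question, the two chain spellings and a parallel group look like this (the extra IDs are made up for illustration):

from celery import chain, group

chain(get_price.s("ID_1"), completed.si()).delay()   # explicit chain(...)
(get_price.s("ID_1") | completed.si()).delay()       # same chain, pipe syntax

group(get_price.s(i) for i in ["ID_1", "ID_2", "ID_3"]).delay()  # runs in parallel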
This applies to Celery tasks as well. If you don't wrap your tasks with transaction.atomic(), or use it inside your task body, you may have data integrity problems. It's worth auditing your tasks to find where you should use transaction.atomic(). You could even add a project-specific wrapper for Celery's @shared_task that adds @atomic to your tasks.
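A sketch of such a wrapper, assuming a configured Django project (update_order is a hypothetical task):

from celery import shared_task
from django.db import transaction

def atomic_shared_task(**task_options):
    """shared_task variant that runs the whole task body in one transaction."""
    def decorator(func):
        return shared_task(**task_options)(transaction.atomic(func))
    return decorator

@atomic_shared_task()
def update_order(order_id):
    # All ORM writes in here either commit together or roll back together.
    ...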
Here are some issues I've seen crop up several times in Django projects using Celery; they probably apply to other task queues too, I simply haven't used them as much. One is enqueueing data rather than references: if you duplicate data from your database in your task arguments, it can go stale in the queue before the task executes. This is not so easy to do accidentally in Celery since version 4, which changed the default serializer from pickle to JSON (if you're not sure which serializer you're using, check your settings), but it is still possible to enqueue data rather than references.
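A sketch of the difference, using Django's built-in User model as an example (the task names are made up):

from celery import shared_task

# Fragile: the dict was copied from the database at enqueue time and may be
# stale by the time the task runs.
@shared_task
def deactivate_user_from_data(user_data):
    print("deactivating", user_data["email"])

# Safer: enqueue only the primary key and re-fetch fresh data inside the task.
@shared_task
def deactivate_user(user_id):
    from django.contrib.auth.models import User
    user = User.objects.get(pk=user_id)
    user.is_active = False
    user.save()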
It looks like the behaviour of batch tasks is significantly different from that of normal tasks; batch tasks are not even emitting signals like task_success.
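One way to see this for yourself is to attach a task_success handler and note that it never fires when the Batches task flushes (a small diagnostic sketch):

from celery.signals import task_success

@task_success.connect
def log_success(sender=None, result=None, **kwargs):
    print("task_success fired for %r with result %r" % (sender, result))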
Since you need to call the completed task after get_price, you can call it directly from get_price itself:
@a.task(base=Batches, flush_every=10, flush_interval=5)
def get_price(requests):
    for request in requests:
        # do the per-request work, e.g. mark each result as done as above
        a.backend.mark_as_done(request.id, 42, request=request)
    completed.delay()