
Celery chain not working with batches

At first glance I really liked the "Batches" feature in Celery, because I need to group a number of IDs before calling an API (otherwise I may be kicked out).

Unfortunately, after a bit of testing, batch tasks don't seem to play well with the rest of the Canvas primitives, in this case chains. For example:

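from celery.contrib.batches import Batches

# `a` is the Celery app instance.
@a.task(base=Batches, flush_every=10, flush_interval=5)
def get_price(requests):
    # Log the whole batch once, then mark each request as done.
    print("filter_by_price " + str([r.args[0] for r in requests]))
    for request in requests:
        a.backend.mark_as_done(request.id, 42, request=request)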

@a.task
def completed():
    print("complete")

So, with this simple workflow:

chain(get_price.s("ID_1"), completed.si()).delay()

I see this output:

[2015-07-11 16:16:20,348: INFO/MainProcess] Connected to redis://localhost:6379/0
[2015-07-11 16:16:20,376: INFO/MainProcess] mingle: searching for neighbors
[2015-07-11 16:16:21,406: INFO/MainProcess] mingle: all alone
[2015-07-11 16:16:21,449: WARNING/MainProcess] celery@ultra ready.
[2015-07-11 16:16:34,093: WARNING/Worker-4] filter_by_price ['ID_1']

After 5 seconds, get_price() gets triggered as expected (it prints the filter_by_price line above). The problem is that completed() never gets invoked.

Any idea what could be going on here? If I don't use batches, what would be a decent approach to solve this problem?

PS: I have set CELERYD_PREFETCH_MULTIPLIER=0 as the docs say.

ninja.user asked Jul 11 '15 19:07



1 Answer

It looks like batch tasks behave quite differently from normal tasks. Batch tasks do not even emit signals such as task_success.

Since you need to call the completed task after get_price, you can call it directly from get_price itself, once the batch has been processed.

@a.task(base=Batches, flush_every=10, flush_interval=5)
def get_price(requests):
    for request in requests:
        pass  # do something with each request
    # Trigger the follow-up task explicitly, once per flushed batch.
    completed.delay()
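The pattern above (process a batch, then trigger the follow-up explicitly instead of relying on a chain) can be sketched in plain Python without Celery. This is only an illustration under stated assumptions: BatchBuffer, completed_calls and the flush_every=3 setting are hypothetical names and values, not part of Celery's API, and the final manual flush() stands in for what flush_interval would do in a real worker.

```python
from typing import Callable, List


class BatchBuffer:
    """Hypothetical helper: collects items and hands them to `handler` in groups."""

    def __init__(self, handler: Callable[[List[str]], None], flush_every: int = 10) -> None:
        self.handler = handler
        self.flush_every = flush_every
        self.pending: List[str] = []

    def add(self, item: str) -> None:
        # Buffer the item and flush as soon as the batch is full.
        self.pending.append(item)
        if len(self.pending) >= self.flush_every:
            self.flush()

    def flush(self) -> None:
        # Hand over whatever is buffered; do nothing if the buffer is empty.
        if not self.pending:
            return
        batch, self.pending = self.pending, []
        self.handler(batch)


# Records every call to the follow-up step so the behaviour can be inspected.
completed_calls: List[List[str]] = []


def completed(ids: List[str]) -> None:
    # Stand-in for the follow-up task from the question.
    completed_calls.append(ids)


def get_price(ids: List[str]) -> None:
    # A single API call for the whole batch would go here.
    # The follow-up is then invoked explicitly, once per batch.
    completed(ids)


buffer = BatchBuffer(get_price, flush_every=3)
for i in range(1, 8):
    buffer.add(f"ID_{i}")
buffer.flush()  # flush the leftover partial batch
```

Note that the follow-up runs once per flushed batch, not once per submitted ID, which is also how completed.delay() behaves in the answer's version.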
Pandikunta Anand Reddy answered Oct 04 '22 15:10