celery worker only imports tasks when not detached

I'm trying to get my Django app to submit tasks to a Celery worker, and it succeeds when the worker is run attached. As soon as I add --detach, the tasks fail to be registered.

[2020-10-26 04:09:33,159: ERROR/MainProcess] Received unregistered task of type 'devapp.tasks.create_random_files'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you're using relative imports?

Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.

The full contents of the message body was:
'[[20, "blah", 5120], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (93b)
Traceback (most recent call last):
  File "/pysite/project/venv/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 562, in on_task_received
    strategy = strategies[type_]
KeyError: 'devapp.tasks.create_random_files'
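The KeyError in the traceback is the worker's registry lookup failing: the consumer maps each incoming task name to an execution strategy, and an unregistered name simply has no entry. A toy illustration of that lookup (a plain dict standing in for Celery's internals, with a hypothetical handler):

```python
# The worker keeps a mapping of registered task names to handlers; an
# incoming message whose name is missing from the mapping raises KeyError,
# which Celery reports as "Received unregistered task of type ...".
strategies = {'devapp.tasks.clamscanfile': lambda *args: 'scanned'}

def on_task_received(task_name):
    try:
        return strategies[task_name]   # same lookup as in consumer.py
    except KeyError:
        return 'unregistered task of type {!r}'.format(task_name)

print(on_task_received('devapp.tasks.create_random_files'))
# unregistered task of type 'devapp.tasks.create_random_files'
```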

In my tasks.py I have

import os
import string
import subprocess
from celery import shared_task, task

@shared_task(name='devapp.tasks.create_random_files')
def create_random_files(total, filename, size):
    for i in range(int(total)):
        filenum = str(i).zfill(5)
        rnd_filename = '/brdis/{}-{}'.format(filenum, filename)
        with open(rnd_filename, 'wb') as f:
            f.write(os.urandom(size))
        # the with block closes the file; the original bare `f.close`
        # (no parentheses) was a no-op
    return '{} random files created'.format(total)
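Independent of the registration problem, the task body can be exercised as a plain function, which helps separate broker/worker issues from code issues. A sketch of the same logic with a configurable output directory instead of the hard-coded /brdis path (the extra `outdir` parameter is an assumption for testability):

```python
import os
import tempfile

def create_random_files(total, filename, size, outdir):
    # same logic as the Celery task, but writing into outdir
    for i in range(int(total)):
        filenum = str(i).zfill(5)
        rnd_filename = os.path.join(outdir, '{}-{}'.format(filenum, filename))
        with open(rnd_filename, 'wb') as f:
            f.write(os.urandom(size))
    return '{} random files created'.format(total)

with tempfile.TemporaryDirectory() as tmp:
    print(create_random_files(3, 'blah', 16, tmp))  # 3 random files created
    print(sorted(os.listdir(tmp)))  # ['00000-blah', '00001-blah', '00002-blah']
```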

and in my views.py I have

from django.shortcuts import render
from django.http import HttpResponse

from django.contrib import messages
from django.views.generic import TemplateView
from django.views.generic.list import ListView
from django.views.generic.edit import FormView
from django.shortcuts import redirect

from .forms import CreateRandomFilesForm, GetFileListForm, ClamScanFileForm
from .tasks import create_random_files, clamscanfile

class ClamScanFileView(FormView):
    template_name = 'devapp/clamscan_file.html'
    form_class = ClamScanFileForm

    def form_valid(self, form):
        filename = form.cleaned_data.get('filename')
        clamscanfile.delay(filename)
        messages.success(self.request, 'We are scanning your file! Wait a moment and refresh this page.')
        return redirect('files_list')

I understand that it may be relative imports (something else I need to get my head round), but I don't understand why adding --detach to the celery worker command produces this error:

celery -A project worker -E --loglevel=INFO --logfile=/var/log/celery/celeryd.log --pidfile=/var/run/celery/celeryd.pid

works fine but...

celery -A project worker -E --loglevel=INFO --logfile=/var/log/celery/celeryd.log --pidfile=/var/run/celery/celeryd.pid --detach

starts the worker but doesn't register the tasks?

Any help appreciated.

FWIW

celery report

software -> celery:4.4.7 (cliffs) kombu:4.6.11 py:3.6.8
            billiard:3.6.3.0 py-amqp:2.6.1
platform -> system:Linux arch:64bit, ELF
            kernel version:3.10.0-1127.19.1.el7.x86_64 imp:CPython
loader   -> celery.loaders.default.Loader
settings -> transport:amqp results:disabled

asked Oct 26 '20 by AndyFry

People also ask

How does Celery execute tasks?

Task execution in Celery can be broken down into three steps: your application sends the task to the task broker, a worker reserves it and executes it, and the result of the execution is stored in the result backend.

How does Celery task queue work?

Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task, the Celery client adds a message to the queue, and the broker then delivers that message to a worker. The most commonly used brokers are Redis and RabbitMQ.
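The client/broker/worker flow described above can be sketched with a plain in-process queue. This is a toy stand-in for Redis/RabbitMQ and a worker process, not Celery itself:

```python
import queue
import threading

broker = queue.Queue()   # stands in for Redis/RabbitMQ
results = {}             # stands in for the result backend

def worker():
    while True:
        task_id, func, args = broker.get()   # broker delivers the message
        if func is None:                     # shutdown sentinel
            break
        results[task_id] = func(*args)       # store result in the backend

def delay(task_id, func, *args):
    broker.put((task_id, func, args))        # client enqueues a message

t = threading.Thread(target=worker)
t.start()
delay('t1', lambda a, b: a + b, 2, 3)
broker.put((None, None, ()))                 # stop the worker
t.join()
print(results['t1'])                         # 5
```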

Where are celery tasks stored?

When a result backend is configured, Celery internally stores the state of every task, e.g. in the celery_taskmeta table for database backends. It reads that table before processing a task and updates it afterwards, which adds overhead and decreases performance; if you don't need results, they can be disabled.

How do you call Celery synchronously?

If you look at the Celery docs on tasks, you'll see that to call a task synchronously you use the apply() method as opposed to the apply_async() method. The docs also note that if the CELERY_ALWAYS_EAGER setting is set, apply_async() is replaced by a local apply() call instead.
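The synchronous/asynchronous distinction can be illustrated with stdlib futures as an analogy (not Celery's API): submitting to an executor behaves like delay()/apply_async() in that it returns a handle, while a direct call behaves like apply() with eager execution:

```python
from concurrent.futures import ThreadPoolExecutor

def add(a, b):
    return a + b

# asynchronous, like task.delay()/apply_async(): returns a handle
with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(add, 2, 3)
    print(future.result())   # 5 -- blocks until the worker thread finishes

# synchronous, like task.apply() under CELERY_ALWAYS_EAGER: runs locally
print(add(2, 3))             # 5
```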


1 Answer

This is a bug in 4.4.7. Downgrading to 4.4.6 or upgrading to 5.x should fix it.

https://github.com/celery/celery/issues/6370
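Assuming a pip-based install, pinning the version in requirements.txt is one way to apply the downgrade (4.4.6 shown; moving to a 5.x release is the other option):

```
# requirements.txt -- pin celery to a release without the --detach bug
celery==4.4.6
```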

answered Sep 29 '22 by Giannis