Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

SynchronousOnlyOperation from celery task using gevent execution pool

Given celery running with these options:

celery -A openwisp2 worker -l info --pool=gevent --concurrency=15 -Ofair

Given this celery task from openwisp-monitoring:

@shared_task
def perform_check(uuid):
    """
    Retrieves check according to the passed UUID
    and calls ``check.perform_check()``
    """
    try:
        check = get_check_model().objects.get(pk=uuid)
    except ObjectDoesNotExist:
        logger.warning(f'The check with uuid {uuid} has been deleted')
        return
    result = check.perform_check()
    if settings.DEBUG:  # pragma: nocover
        print(json.dumps(result, indent=4, sort_keys=True))

Most of the time the task works, but some times (usually with bursts), the following exception is generated:

SynchronousOnlyOperation: You cannot call this from an async context - use a thread or sync_to_async.

Full stack trace:

SynchronousOnlyOperation: You cannot call this from an async context - use a thread or sync_to_async.
  File "celery/app/trace.py", line 412, in trace_task
    R = retval = fun(*args, **kwargs)
  File "celery/app/trace.py", line 704, in __protected_call__
    return self.run(*args, **kwargs)
  File "openwisp_monitoring/check/tasks.py", line 44, in perform_check
    check = get_check_model().objects.get(pk=uuid)
  File "django/db/models/manager.py", line 85, in manager_method
    return getattr(self.get_queryset(), name)(*args, **kwargs)
  File "django/db/models/query.py", line 425, in get
    num = len(clone)
  File "django/db/models/query.py", line 269, in __len__
    self._fetch_all()
  File "django/db/models/query.py", line 1308, in _fetch_all
    self._result_cache = list(self._iterable_class(self))
  File "django/db/models/query.py", line 53, in __iter__
    results = compiler.execute_sql(chunked_fetch=self.chunked_fetch, chunk_size=self.chunk_size)
  File "django/db/models/sql/compiler.py", line 1154, in execute_sql
    cursor = self.connection.cursor()
  File "django/utils/asyncio.py", line 24, in inner
    raise SynchronousOnlyOperation(message)

I do not fully understand why this is happening.

Let's recap, please correct me if I'm wrong:

  1. with this configuration, celery can execute tasks in parallel, this parallelization is performed by gevent using an asyncio event loop
  2. gevent then calls each task, using the same thread
  3. the tasks are not designed to be asynchronous, they use plain sync code, these tasks perform DB queries and network requests
  4. django has an async_unsafe decorator with which database operations are decorated, this decorator checks if the event loop is running and in that case it raises a SynchronousOnlyOperation

But why isn't this exception raised in 100% of cases but only in a minority of cases?

Those tasks are indeed working, I can see it because they produce the collection of chart data which is displayed normally, or they produce changes of status in the device model (eg: ok to critical).

Is it a bug in OpenWISP Monitoring, a misconfiguration or a bug in Django?

It looks like the event loop is not being used in Django, yet Django is raising this exception even though it does not concern it. This could be a bug, but would like to hear the opinion of experts on the subject before filing a bug report.

I thought that a possible quick solution may be to set the env variable DJANGO_ALLOW_ASYNC_UNSAFE but only in the celery process.

Thanks in advance.

like image 924
nemesisdesign Avatar asked Dec 16 '20 16:12

nemesisdesign


Video Answer


1 Answers

It turned out my assumptions were wrong and I was using code which was not greenlet safe with gevent.

Since rewriting the code to be greenlet safe for now was not an option, I switched back to prefork.

like image 54
nemesisdesign Avatar answered Oct 18 '22 22:10

nemesisdesign