
RuntimeError: concurrent poll() invocation using celery

When running Celery in a Docker container that receives REST API calls from other containers, I get a RuntimeError: concurrent poll() invocation.

Has anyone faced a similar error?

The traceback follows.

Traceback (most recent call last):
  File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.5/threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/www/api/api/training_call.py", line 187, in start_process
    result_state.get(on_message=self._on_raw_message, propagate=False)
  File "/usr/local/lib/python3.5/dist-packages/celery/result.py", line 226, in get
    on_message=on_message,
  File "/usr/local/lib/python3.5/dist-packages/celery/backends/asynchronous.py", line 188, in wait_for_pending
    for _ in self._wait_for_pending(result, **kwargs):
  File "/usr/local/lib/python3.5/dist-packages/celery/backends/asynchronous.py", line 255, in _wait_for_pending
    on_interval=on_interval):
  File "/usr/local/lib/python3.5/dist-packages/celery/backends/asynchronous.py", line 56, in drain_events_until
    yield self.wait_for(p, wait, timeout=1)
  File "/usr/local/lib/python3.5/dist-packages/celery/backends/asynchronous.py", line 65, in wait_for
    wait(timeout=timeout)
  File "/usr/local/lib/python3.5/dist-packages/celery/backends/redis.py", line 127, in drain_events
    message = self._pubsub.get_message(timeout=timeout)
  File "/usr/local/lib/python3.5/dist-packages/redis/client.py", line 3135, in get_message
    response = self.parse_response(block=False, timeout=timeout)
  File "/usr/local/lib/python3.5/dist-packages/redis/client.py", line 3034, in parse_response
    if not block and not connection.can_read(timeout=timeout):
  File "/usr/local/lib/python3.5/dist-packages/redis/connection.py", line 628, in can_read
    return self._parser.can_read() or self._selector.can_read(timeout)
  File "/usr/local/lib/python3.5/dist-packages/redis/selector.py", line 28, in can_read
    return self.check_can_read(timeout)
  File "/usr/local/lib/python3.5/dist-packages/redis/selector.py", line 156, in check_can_read
    events = self.read_poller.poll(timeout)
RuntimeError: concurrent poll() invocation
asked Apr 08 '19 17:04 by Gianfranco


1 Answer

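The connection that Celery polls while waiting for a result is not thread-safe (the traceback above goes through the Redis result backend's pub/sub connection), so you need to handle thread safety in your application code. The error is raised when two threads call poll() on the same poll object at the same time. @Laizer mentioned the ticket where this error was introduced in the Python core library.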

One way to do it is to wrap all the calls that block until task completion in a shared Lock:

import celery
import threading

@celery.shared_task(bind=True)  # bind=True passes the task instance as `self`
def debug_task(self):
    print('Hello, world')

def boom(nb_tasks):
    """ not thread safe - raises RuntimeError during concurrent executions """
    tasks = celery.group([debug_task.s() for _ in range(nb_tasks)])
    pool = tasks.apply_async()
    pool.join() # raised from here

CELERY_POLL_LOCK = threading.Lock()

def safe(nb_tasks):
    tasks = celery.group([debug_task.s() for _ in range(nb_tasks)])
    pool = tasks.apply_async()
    with CELERY_POLL_LOCK:  # prevents concurrent calls to poll()
        pool.join()

def main(nb_threads, nb_tasks_per_thread):
    for func in (safe, boom):
        threads = [threading.Thread(target=func, args=(nb_tasks_per_thread, )) for _ in range(nb_threads)]

        for a_thread in threads:
            a_thread.start()

        for a_thread in threads:
            a_thread.join()

main(10, 100)
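
Applied to the blocking call in the question's traceback (result_state.get(on_message=..., propagate=False)), the same idea could be packaged as a small helper. This is only a minimal sketch: RESULT_WAIT_LOCK and locked_get are hypothetical names, and result is assumed to be any object with a get() method, such as a Celery AsyncResult.

```python
import threading

# Hypothetical module-level lock, shared by every thread that waits on a result.
RESULT_WAIT_LOCK = threading.Lock()


def locked_get(result, **get_kwargs):
    """Call result.get(**get_kwargs) while holding the shared lock.

    Only one thread at a time can be inside get(), so two threads never
    poll the underlying connection concurrently through this helper.
    """
    with RESULT_WAIT_LOCK:
        return result.get(**get_kwargs)
```

In the question's code, the call would then become something like locked_get(result_state, on_message=self._on_raw_message, propagate=False). Every blocking wait in the process has to go through the same lock for this to help.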

Wrapping the blocking calls in a lock is a naive approach: it serializes every wait, so only one thread at a time can block on its results. That is suitable for me because I don't expect much concurrency and all the tasks are relatively fast (~10s). If you have a different "profile", you may need something more elaborate (e.g. a single polling task that periodically polls for all pending groups / tasks).
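
The "single polling task" alternative could be sketched roughly as follows, using only the standard library. ResultPoller and its methods are hypothetical names, not part of Celery. The sketch assumes each result object exposes a non-blocking ready() method (as Celery's AsyncResult and GroupResult do) and that calling it from one dedicated thread is safe for your backend; it has not been tested against a real Celery setup.

```python
import threading


class ResultPoller:
    """One background thread polls every pending result on behalf of all waiters."""

    def __init__(self, interval=0.5):
        self._interval = interval      # seconds between polling rounds
        self._pending = {}             # id(result) -> (result, Event)
        self._lock = threading.Lock()  # guards self._pending only
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def wait(self, result, timeout=None):
        """Block until the poller sees result.ready(); False on timeout."""
        done = threading.Event()
        key = id(result)
        with self._lock:
            self._pending[key] = (result, done)
        try:
            return done.wait(timeout)
        finally:
            # Drop the entry even if the caller timed out.
            with self._lock:
                self._pending.pop(key, None)

    def _run(self):
        while not self._stop.is_set():
            with self._lock:
                items = list(self._pending.values())
            for result, done in items:
                # ready() is only ever called from this thread.
                if result.ready():
                    done.set()
            self._stop.wait(self._interval)

    def close(self):
        """Stop the polling thread and wait for it to exit."""
        self._stop.set()
        self._thread.join()
```

A worker thread would call poller.wait(pool) instead of pool.join(), and read the values only after wait() returns True. The trade-off is latency: a waiter is woken up to one polling interval after its result becomes ready.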

answered Nov 20 '22 12:11 by Nielk