When running Celery in a Docker container that receives REST API calls from other containers, I get RuntimeError: concurrent poll() invocation.
Has anyone faced a similar error?
I've attached the traceback:
```
Traceback (most recent call last):
  File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.5/threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/www/api/api/training_call.py", line 187, in start_process
    result_state.get(on_message=self._on_raw_message, propagate=False)
  File "/usr/local/lib/python3.5/dist-packages/celery/result.py", line 226, in get
    on_message=on_message,
  File "/usr/local/lib/python3.5/dist-packages/celery/backends/asynchronous.py", line 188, in wait_for_pending
    for _ in self._wait_for_pending(result, **kwargs):
  File "/usr/local/lib/python3.5/dist-packages/celery/backends/asynchronous.py", line 255, in _wait_for_pending
    on_interval=on_interval):
  File "/usr/local/lib/python3.5/dist-packages/celery/backends/asynchronous.py", line 56, in drain_events_until
    yield self.wait_for(p, wait, timeout=1)
  File "/usr/local/lib/python3.5/dist-packages/celery/backends/asynchronous.py", line 65, in wait_for
    wait(timeout=timeout)
  File "/usr/local/lib/python3.5/dist-packages/celery/backends/redis.py", line 127, in drain_events
    message = self._pubsub.get_message(timeout=timeout)
  File "/usr/local/lib/python3.5/dist-packages/redis/client.py", line 3135, in get_message
    response = self.parse_response(block=False, timeout=timeout)
  File "/usr/local/lib/python3.5/dist-packages/redis/client.py", line 3034, in parse_response
    if not block and not connection.can_read(timeout=timeout):
  File "/usr/local/lib/python3.5/dist-packages/redis/connection.py", line 628, in can_read
    return self._parser.can_read() or self._selector.can_read(timeout)
  File "/usr/local/lib/python3.5/dist-packages/redis/selector.py", line 28, in can_read
    return self.check_can_read(timeout)
  File "/usr/local/lib/python3.5/dist-packages/redis/selector.py", line 156, in check_can_read
    events = self.read_poller.poll(timeout)
RuntimeError: concurrent poll() invocation
```
The connection Celery uses to wait for task results (per the traceback, the Redis result backend's pub/sub connection) is not thread-safe, so you need to handle thread-safety in your application code. @Laizer mentioned the ticket where this error was introduced in the Python core library.
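The RuntimeError itself is raised by CPython's select.poll objects, which refuse to run poll() from two threads at the same time; the last frame of the traceback (redis/selector.py calling self.read_poller.poll(timeout)) suggests the Redis client is polling such an object. The sketch below reproduces the error with the standard library only, using a throwaway pipe; the function name is illustrative, and it runs on Unix only because select.poll isn't available on Windows.

```python
import os
import select
import threading
import time


def reproduce_concurrent_poll():
    """Return the message of the RuntimeError raised by a second, concurrent poll()."""
    read_fd, write_fd = os.pipe()
    poller = select.poll()
    poller.register(read_fd, select.POLLIN)

    # Thread 1 blocks in poll() for up to 2000 ms, since nothing is written yet.
    blocker = threading.Thread(target=poller.poll, args=(2000,))
    blocker.start()
    time.sleep(0.2)  # heuristic: give the thread time to enter poll()

    try:
        poller.poll(0)  # the main thread polls the same object concurrently
        message = None
    except RuntimeError as exc:
        message = str(exc)
    finally:
        os.write(write_fd, b"x")  # wake the blocked thread so it returns early
        blocker.join()
        os.close(read_fd)
        os.close(write_fd)
    return message


if __name__ == "__main__":
    print(reproduce_concurrent_poll())
```

On Linux this should print "concurrent poll() invocation". The 0.2 s sleep is only a heuristic; on a heavily loaded machine the main thread could in principle poll before the other thread has started.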
One way to make your code thread-safe is to wrap every call that blocks until task completion in a shared lock:
```python
import threading

import celery

# Assumes a Celery app with a Redis result backend is already configured;
# shared_task binds to whichever app is current.
@celery.shared_task
def debug_task():  # no `self`: the task is not bound (no bind=True)
    print('Hello, world')

def boom(nb_tasks):
    """Not thread-safe: raises RuntimeError when run from several threads at once."""
    tasks = celery.group([debug_task.s() for _ in range(nb_tasks)])
    pool = tasks.apply_async()
    pool.join()  # raised from here

# One lock shared by every thread that waits on task results.
CELERY_POLL_LOCK = threading.Lock()

def safe(nb_tasks):
    tasks = celery.group([debug_task.s() for _ in range(nb_tasks)])
    pool = tasks.apply_async()
    with CELERY_POLL_LOCK:  # prevents concurrent calls to poll()
        pool.join()

def main(nb_threads, nb_tasks_per_thread):
    # Run the safe version first, then the unsafe one for comparison.
    for func in (safe, boom):
        threads = [threading.Thread(target=func, args=(nb_tasks_per_thread,))
                   for _ in range(nb_threads)]
        for a_thread in threads:
            a_thread.start()
        for a_thread in threads:
            a_thread.join()

if __name__ == '__main__':
    main(10, 100)
```
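Applied to the question's own code, the same idea amounts to routing every blocking get() through one shared lock. This is a minimal sketch; locked_get and RESULT_WAIT_LOCK are hypothetical names, and because the lock serializes all waits, one slow task delays every other thread that is waiting.

```python
import threading

# Hypothetical process-wide lock shared by every thread that waits on results
# (the same idea as CELERY_POLL_LOCK in the example above).
RESULT_WAIT_LOCK = threading.Lock()


def locked_get(result, **kwargs):
    """Call result.get(**kwargs) while holding the shared lock.

    `result` is expected to be a Celery AsyncResult (or anything with a
    compatible get() method); keyword arguments such as on_message and
    propagate are passed through unchanged.
    """
    with RESULT_WAIT_LOCK:
        return result.get(**kwargs)
```

In the question's start_process, the call result_state.get(on_message=self._on_raw_message, propagate=False) would become locked_get(result_state, on_message=self._on_raw_message, propagate=False).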
This is a naive approach, but it suits me because I don't expect much concurrency and all the tasks are relatively fast (~10 s). If your workload has a different profile, you may need something more elaborate (e.g. a single polling task that periodically checks all pending groups/tasks).
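The single-poller alternative could look roughly like the following sketch. It is implemented here as a background thread rather than a Celery task, and ResultPoller is a hypothetical helper, not a Celery API. It assumes each registered object exposes Celery's ready() and get() methods; since only the poller thread ever calls them, no two threads wait on the result backend at once, and callers block on standard concurrent.futures.Future objects instead.

```python
import queue
import threading
from concurrent.futures import Future


class ResultPoller:
    """Hypothetical helper: one background thread owns all result polling."""

    def __init__(self, interval=0.5):
        self._interval = interval  # seconds between polling rounds
        self._incoming = queue.Queue()  # (result, future) pairs from other threads
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def submit(self, result):
        """Register a result; returns a Future that the poller thread resolves."""
        future = Future()
        self._incoming.put((result, future))
        return future

    def stop(self):
        """Stop polling; results still pending at this point stay unresolved."""
        self._stop.set()
        self._thread.join()

    def _run(self):
        pending = []
        while not self._stop.is_set():
            # Pick up newly submitted results without blocking.
            while True:
                try:
                    pending.append(self._incoming.get_nowait())
                except queue.Empty:
                    break
            still_pending = []
            for result, future in pending:
                try:
                    if result.ready():
                        future.set_result(result.get())
                    else:
                        still_pending.append((result, future))
                except Exception as exc:
                    # ready() or get() raised (e.g. get() re-raised a task failure).
                    future.set_exception(exc)
            pending = still_pending
            self._stop.wait(self._interval)
```

A request-handling thread would then call poller.submit(group.apply_async()).result(timeout=...) instead of joining the group directly. The trade-off is latency: a result is noticed at most one polling interval after it becomes ready.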