Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Celery Gevent Pool - ConcurrentObjectUseError

I have a Celery worker using the gevent pool. Each task makes an HTTP request and then enqueues another Celery task with the page source.

I'm using Django, RabbitMQ as a broker, Redis as a celery result backend, Celery 4.1.0.

The task has ignore_result=True, but I'm getting this error pretty often: ConcurrentObjectUseError: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent.hub.Waiter...>

I see it is related to the Redis connection.

I can't figure out how to solve this. This is more or less the logic of the task. I also tried using a semaphore when calling process_task.apply_async but it didn't work.

from gevent.lock import BoundedSemaphore

# Module-level semaphore with a single slot; request_task holds it while
# calling process_task.apply_async.
sem = BoundedSemaphore(1)


@app.task(ignore_result=True, queue='request_queue')
def request_task(url, *args, **kwargs):
    """Fetch ``url`` and enqueue ``process_task`` with the response details.

    Extra positional and keyword arguments are accepted but not used.
    """
    response = requests.get(url)

    # Plain-dict snapshot of the response; forwarded as the ``request`` kwarg.
    request = {
        'status_code': response.status_code,
        'content': response.text,
        'headers': dict(response.headers),
        'encoding': response.encoding,
    }
    task_kwargs = {'url': url, 'request': request}

    # Publish the follow-up task while holding the module-level semaphore.
    with sem:
        process_task.apply_async(kwargs=task_kwargs)

    print(f'Done - {url}')

This is the stack trace:

cancel_wait_ex: [Errno 9] File descriptor was closed in another greenlet
  File "redis/connection.py", line 543, in send_packed_command
    self._sock.sendall(item)
  File "gevent/_socket3.py", line 424, in sendall
    data_sent += self.send(data_memory[data_sent:], flags, timeout=timeleft)
  File "gevent/_socket3.py", line 394, in send
    self._wait(self._write_event)
  File "gevent/_socket3.py", line 156, in _wait
    self.hub.wait(watcher)
  File "gevent/hub.py", line 651, in wait
    result = waiter.get()
  File "gevent/hub.py", line 898, in get
    return self.hub.switch()
  File "gevent/hub.py", line 630, in switch
    return RawGreenlet.switch(self)
ConnectionError: Error 9 while writing to socket. File descriptor was closed in another greenlet.
  File "redis/client.py", line 2165, in _execute
    return command(*args)
  File "redis/connection.py", line 563, in send_command
    self.send_packed_command(self.pack_command(*args))
  File "redis/connection.py", line 556, in send_packed_command
    (errno, errmsg))
OSError: [Errno 9] Bad file descriptor
  File "redis/connection.py", line 126, in _read_from_socket
    data = self._sock.recv(socket_read_size)
  File "gevent/_socket3.py", line 332, in recv
    return _socket.socket.recv(self._sock, *args)
ConnectionError: Error while reading from socket: (9, 'Bad file descriptor')
  File "redis/client.py", line 2165, in _execute
    return command(*args)
  File "redis/connection.py", line 563, in send_command
    self.send_packed_command(self.pack_command(*args))
  File "redis/connection.py", line 538, in send_packed_command
    self.connect()
  File "redis/connection.py", line 446, in connect
    self.on_connect()
  File "redis/connection.py", line 520, in on_connect
    if nativestr(self.read_response()) != 'OK':
  File "redis/connection.py", line 577, in read_response
    response = self._parser.read_response()
  File "redis/connection.py", line 238, in read_response
    response = self._buffer.readline()
  File "redis/connection.py", line 168, in readline
    self._read_from_socket()
  File "redis/connection.py", line 143, in _read_from_socket
    (e.args,))
BlockingIOError: [Errno 11] Resource temporarily unavailable
  File "gevent/_socket3.py", line 390, in send
    return _socket.socket.send(self._sock, data, flags)
OSError: [Errno 9] Bad file descriptor
  File "redis/connection.py", line 543, in send_packed_command
    self._sock.sendall(item)
  File "gevent/_socket3.py", line 424, in sendall
    data_sent += self.send(data_memory[data_sent:], flags, timeout=timeleft)
  File "gevent/_socket3.py", line 396, in send
    return _socket.socket.send(self._sock, data, flags)
ConnectionError: Error 9 while writing to socket. Bad file descriptor.
  File "redis/client.py", line 2165, in _execute
    return command(*args)
  File "redis/connection.py", line 563, in send_command
    self.send_packed_command(self.pack_command(*args))
  File "redis/connection.py", line 556, in send_packed_command
    (errno, errmsg))
BlockingIOError: [Errno 11] Resource temporarily unavailable
  File "gevent/_socket3.py", line 390, in send
    return _socket.socket.send(self._sock, data, flags)
OSError: [Errno 9] Bad file descriptor
  File "redis/connection.py", line 543, in send_packed_command
    self._sock.sendall(item)
  File "gevent/_socket3.py", line 424, in sendall
    data_sent += self.send(data_memory[data_sent:], flags, timeout=timeleft)
  File "gevent/_socket3.py", line 396, in send
    return _socket.socket.send(self._sock, data, flags)
ConnectionError: Error 9 while writing to socket. Bad file descriptor.
  File "redis/client.py", line 2165, in _execute
    return command(*args)
  File "redis/connection.py", line 563, in send_command
    self.send_packed_command(self.pack_command(*args))
  File "redis/connection.py", line 556, in send_packed_command
    (errno, errmsg))
BlockingIOError: [Errno 11] Resource temporarily unavailable
  File "gevent/_socket3.py", line 390, in send
    return _socket.socket.send(self._sock, data, flags)
ConcurrentObjectUseError: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent.hub.Waiter object at 0x7f271b53dea0>>
  File "celery/app/trace.py", line 374, in trace_task
    R = retval = fun(*args, **kwargs)
  File "celery/app/trace.py", line 629, in __protected_call__
    return self.run(*args, **kwargs)
  File "drones/tasks.py", line 330, in blue_drone_request_task
    blue_drone_process_task.apply_async(kwargs={'targetpage': targetpage, 'request': request})
  File "celery/app/task.py", line 536, in apply_async
    **options
  File "celery/app/base.py", line 736, in send_task
    self.backend.on_task_call(P, task_id)
  File "celery/backends/redis.py", line 189, in on_task_call
    self.result_consumer.consume_from(task_id)
  File "celery/backends/redis.py", line 76, in consume_from
    self._consume_from(task_id)
  File "celery/backends/redis.py", line 82, in _consume_from
    self._pubsub.subscribe(key)
  File "redis/client.py", line 2229, in subscribe
    ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
  File "redis/client.py", line 2161, in execute_command
    self._execute(connection, connection.send_command, *args)
  File "redis/client.py", line 2165, in _execute
    return command(*args)
  File "redis/connection.py", line 563, in send_command
    self.send_packed_command(self.pack_command(*args))
  File "redis/connection.py", line 538, in send_packed_command
    self.connect()
  File "redis/connection.py", line 455, in connect
    callback(self)
  File "redis/client.py", line 2120, in on_connect
    self.subscribe(**channels)
  File "redis/client.py", line 2229, in subscribe
    ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
  File "redis/client.py", line 2161, in execute_command
    self._execute(connection, connection.send_command, *args)
  File "redis/client.py", line 2165, in _execute
    return command(*args)
  File "redis/connection.py", line 563, in send_command
    self.send_packed_command(self.pack_command(*args))
  File "redis/connection.py", line 538, in send_packed_command
    self.connect()
  File "redis/connection.py", line 455, in connect
    callback(self)
  File "redis/client.py", line 2120, in on_connect
    self.subscribe(**channels)
  File "redis/client.py", line 2229, in subscribe
    ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
  File "redis/client.py", line 2161, in execute_command
    self._execute(connection, connection.send_command, *args)
  File "redis/client.py", line 2172, in _execute
    connection.connect()
  File "redis/connection.py", line 455, in connect
    callback(self)
  File "redis/client.py", line 2120, in on_connect
    self.subscribe(**channels)
  File "redis/client.py", line 2229, in subscribe
    ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
  File "redis/client.py", line 2161, in execute_command
    self._execute(connection, connection.send_command, *args)
  File "redis/client.py", line 2172, in _execute
    connection.connect()
  File "redis/connection.py", line 455, in connect
    callback(self)
  File "redis/client.py", line 2120, in on_connect
    self.subscribe(**channels)
  File "redis/client.py", line 2229, in subscribe
    ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
  File "redis/client.py", line 2161, in execute_command
    self._execute(connection, connection.send_command, *args)
  File "redis/client.py", line 2172, in _execute
    connection.connect()
  File "redis/connection.py", line 455, in connect
    callback(self)
  File "redis/client.py", line 2120, in on_connect
    self.subscribe(**channels)
  File "redis/client.py", line 2229, in subscribe
    ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
  File "redis/client.py", line 2161, in execute_command
    self._execute(connection, connection.send_command, *args)
  File "redis/client.py", line 2165, in _execute
    return command(*args)
  File "redis/connection.py", line 563, in send_command
    self.send_packed_command(self.pack_command(*args))
  File "redis/connection.py", line 538, in send_packed_command
    self.connect()
  File "redis/connection.py", line 455, in connect
    callback(self)
  File "redis/client.py", line 2120, in on_connect
    self.subscribe(**channels)
  File "redis/client.py", line 2229, in subscribe
    ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
  File "redis/client.py", line 2161, in execute_command
    self._execute(connection, connection.send_command, *args)
  File "redis/client.py", line 2165, in _execute
    return command(*args)
  File "redis/connection.py", line 563, in send_command
    self.send_packed_command(self.pack_command(*args))
  File "redis/connection.py", line 538, in send_packed_command
    self.connect()
  File "redis/connection.py", line 455, in connect
    callback(self)
  File "redis/client.py", line 2120, in on_connect
    self.subscribe(**channels)
  File "redis/client.py", line 2229, in subscribe
    ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
  File "redis/client.py", line 2161, in execute_command
    self._execute(connection, connection.send_command, *args)
  File "redis/client.py", line 2172, in _execute
    connection.connect()
  File "redis/connection.py", line 455, in connect
    callback(self)
  File "redis/client.py", line 2120, in on_connect
    self.subscribe(**channels)
  File "redis/client.py", line 2229, in subscribe
    ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
  File "redis/client.py", line 2161, in execute_command
    self._execute(connection, connection.send_command, *args)
  File "redis/client.py", line 2172, in _execute
    connection.connect()
  File "redis/connection.py", line 455, in connect
    callback(self)
  File "redis/client.py", line 2120, in on_connect
    self.subscribe(**channels)
  File "redis/client.py", line 2229, in subscribe
    ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
  File "redis/client.py", line 2161, in execute_command
    self._execute(connection, connection.send_command, *args)
  File "redis/client.py", line 2165, in _execute
    return command(*args)
  File "redis/connection.py", line 563, in send_command
    self.send_packed_command(self.pack_command(*args))
  File "redis/connection.py", line 538, in send_packed_command
    self.connect()
  File "redis/connection.py", line 455, in connect
    callback(self)
  File "redis/client.py", line 2120, in on_connect
    self.subscribe(**channels)
  File "redis/client.py", line 2229, in subscribe
    ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
  File "redis/client.py", line 2161, in execute_command
    self._execute(connection, connection.send_command, *args)
  File "redis/client.py", line 2172, in _execute
    connection.connect()
  File "redis/connection.py", line 455, in connect
    callback(self)
  File "redis/client.py", line 2120, in on_connect
    self.subscribe(**channels)
  File "redis/client.py", line 2229, in subscribe
    ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
  File "redis/client.py", line 2161, in execute_command
    self._execute(connection, connection.send_command, *args)
  File "redis/client.py", line 2165, in _execute
    return command(*args)
  File "redis/connection.py", line 563, in send_command
    self.send_packed_command(self.pack_command(*args))
  File "redis/connection.py", line 538, in send_packed_command
    self.connect()
  File "redis/connection.py", line 455, in connect
    callback(self)
  File "redis/client.py", line 2120, in on_connect
    self.subscribe(**channels)
  File "redis/client.py", line 2229, in subscribe
    ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
  File "redis/client.py", line 2161, in execute_command
    self._execute(connection, connection.send_command, *args)
  File "redis/client.py", line 2172, in _execute
    connection.connect()
  File "redis/connection.py", line 455, in connect
    callback(self)
  File "redis/client.py", line 2120, in on_connect
    self.subscribe(**channels)
  File "redis/client.py", line 2229, in subscribe
    ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
  File "redis/client.py", line 2161, in execute_command
    self._execute(connection, connection.send_command, *args)
  File "redis/client.py", line 2172, in _execute
    connection.connect()
  File "redis/connection.py", line 455, in connect
    callback(self)
  File "redis/client.py", line 2120, in on_connect
    self.subscribe(**channels)
  File "redis/client.py", line 2229, in subscribe
    ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
  File "redis/client.py", line 2161, in execute_command
    self._execute(connection, connection.send_command, *args)
  File "redis/client.py", line 2165, in _execute
    return command(*args)
  File "redis/connection.py", line 563, in send_command
    self.send_packed_command(self.pack_command(*args))
  File "redis/connection.py", line 538, in send_packed_command
    self.connect()
  File "redis/connection.py", line 455, in connect
    callback(self)
  File "redis/client.py", line 2120, in on_connect
    self.subscribe(**channels)
  File "redis/client.py", line 2229, in subscribe
    ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
  File "redis/client.py", line 2161, in execute_command
    self._execute(connection, connection.send_command, *args)
  File "redis/client.py", line 2165, in _execute
    return command(*args)
  File "redis/connection.py", line 563, in send_command
    self.send_packed_command(self.pack_command(*args))
  File "redis/connection.py", line 543, in send_packed_command
    self._sock.sendall(item)
  File "gevent/_socket3.py", line 424, in sendall
    data_sent += self.send(data_memory[data_sent:], flags, timeout=timeleft)
  File "gevent/_socket3.py", line 394, in send
    self._wait(self._write_event)
  File "gevent/_socket3.py", line 150, in _wait
    raise _socketcommon.ConcurrentObjectUseError('This socket is already used by another greenlet: %r' % (watcher.callback, ))
like image 821
Bogdan Boamfa Avatar asked Oct 17 '22 21:10

Bogdan Boamfa


1 Answer

I am not sure if this is the correct answer, but after setting CELERY_RESULT_BACKEND = None, I stopped seeing this error.

I hope this helps.

FYI This error occurred only when using the gevent pool. Celery versions I have tried with: 3.1, 4.2.1.

like image 89
Bogdan Boamfa Avatar answered Oct 21 '22 02:10

Bogdan Boamfa