Calling celery task hangs for delay and apply_async

I have created a Celery app with the following directory structure (as given on the Celery site):

proj
|-- celery.py
|-- celery.pyc
|-- __init__.py
|-- __init__.pyc
|-- tasks.py
`-- tasks.pyc

Following are the contents of celery.py:

from __future__ import absolute_import

from celery import Celery

app = Celery('proj',
             broker='amqp://rabbitmquser:<my_password>@localhost:5672/localvhost',
             #backend='amqp://',
             include=['proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=3600,
)

if __name__ == '__main__':
    app.start()

Following is the content of tasks.py:

from __future__ import absolute_import

from proj.celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)

Now I start the Celery worker with the following command:

celery -A proj worker -l debug

I think the worker is running fine, as it outputs the following:

[2014-06-12 21:25:02,326: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2014-06-12 21:25:02,328: DEBUG/MainProcess] | Worker: Building graph...
[2014-06-12 21:25:02,328: DEBUG/MainProcess] | Worker: New boot order: {Timer, Hub, Queues (intra), Pool, Autoscaler, Beat, Autoreloader, StateDB, Consumer}
[2014-06-12 21:25:02,331: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2014-06-12 21:25:02,331: DEBUG/MainProcess] | Consumer: Building graph...
[2014-06-12 21:25:02,334: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Mingle, Tasks, Control, Agent, Heart, Gossip, event loop}
[2014-06-12 21:25:02,335: WARNING/MainProcess] /home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/apps/worker.py:161: CDeprecationWarning: 
Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers
the ability to execute any command.  It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.

If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.


  warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))

 -------------- celery@ansumanb-u12 v3.1.12 (Cipater)
---- **** ----- 
--- * ***  * -- Linux-3.5.0-25-generic-x86_64-with-Ubuntu-12.04-precise
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         proj:0x1f46690
- ** ---------- .> transport:   amqp://rabbitmquser:**@localhost:5672/localvhost
- ** ---------- .> results:     disabled
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- 
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery


[tasks]
  . celery.backend_cleanup
  . celery.chain
  . celery.chord
  . celery.chord_unlock
  . celery.chunks
  . celery.group
  . celery.map
  . celery.starmap
  . proj.tasks.add
  . proj.tasks.mul
  . proj.tasks.xsum

[2014-06-12 21:25:02,336: DEBUG/MainProcess] | Worker: Starting Hub
[2014-06-12 21:25:02,336: DEBUG/MainProcess] ^-- substep ok
[2014-06-12 21:25:02,336: DEBUG/MainProcess] | Worker: Starting Pool
[2014-06-12 21:25:02,344: DEBUG/MainProcess] ^-- substep ok
[2014-06-12 21:25:02,345: DEBUG/MainProcess] | Worker: Starting Consumer
[2014-06-12 21:25:02,345: DEBUG/MainProcess] | Consumer: Starting Connection

After starting the worker, I open another terminal and run the following in the Python interpreter:

>>> from proj.tasks import add
>>> add(2,2)
4
>>> add.delay(2,3)

Here delay hangs (same story for apply_async). When I stop it with Ctrl+C, I get the following:

^CTraceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/app/task.py", line 453, in delay
    return self.apply_async(args, kwargs)
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/app/task.py", line 555, in apply_async
    **dict(self._get_exec_options(), **options)
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/app/base.py", line 352, in send_task
    reply_to=reply_to or self.oid, **options
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/app/amqp.py", line 305, in publish_task
    **kwargs
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/messaging.py", line 168, in publish
    routing_key, mandatory, immediate, exchange, declare)
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/connection.py", line 436, in _ensured
    return fun(*args, **kwargs)
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/messaging.py", line 173, in _publish
    channel = self.channel
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/messaging.py", line 190, in _get_channel
    channel = self._channel = channel()
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/utils/__init__.py", line 422, in __call__
    value = self.__value__ = self.__contract__()
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/messaging.py", line 205, in <lambda>
    channel = ChannelPromise(lambda: connection.default_channel)
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/connection.py", line 756, in default_channel
    self.connection
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/connection.py", line 741, in connection
    self._connection = self._establish_connection()
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/connection.py", line 696, in _establish_connection
    conn = self.transport.establish_connection()
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/transport/pyamqp.py", line 112, in establish_connection
    conn = self.Connection(**opts)
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/connection.py", line 171, in __init__
    (10, 10),  # start
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/abstract_channel.py", line 67, in wait
    self.channel_id, allowed_methods)
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/connection.py", line 237, in _wait_method
    self.method_reader.read_method()
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 186, in read_method
    self._next_method()
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 107, in _next_method
    frame_type, channel, payload = read_frame()
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/transport.py", line 153, in read_frame
    frame_type, channel, size = unpack('>BHI', read(7, True))
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/transport.py", line 272, in _read
    s = recv(n - len(rbuf))
KeyboardInterrupt
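
The traceback ends inside amqp/connection.py while waiting for the (10, 10) "start" method, which suggests the TCP connection was opened but the broker never sent its first frame. A minimal sketch for checking that independently of Celery, using only the standard socket module, could look like the following. The function name broker_handshake_ok and the 5-second timeout are illustrative assumptions, not part of Celery or kombu.

```python
import socket

# AMQP 0-9-1 protocol header that a client sends right after connecting.
AMQP_HEADER = b"AMQP\x00\x00\x09\x01"


def broker_handshake_ok(host="localhost", port=5672, timeout=5.0):
    """Return True if the broker sends any reply to the AMQP protocol
    header within `timeout` seconds, False otherwise.

    Hypothetical helper for diagnosis only; it does not authenticate.
    """
    try:
        sock = socket.create_connection((host, port), timeout=timeout)
    except socket.error:
        # Connection refused, unreachable host, or connect timeout.
        return False
    try:
        sock.settimeout(timeout)
        sock.sendall(AMQP_HEADER)
        reply = sock.recv(7)  # an AMQP frame header is 7 bytes long
        return len(reply) > 0
    except socket.error:
        # Includes socket.timeout: the broker accepted TCP but stayed silent.
        return False
    finally:
        sock.close()


if __name__ == "__main__":
    print(broker_handshake_ok())
```

If this returns False while the port is open, the problem is likely on the broker side rather than in the task code.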

Any suggestion or comment will be much appreciated. I have gone through other links where they talk about the size of the /var directory, but I think I have enough space.

Result of df -h:

Filesystem      Size  Used Avail Use% Mounted on
/dev/sda3       283G   99G  170G  37% /
udev            1.9G  4.0K  1.9G   1% /dev
tmpfs           388M  1.1M  387M   1% /run
none            5.0M     0  5.0M   0% /run/lock
none            1.9G   28M  1.9G   2% /run/shm

Following is the result of rabbitmqctl status:

Status of node 'rabbit@ansumanb-u12' ...
[{pid,12014},
 {running_applications,[{rabbit,"RabbitMQ","3.3.2"},
                        {os_mon,"CPO  CXC 138 46","2.2.7"},
                        {xmerl,"XML parser","1.2.10"},
                        {mnesia,"MNESIA  CXC 138 12","4.5"},
                        {sasl,"SASL  CXC 138 11","2.1.10"},
                        {stdlib,"ERTS  CXC 138 10","1.17.5"},
                        {kernel,"ERTS  CXC 138 10","2.14.5"}]},
 {os,{unix,linux}},
 {erlang_version,"Erlang R14B04 (erts-5.8.5) [source] [64-bit] [smp:4:4] [rq:4] [async-threads:30] [kernel-poll:true]\n"},
 {memory,[{total,27919080},
          {connection_procs,2704},
          {queue_procs,5408},
          {plugins,0},
          {other_proc,9099992},
          {mnesia,63776},
          {mgmt_db,0},
          {msg_index,34080},
          {other_ets,784160},
          {binary,12144},
          {code,14685283},
          {atom,1367393},
          {other_system,1864140}]},
 {alarms,[]},
 {listeners,[{clustering,25672,"::"},{amqp,5672,"::"}]},
 {vm_memory_high_watermark,0.4},
 {vm_memory_limit,1625165004},
 {disk_free_limit,50000000},
 {disk_free,181684699136},
 {file_descriptors,[{total_limit,2},
                    {total_used,0},
                    {sockets_limit,0},
                    {sockets_used,0}]},
 {processes,[{limit,1048576},{used,127}]},
 {run_queue,0},
 {uptime,20072}]
...done.

I've checked the RabbitMQ logs and didn't find anything there. The Celery version is 3.1.12.

I created the RabbitMQ virtual host and user with the following commands:

$ sudo rabbitmqctl add_user rabbitmquser <mypassword>
$ sudo rabbitmqctl add_vhost localvhost
$ sudo rabbitmqctl set_permissions -p localvhost rabbitmquser ".*" ".*" ".*"

Thanks

Ansuman Bebarta asked Jun 12 '14 16:06


1 Answer

I made a big mistake: I changed a setting that I didn't understand. I am adding this for others' reference.

While installing RabbitMQ, I changed the default ulimit value from 1024 to 100 in /etc/default/rabbitmq-server.

I changed the value back to 1024 and the issue is fixed now.
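
The rabbitmqctl status output in the question already hints at this: it reports a file_descriptors total_limit of 2 and a sockets_limit of 0, which is consistent with the broker having no sockets left for new client connections. A small sketch for pulling those counters out of the status text follows. It assumes the Erlang-term output format shown in the question (RabbitMQ 3.3.2); other versions may print the status differently, and the function name fd_limits is made up for illustration.

```python
import re


def fd_limits(status_text):
    """Extract the file_descriptors counters from `rabbitmqctl status`
    output in the Erlang-term format shown in the question.

    Returns a dict such as {"total_limit": 2, ...}, or {} if the
    file_descriptors section is not found.
    """
    block = re.search(r"\{file_descriptors,\s*\[(.*?)\]\}", status_text, re.S)
    if block is None:
        return {}
    pairs = re.findall(r"\{(\w+),\s*(\d+)\}", block.group(1))
    return {name: int(value) for name, value in pairs}


# Excerpt copied from the status output in the question.
sample = """
 {file_descriptors,[{total_limit,2},
                    {total_used,0},
                    {sockets_limit,0},
                    {sockets_used,0}]},
"""

limits = fd_limits(sample)
print(limits)
if limits.get("sockets_limit", 0) == 0:
    print("Broker reports no sockets available for client connections")
```

On a live machine the input could come from running sudo rabbitmqctl status and passing its output to fd_limits.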

Ansuman Bebarta answered Nov 10 '22 13:11