Recently I have been experimenting with a Git project to understand how a big-data processing framework works.
Git project: https://github.com/esperdyne/celery-message-processing
The project has the following components:
1. AMQP broker (RabbitMQ): it acts as a message buffer, a mailbox through which different users exchange messages.
2. Worker: it acts as the server that provides services to the various clients.
3. Queue ("celery"): it acts as a multi-processing container that handles the various worker instances at the same time.
The key configuration is shown below. We use the module proj/celery.py to define the app:
from celery import Celery

app = Celery('proj',
             broker='amqp://',
             backend='redis://localhost',
             include=['proj.tasks'])
When we start the application, we can see the messages arriving from RabbitMQ, yet Celery cannot handle them.
Parse.log looks like this:
[2017-02-04 14:28:06,909: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?
We have the following questions:
4.2.1 AMQP mechanism: We can see that AMQP works as a message buffer, so there must be a message sender and a message fetcher. In the above diagram, who is the message sender and who is the message fetcher?
4.2.2 Message definition: In our application, we cannot find the code that defines the messages sent to, or received from, AMQP.
4.2.3 Message monitoring: How can we monitor the messages sent and received over AMQP?
We hope someone can guide us to a solution and give us a detailed introduction to the Celery broker mechanism!
Note: the error log is shown below.
[2017-02-04 14:28:06,909: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?
The full contents of the message body was: body: [[u'maildir/allen-p/inbox/1.'], {}, {u'errbacks': None, u'callbacks': None, u'chord': None, u'chain': [{u'chord_size': None, u'task': u'celery.group', u'args': [], u'immutable': False, u'subtask_type': u'group', u'kwargs': {u'tasks': [{u'chord_size': None, u'task': u'proj.tasks.deploy_db', u'args': [], u'options': {u'reply_to': u'3d9de118-f9d0-3bee-9972-b6a4d4482446', u'task_id': u'3cafda16-3e7c-44db-b05e-1327ef97ffc3'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}, {u'chord_size': None, u'task': u'proj.tasks.deploy_es', u'args': [], u'options': {u'reply_to': u'3d9de118-f9d0-3bee-9972-b6a4d4482446', u'task_id': u'1f4c728b-680d-4dde-98b9-b153d5282780'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}]}, u'options': {u'parent_id': None, u'task_id': u'f21c911e-f2ac-462e-9662-2efbd27bcf91', u'root_id': None}}]}] (801b)
{content_type:'application/json' content_encoding:'utf-8'
delivery_info:{'consumer_tag': 'None4', 'redelivered': False, 'routing_key': 'parse', 'delivery_tag': 623422L, 'exchange': ''} headers={'\xe5\xca.\xdb\x00\x00\x00\x00\x00': None, 'P&5\x07\x00': None, 'T\nKB\x00\x00\x00': 'fc8f0bed-665f-4699-89dd-a56fc247ea8b', 'N\xfd\x17=\x00\x00': 'gen17347@centos1', '\xcfb\xddR': 'py', '9*\xa8': None, '\xb7/b\x84\x00\x00\x00': 0, '\xe0\x0b\xfa\x89\x00\x00\x00': None, '\xdfR\xc4x\x00\x00\x00\x00\x00': [None, None], 'T3\x1d ': 'proj.tasks.parse', '\xae\xbf': 'fc8f0bed-665f-4699-89dd-a56fc247ea8b', '\x11s\x1f\xd8\x00\x00\x00\x00': "('maildir/allen-p/inbox/1.',)", 'UL\xa1\xfc\x00\x00\x00\x00\x00\x00': '{}'}}
[2017-02-04 15:47:22,463: INFO/MainProcess] Connected to amqp://guest:**@localhost:5672//
[2017-02-04 15:47:22,473: INFO/MainProcess] mingle: searching for neighbors
[2017-02-04 15:47:23,503: INFO/MainProcess] mingle: sync with 2 nodes
[2017-02-04 15:47:23,504: INFO/MainProcess] mingle: sync complete
[2017-02-04 15:47:23,530: INFO/MainProcess] parse@centos1 ready.
[2017-02-04 15:47:24,890: INFO/MainProcess] sync with es_deploy@centos1
[2017-02-04 15:47:51,017: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?
The full contents of the message body was: body: [[u'maildir/allen-p/inbox/1.'], {}, {u'errbacks': None, u'callbacks': None, u'chord': None, u'chain': [{u'chord_size': None, u'task': u'celery.group', u'args': [], u'immutable': False, u'subtask_type': u'group', u'kwargs': {u'tasks': [{u'chord_size': None, u'task': u'proj.tasks.deploy_db', u'args': [], u'options': {u'reply_to': u'bd66dd5c-516d-3b51-ab40-c8337a33b18e', u'task_id': u'765e5bbe-198f-405c-b10c-023d35e03981'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}, {u'chord_size': None, u'task': u'proj.tasks.deploy_es', u'args': [], u'options': {u'reply_to': u'bd66dd5c-516d-3b51-ab40-c8337a33b18e', u'task_id': u'7dacb897-d023-40b5-9874-e00b75107bbd'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}]}, u'options': {u'parent_id': None, u'task_id': u'f0d41289-33e2-4c8c-8d84-9d1d4c5a9c80', u'root_id': None}}]}] (801b)
{content_type:'application/json' content_encoding:'utf-8'
delivery_info:{'consumer_tag': 'None4', 'redelivered': False, 'routing_key': 'parse', 'delivery_tag': 3L, 'exchange': ''} headers={'\xe5\xca.\xdb\x00\x00\x00\x00\x00': None, 'P&5\x07\x00': None, 'T\nKB\x00\x00\x00': '4d7754ed-0e36-4731-ae99-a84f42b8eba1', 'N\xfd\x17=\x00\x00': 'gen19722@centos1', '\xcfb\xddR': 'py', '9*\xa8': None, '\xb7/b\x84\x00\x00\x00': 0, '\xe0\x0b\xfa\x89\x00\x00\x00': None, '\xdfR\xc4x\x00\x00\x00\x00\x00': [None, None], 'T3\x1d ': 'proj.tasks.parse', '\xae\xbf': '4d7754ed-0e36-4731-ae99-a84f42b8eba1', '\x11s\x1f\xd8\x00\x00\x00\x00': "('maildir/allen-p/inbox/1.',)", 'UL\xa1\xfc\x00\x00\x00\x00\x00\x00': '{}'}}
It would help to know which versions of celery and librabbitmq you are using. Since I had a very similar problem, I'll guess that you are using celery 4.0.2 and librabbitmq 1.6.1.
If so, this is a known compatibility issue; see https://github.com/celery/celery/issues/3675 and https://github.com/celery/librabbitmq/issues/93.
The first link recommends the following steps to solve your problem:
1. Uninstall librabbitmq: pip uninstall librabbitmq (you may have to run this command several times).
2. Change the occurrences of amqp to pyamqp in your broker URLs. (Though not in your config file, if you are using one. Doing that did not work for me.)
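The broker URL change described above can be sketched as a small helper. This is only an illustration: force_pyamqp is a hypothetical name and not part of Celery. The pyamqp:// scheme itself is the transport alias that selects the pure-Python amqp client instead of librabbitmq.

```python
def force_pyamqp(broker_url):
    """Return broker_url with a leading 'amqp://' replaced by 'pyamqp://'.

    Hypothetical helper for illustration; any other URL is returned unchanged.
    """
    prefix = "amqp://"
    if broker_url.startswith(prefix):
        return "pyamqp://" + broker_url[len(prefix):]
    return broker_url


# The broker URL from the question's proj/celery.py:
broker = force_pyamqp("amqp://")
# It would then be passed on as Celery('proj', broker=broker, ...).
```

In practice you would probably just edit the URL by hand; the helper only makes explicit that nothing but the scheme changes.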
To answer your other questions more precisely: you are right that there is a sender and a fetcher.
The sender role is assumed by the app created when you call Celery(...). One of its roles is to act as a registry of tasks, and if you look at its implementation in app/base.py, you'll see that it implements a method send_task, which is called directly by the apply_async method of the Task class. This method's role is to send a marshalled version of your task over the wire to the broker so that it can be fetched by a worker. The application protocol used to transmit the message is AMQP, of which librabbitmq is one implementation.
On the other side of the wire there is another instance, launched by the worker, which does the fetching. In Celery's parlance it is called a Consumer. You can find its implementation in worker/consumer/consumer.py. You will see that it implements create_task_handler, which in turn defines the on_task_received function that raises the error you are seeing. It is the function called when a new task has been fetched from the broker and is next in line to be processed.
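To make the sender and fetcher roles concrete, here is a toy model using only the standard library. It is not Celery's code: ToyApp, consume_one and the in-memory queue standing in for RabbitMQ are all assumptions made for illustration. It only mirrors the flow described above: the app registers tasks and sends a serialized message, and the consumer looks the task up by name.

```python
import json
import queue


class ToyApp:
    """Toy stand-in for the Celery app: a task registry plus a sender."""

    def __init__(self, broker):
        self.broker = broker
        self.tasks = {}

    def task(self, func):
        # Register the function under a dotted name (hypothetical naming).
        self.tasks["proj.tasks." + func.__name__] = func
        return func

    def send_task(self, name, args=()):
        # Sender side: serialize the call and hand it to the broker.
        message = {"headers": {"task": name}, "body": json.dumps(list(args))}
        self.broker.put(message)


def consume_one(app, log):
    """Fetcher side: take one message and dispatch it by task name."""
    message = app.broker.get_nowait()
    name = message["headers"].get("task")
    if name not in app.tasks:
        log.append("Received and deleted unknown message. Wrong destination?!?")
        return None
    return app.tasks[name](*json.loads(message["body"]))


broker = queue.Queue()  # stands in for RabbitMQ
app = ToyApp(broker)


@app.task
def parse(path):
    return "parsed " + path


log = []
app.send_task("proj.tasks.parse", ["maildir/allen-p/inbox/1."])
result = consume_one(app, log)

# A message whose 'task' header key arrives garbled cannot be matched.
broker.put({"headers": {"T3\x1d ": "proj.tasks.parse"}, "body": "[]"})
garbled = consume_one(app, log)
```

The second message imitates the corrupted header keys visible in the question's log: the task name is still present as a value, but the consumer cannot find it under the key it expects.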
The suggested solution therefore consists of changing the implementation of the AMQP protocol so that a TypeError is not raised in on_task_received (which, it seems to me, is caused by an encoding issue).
I hope this answers all your questions and gives you a clearer view of how Celery works. I should end by saying that, to my knowledge, a "conventional" use of Celery would never require you to tamper with these kinds of internals, and that you can achieve 99% of what you may want by implementing custom task classes and custom backends, for example.
Just so that the answer is located here as well: in the thread Anis refers to, 23doors mentions that Celery 4's new default protocol does not play nicely with librabbitmq:
Apparently librabbitmq issue is related to new default protocol in celery 4.x.
He also mentions that, to resolve this issue, you can use the older protocol that Celery offers by setting the following (if you're using Django):
CELERY_TASK_PROTOCOL = 1
Otherwise, you can set the following in your celeryconf.py file:
app.conf.task_protocol = 1
All credit to 23doors :)
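As far as I understand it, the older protocol helps because it carries the task name inside the message body, whereas the newer one puts it in the message headers, which are the part that arrives garbled in the question's log. The sketch below is a simplified illustration of that difference and not Celery's implementation; task_name is a hypothetical helper, and the messages are reduced to the fields that matter here.

```python
def task_name(headers, body):
    """Find the task name in a protocol 2 or protocol 1 style message."""
    try:
        return headers["task"]  # protocol 2: metadata lives in the headers
    except KeyError:
        pass
    try:
        return body["task"]     # protocol 1: metadata lives in the body
    except (TypeError, KeyError):
        return None             # neither layout matched: unknown message


# Protocol 2 style: the body is [args, kwargs, embed], the name is a header.
v2 = ({"task": "proj.tasks.parse"}, [["maildir/allen-p/inbox/1."], {}, {}])
# The same message after its header keys were corrupted in transit.
v2_garbled = ({"T3\x1d ": "proj.tasks.parse"}, v2[1])
# Protocol 1 style: the body is a dict that carries the name itself.
v1 = ({}, {"task": "proj.tasks.parse",
           "args": ["maildir/allen-p/inbox/1."],
           "kwargs": {}})

names = [task_name(headers, body) for headers, body in (v2, v2_garbled, v1)]
```

Only the garbled protocol 2 message ends up without a task name, which corresponds to the "unknown message" warning in the log.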