Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Celery not accepting pickle even after allowing it

I'm trying to write a celery application that passes numpy arrays (or any arbitrary objects) to the workers. As far as I can tell, this requires serialization to occur via pickle (NB: I'm aware of the security implications but this isn't a concern in this case).

However, even after trying every possible way I could find to allow pickle as a serializer, I keep getting the following kombu exception:

kombu.exceptions.ContentDisallowed: Refusing to deserialize untrusted
content of type pickle (application/x-python-serialize)

My current files are currently:

# tasks.py
from celery import Celery
app = Celery(
    'tasks',
    broker='redis://localhost',
    accept_content=['pickle'],
    task_serializer='pickle'
)

@app.task
def adding(x, y):
    return x + y

if __name__ == '__main__':
    import numpy as np
    adding.apply_async((np.array([1]), np.array([1])), serializer='pickle')

In addition I have a config file:

# celeryconfig.py
print('configuring...')

accept_content = ['pickle', 'application/x-python-serialize']
task_serializer = 'pickle'
result_serializer = 'pickle'
from kombu import serialization
serialization.register_pickle()
serialization.enable_insecure_serializers()

However, if I run the worker (celery -A tasks worker --loglevel=info) and then execute the code that makes an async call (python tasks.py), I get the following traceback. Am I missing something?

[2018-06-16 11:46:23,617: CRITICAL/MainProcess] Unrecoverable error: ContentDisallowed('Refusing to deserialize untrusted content of type pickle (application/x-python-serialize)',)
Traceback (most recent call last):
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/celery/worker/worker.py", line 205, in start
    self.blueprint.start(self)
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/celery/bootsteps.py", line 369, in start
    return self.obj.start()
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 322, in start
    blueprint.start(self)
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/celery/worker/consufrom celery import Celery
mer/consumer.py", line 598, in start
    c.loop(*c.loop_args())
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/celery/worker/loops.py", line 91, in asynloop
    next(loop)
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/kombu/asynchronous/hub.py", line 354, in create_loop
    cb(*cbargs)
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/kombu/transport/redis.py", line 1040, in on_readable
    self.cycle.on_readable(fileno)
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/kombu/transport/redis.py", line 337, in on_readable
    chan.handlers[type]()
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/kombu/transport/redis.py", line 724, in _brpop_read
    self.connection._deliver(loads(bytes_to_str(item)), dest)
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/kombu/transport/virtual/base.py", line 983, in _deliver
    callback(message)
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/kombu/transport/virtual/base.py", line 633, in _callback
    return callback(message)
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/kombu/messaging.py", line 624, in _receive_callback
    return on_m(message) if on_m else self.receive(decoded, message)
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 572, in on_task_received
    callbacks,
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/celery/worker/strategy.py", line 136, in task_message_handler
    if body is None and 'args' not in message.payload:
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/kombu/message.py", line 207, in payload
    return self._decoded_cache if self._decoded_cache else self.decode()
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/kombu/message.py", line 192, in decode
    self._decoded_cache = self._decode()
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/kombu/message.py", line 197, in _decode
    self.content_encoding, accept=self.accept)
  File "/opt/anaconda/envs/Python3/lib/python3.6/site-packages/kombu/serialization.py", line 253, in loads
    raise self._for_untrusted_content(content_type, 'untrusted')
kombu.exceptions.ContentDisallowed: Refusing to deserialize untrusted content of type pickle (application/x-python-serialize)
like image 321
janfreyberg Avatar asked Jun 16 '18 11:06

janfreyberg


People also ask

Why use Python celery?

It allows you to offload work from your Python app. Once you integrate Celery into your app, you can send time-intensive tasks to Celery's task queue. That way, your web app can continue to respond quickly to users while Celery completes expensive operations asynchronously in the background.


1 Answers

For anyone coming to this question:

The answer was to use the app.config_from_object method:

import celeryconfig
app.config_from_object(celeryconfig)
like image 146
janfreyberg Avatar answered Sep 28 '22 10:09

janfreyberg