Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Pass client-side function to celery task

Tags:

python

celery

I'd like to pass a function the client creates to a celery task and have it be executed. For example below, I'm trying to write a map function that takes in a function f and a list l and do map(f, l) in the celery task.

Presumably, the function is not getting serialized correctly (understandable, that's hard). But, is there any way to do this? What's the best practice? I suppose I could pass a string and then exec it, but I'd rather not just the way my app is intended to work.

edit: I found a way to serialize a function... I guess I could wrap that up to do what I need to do. Any better ideas?


from celery import Celery

app = Celery('tasks', broker='redis://localhost', backend='redis://localhost')

@app.task
def cp_map(f, l):
    return map(f, l)

Then, I try to use this task with this:

In [20]: from tasks import cp_map

In [21]: def f(x): return x + 1

In [22]: cp_map.delay(f, [1,2,3])
Out[22]: <AsyncResult: 27baf8bf-8ef3-496d-a445-ebd7ee94e206>

In [23]: _.status
Out[23]: 'PENDING'

On the worker, I get this:

[2014-02-09 22:27:00,828: CRITICAL/MainProcess] Can't decode message body: DecodeError(AttributeError("'module' object has no attribute 'f'",),) (type:u'application/x-python-serialize' encoding:u'binary' raw:"'\\x80\\x02}q\\x01(U\\x07expiresq\\x02NU\\x03utcq\\x03\\x88U\\x04argsq\\x04c__main__\\nf\\nq\\x05]q\\x06(K\\x01K\\x02K\\x03e\\x86q\\x07U\\x05chordq\\x08NU\\tcallbacksq\\tNU\\x08errbacksq\\nNU\\x07tasksetq\\x0bNU\\x02idq\\x0cU$27baf8bf-8ef3-496d-a445-ebd7ee94e206q\\rU\\x07retriesq\\x0eK\\x00U\\x04taskq\\x0fU\\x0ctasks.cp_mapq\\x10U\\ttimelimitq\\x11NN\\x86U\\x03etaq\\x12NU\\x06kwargsq\\x13}q\\x14u.' (233b)"')
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 585, in _receive_callback
    decoded = None if on_m else message.decode()
  File "/usr/local/lib/python2.7/dist-packages/kombu/message.py", line 142, in decode
    self.content_encoding, accept=self.accept)
  File "/usr/local/lib/python2.7/dist-packages/kombu/serialization.py", line 184, in loads
    return decode(data)
  File "/usr/lib/python2.7/contextlib.py", line 35, in __exit__
    self.gen.throw(type, value, traceback)
  File "/usr/local/lib/python2.7/dist-packages/kombu/serialization.py", line 59, in _reraise_errors
    reraise(wrapper, wrapper(exc), sys.exc_info()[2])
  File "/usr/local/lib/python2.7/dist-packages/kombu/serialization.py", line 55, in _reraise_errors
    yield
  File "/usr/local/lib/python2.7/dist-packages/kombu/serialization.py", line 184, in loads
    return decode(data)
  File "/usr/local/lib/python2.7/dist-packages/kombu/serialization.py", line 64, in pickle_loads
    return load(BytesIO(s))
DecodeError: 'module' object has no attribute 'f'
like image 489
Donald Miner Avatar asked Sep 18 '25 07:09

Donald Miner


1 Answers

You can use marshal to serialize the function to a string then deserialize it in the task. I don't know if it's the best way, but but it will work. You may also want to look at dill.

Here is some example code copied from another stackoverflow answer:

import marshal
def foo(x): return x*x
code_string = marshal.dumps(foo.func_code)

Then in the task:

import marshal, types

code = marshal.loads(code_string)
func = types.FunctionType(code, globals(), "some_func_name")

func(10)  # gives 100
like image 123
joshua Avatar answered Sep 20 '25 22:09

joshua