
Python namedtuple as argument to apply_async(..) callback

I'm writing a short program in which I want to call a function asynchronously so that it doesn't block the caller. To do this, I'm using Pool from Python's multiprocessing module.

In the function called asynchronously, I want to return a namedtuple to fit the logic of the rest of my program, but a namedtuple does not seem to be a supported type to pass from the spawned process to the callback (probably because it cannot be pickled). Here is a minimal repro of the problem.

from multiprocessing import Pool
from collections import namedtuple

logEntry = namedtuple("LogEntry", ['logLev', 'msg'])

def doSomething(x):
    # Do actual work here
    logCode = 1
    statusStr = "Message Here"
    return logEntry(logLev=logCode, msg=statusStr)

def callbackFunc(result):
    print(result.logLev)
    print(result.msg)

def userAsyncCall():
    pool = Pool()
    pool.apply_async(doSomething, [1,2], callback=callbackFunc)

if __name__ == "__main__":
    userAsyncCall() # Nothing is printed

    # If this is uncommented, the logLev and status are printed as expected:
    # y = logEntry(logLev=2, msg="Hello World")
    # callbackFunc(y)

Does anyone know if there is a way to pass a namedtuple return value from the async process to the callback? Is there a better/more pythonic approach for what I'm doing?

nbryans asked Jan 24 '17 14:01



2 Answers

The problem is that the variable the namedtuple is assigned to (logEntry) differs in case from the typename argument passed to namedtuple() ("LogEntry"). pickle looks the class up by its typename in the module where it was defined, so the class name and the module-level variable name must match:

LogEntry = namedtuple("LogEntry", ['logLev', 'msg'])

And update the return statement in doSomething() correspondingly.

Full code:

from multiprocessing import Pool
from collections import namedtuple

LogEntry = namedtuple("LogEntry", ['logLev', 'msg'])

def doSomething(x):
    # Do actual work here
    logCode = 1
    statusStr = "Message Here"
    return LogEntry(logLev=logCode, msg=statusStr)

def callbackFunc(result):
    print(result.logLev)
    print(result.msg)

def userAsyncCall():
    pool = Pool()
    return pool.apply_async(doSomething, [1], callback=callbackFunc)

if __name__ == "__main__":
    c = userAsyncCall()

    # To see whether there was an exception, call get() on the AsyncResult object.
    # It blocks until the result is ready and re-raises any exception from the worker.
    # print(c.get())

(To see the generated class definition on Python versions before 3.7, add verbose=True to the namedtuple() call. The parameter was removed in Python 3.7.)
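
The name mismatch described above can be checked with pickle directly, without a Pool. This is only a minimal sketch: StatusEntry and can_pickle are hypothetical names introduced for illustration, and the sketch assumes it is run as a script so that the module-level names are visible to pickle.

```python
import pickle
from collections import namedtuple

# Mismatch: the class is named "LogEntry" but bound to the variable logEntry.
logEntry = namedtuple("LogEntry", ["logLev", "msg"])

# Match: variable name and typename are identical (hypothetical second type).
StatusEntry = namedtuple("StatusEntry", ["logLev", "msg"])


def can_pickle(obj):
    """Return True if pickle.dumps(obj) succeeds, False if pickling fails."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        # The class lookup by name failed, so the object cannot be pickled.
        return False


if __name__ == "__main__":
    print(can_pickle(logEntry(1, "Message Here")))     # mismatched name
    print(can_pickle(StatusEntry(1, "Message Here")))  # matching name
```

Run as a script, the first check should report a failure and the second a success, because only StatusEntry can be found in the module under its own class name.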

Dag Høidahl answered Oct 21 '22 13:10



Nothing is printed because apply_async fails silently. I think this is bad behavior that just confuses people. You can pass error_callback to handle the error.

def errorCallback(exception):
    print(exception)

def userAsyncCall():
    pool = Pool()
    pool.apply_async(doSomething, [1], callback=callbackFunc, error_callback=errorCallback)
    # You passed the wrong arguments: doSomething() takes 1 positional argument,
    # so I replaced [1,2] with [1].

if __name__ == "__main__":
    userAsyncCall()
    import time
    time.sleep(3) # You need this, otherwise you will never see the output.

At this point, the output is:

Error sending result: 'LogEntry(logLev=1, msg='Message Here')'. Reason: 'PicklingError("Can't pickle <class '__mp_main__.LogEntry'>: attribute lookup LogEntry on __mp_main__ failed",)'

PicklingError! You're right: as written, the namedtuple cannot be passed from the spawned process to the callback.

It may not be the most acceptable approach, but you can return a dict instead of a namedtuple.
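
A rough sketch of that dict workaround, assuming the worker converts with _asdict() and the callback rebuilds the namedtuple. The function names do_something and callback_func are illustrative stand-ins, not the code from the question.

```python
from collections import namedtuple

LogEntry = namedtuple("LogEntry", ["logLev", "msg"])


def do_something(x):
    """Worker side: return a plain dict instead of the namedtuple."""
    entry = LogEntry(logLev=1, msg="Message Here")
    # dict() gives a plain dict even on versions where _asdict() returns OrderedDict.
    return dict(entry._asdict())


def callback_func(result):
    """Callback side: rebuild the namedtuple from the received dict."""
    entry = LogEntry(**result)
    print(entry.logLev)
    print(entry.msg)
    return entry
```

A plain dict of ints and strings pickles without any class lookup, which is why it sidesteps the error.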

As Dag Høidahl pointed out, a namedtuple can be passed after all. The following line works:

LogEntry = namedtuple("LogEntry", ['logLev', 'msg'])
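
As an alternative to time.sleep(3), the pool can be closed and joined so the program waits exactly as long as needed. The sketch below is an assumption-laden illustration: it swaps in ThreadPool (which exposes the same apply_async, close, and join methods as Pool) so that it runs without any pickling, and do_something and run_calls are hypothetical names. It also shows the [1,2] argument mistake from the question reaching error_callback.

```python
from multiprocessing.pool import ThreadPool


def do_something(x):
    """Stand-in for the real work; takes exactly one argument."""
    return x * 2


def run_calls():
    """Submit one valid and one invalid call, then wait for both to finish."""
    results, errors = [], []
    pool = ThreadPool(2)
    pool.apply_async(do_something, [1],
                     callback=results.append, error_callback=errors.append)
    # Two positional arguments for a one-argument function: the TypeError
    # raised in the worker is delivered to error_callback.
    bad = pool.apply_async(do_something, [1, 2],
                           callback=results.append, error_callback=errors.append)
    pool.close()  # no more tasks will be submitted
    pool.join()   # block until the workers and the callbacks are done
    return results, errors, bad


if __name__ == "__main__":
    results, errors, bad = run_calls()
    print(results)
    print(errors)
    try:
        bad.get()  # get() re-raises the exception from the worker
    except TypeError as exc:
        print("get() raised:", exc)
```

With a process-based Pool the same close() and join() calls apply, but the function, its arguments, and its return value must all be picklable.
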
gzc answered Oct 21 '22 12:10
