I'm trying to learn the joblib module as an alternative to the built-in multiprocessing module in Python. I'm used to using multiprocessing.imap to run a function over an iterable, yielding the results as they come in. In this minimal working example, I can't figure out how to do the same with joblib:
import joblib, time

def hello(n):
    time.sleep(1)
    print("Inside function", n)
    return n

with joblib.Parallel(n_jobs=1) as MP:
    func = joblib.delayed(hello)
    for x in MP(func(x) for x in range(3)):
        print("Outside function", x)
Which prints:
Inside function 0
Inside function 1
Inside function 2
Outside function 0
Outside function 1
Outside function 2
I'd like to see the output:
Inside function 0
Outside function 0
Inside function 1
Outside function 1
Inside function 2
Outside function 2
Or something similar, indicating that the iterable MP(...) is not waiting for all the results to complete. For a longer demo, change n_jobs=-1 and range(100).
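For reference, here is a minimal sketch of the multiprocessing.imap behaviour I'd like joblib to reproduce (hello is the same toy function as above):

```python
# The multiprocessing.imap behaviour to reproduce: results are
# yielded one by one, as soon as each task finishes.
import multiprocessing
import time

def hello(n):
    time.sleep(1)
    print("Inside function", n)
    return n

if __name__ == "__main__":
    with multiprocessing.Pool(processes=1) as pool:
        for x in pool.imap(hello, range(3)):
            print("Outside function", x)
```

With imap, the "Inside"/"Outside" lines interleave because each result is handed back as soon as its task completes, which is exactly what the MP(...) iterable above does not do.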
stovfl's answer is elegant, but it only works for the first batches dispatched. In the example it works only because the workers never starve (n_tasks < 2*n_jobs). For this approach to work in general, the callback originally passed to apply_async must also be called: it is an instance of BatchCompletionCallBack, which schedules the next batch of tasks to be processed.
One possible solution is to wrap arbitrary callbacks in a callable object, like this (tested with joblib==0.11 on Python 3.6):
from joblib._parallel_backends import MultiprocessingBackend
from joblib import register_parallel_backend, parallel_backend
from joblib import Parallel, delayed
import time

class MultiCallback:
    def __init__(self, *callbacks):
        self.callbacks = [cb for cb in callbacks if cb]

    def __call__(self, out):
        for cb in self.callbacks:
            cb(out)

class ImmediateResultBackend(MultiprocessingBackend):
    def callback(self, result):
        print("\tImmediateResult function %s" % result)

    def apply_async(self, func, callback=None):
        cbs = MultiCallback(callback, self.callback)
        return super().apply_async(func, cbs)

register_parallel_backend('custom', ImmediateResultBackend)

def hello(n):
    time.sleep(1)
    print("Inside function", n)
    return n

with parallel_backend('custom'):
    res = Parallel(n_jobs=2)(delayed(hello)(y) for y in range(6))
Output
Inside function 0
Inside function 1
ImmediateResult function [0]
ImmediateResult function [1]
Inside function 3
Inside function 2
ImmediateResult function [3]
ImmediateResult function [2]
Inside function 4
ImmediateResult function [4]
Inside function 5
ImmediateResult function [5]
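Building on this, if you want to actually consume results in the main thread as they complete (like the question's imap loop), one option is to have the backend callback push each finished batch onto a queue, run Parallel in a background thread, and drain the queue in the main thread. This is only a sketch against the same joblib 0.11-era backend API used above; the scaffolding around it (the run helper, the None sentinel, the 'queueing' backend name) is my own, not part of joblib:

```python
import queue
import threading
import time

from joblib._parallel_backends import MultiprocessingBackend
from joblib import register_parallel_backend, parallel_backend, Parallel, delayed

result_queue = queue.Queue()

class MultiCallback:
    def __init__(self, *callbacks):
        self.callbacks = [cb for cb in callbacks if cb]

    def __call__(self, out):
        for cb in self.callbacks:
            cb(out)

class QueueingBackend(MultiprocessingBackend):
    def callback(self, result):
        # result is a batch (list) of return values; the pool invokes this
        # in the parent process, so a plain queue.Queue is sufficient.
        for item in result:
            result_queue.put(item)

    def apply_async(self, func, callback=None):
        # Keep the original BatchCompletionCallBack so dispatching continues.
        return super().apply_async(func, MultiCallback(callback, self.callback))

register_parallel_backend('queueing', QueueingBackend)

def hello(n):
    time.sleep(1)
    return n

def run():
    with parallel_backend('queueing'):
        Parallel(n_jobs=2)(delayed(hello)(y) for y in range(6))
    result_queue.put(None)  # sentinel: all tasks finished

if __name__ == "__main__":
    threading.Thread(target=run).start()
    for item in iter(result_queue.get, None):
        print("Got result", item)  # arrives as soon as its batch completes
```

The ordering of items is completion order, not submission order, so treat this like imap_unordered rather than imap.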
To get immediate results from joblib, for instance:
import time, random
import joblib
from joblib import delayed
from joblib._parallel_backends import MultiprocessingBackend

class ImmediateResult_Backend(MultiprocessingBackend):
    def callback(self, result):
        print("\tImmediateResult function %s" % (result))

    # Overload apply_async and set callback=self.callback
    def apply_async(self, func, callback=None):
        applyResult = super().apply_async(func, self.callback)
        return applyResult

joblib.register_parallel_backend('custom', ImmediateResult_Backend, make_default=True)

with joblib.Parallel(n_jobs=2) as parallel:
    func = parallel(delayed(hello)(y) for y in range(3))
    for f in func:
        print("Outside function %s" % (f))
Output:
Note: I use time.sleep(n * random.randrange(1,5))
in def hello(...)
, therefore processes
become different ready.
Inside function 0
Inside function 1
ImmediateResult function [0]
Inside function 2
ImmediateResult function [1]
ImmediateResult function [2]
Outside function 0
Outside function 1
Outside function 2
Tested with Python 3.4.2, joblib 0.11.