 

Intermediate results from joblib

I'm trying to learn the joblib module as an alternative to the built-in multiprocessing module in Python. I'm used to using multiprocessing.Pool.imap to run a function over an iterable and get the results back as they come in. In this minimal working example, I can't figure out how to do that with joblib:

import time

import joblib

def hello(n):
    time.sleep(1)
    print("Inside function", n)
    return n

with joblib.Parallel(n_jobs=1) as MP:

    func = joblib.delayed(hello)
    for x in MP(func(n) for n in range(3)):
        print("Outside function", x)

Which prints:

Inside function 0
Inside function 1
Inside function 2
Outside function 0
Outside function 1
Outside function 2
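This order follows from Parallel(...) returning a complete list by default, so every call has finished before the for loop receives its first item. The sequential toy model below is not joblib code, and nothing in it runs in parallel. The names hello, eager_map, lazy_map and trace are hypothetical. It reproduces both the printed order and the order the question asks for:

```python
def hello(n, log):
    # Stand-in for the question's hello(): records a line instead of printing.
    log.append("Inside function %d" % n)
    return n


def eager_map(func, items, log):
    # Like Parallel(...) returning a list: every call finishes first.
    return [func(x, log) for x in items]


def lazy_map(func, items, log):
    # Like Pool.imap: each result is handed over as soon as it exists.
    for x in items:
        yield func(x, log)


def trace(mapper, n=3):
    """Run mapper over range(n) and return the order of the log lines."""
    log = []
    for x in mapper(hello, range(n), log):
        log.append("Outside function %d" % x)
    return log
```

trace(eager_map) produces the three "Inside" lines followed by the three "Outside" lines, as in the output above. trace(lazy_map) interleaves them, as in the desired output.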

I'd like to see the output:

Inside function 0
Outside function 0
Inside function 1
Outside function 1
Inside function 2
Outside function 2

Or something similar, showing that MP(...) hands back results without waiting for all of them to complete. For a longer demo, change to n_jobs=-1 and range(100).
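For comparison, the imap pattern the question starts from can be sketched with the standard library alone. This sketch uses multiprocessing.pool.ThreadPool, which offers the same imap method as multiprocessing.Pool but runs on threads. That lets it run without the picklable-function and `__main__` requirements of worker processes. run_imap is a hypothetical helper name:

```python
import time
from multiprocessing.pool import ThreadPool


def hello(n):
    time.sleep(0.05)  # shortened from the question's 1 second
    print("Inside function", n)
    return n


def run_imap(values, n_workers=2):
    """Collect results from imap, printing each one as soon as it arrives."""
    results = []
    with ThreadPool(n_workers) as pool:
        # imap returns a lazy iterator: results come back in input order,
        # each one as soon as it and all earlier ones are done.
        for x in pool.imap(hello, values):
            print("Outside function", x)
            results.append(x)
    return results


if __name__ == "__main__":
    run_imap(range(3))
```

Pool.imap_unordered is the variant that yields results in completion order instead of input order.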

Hooked asked Jul 20 '16 14:07


2 Answers

stovfl's answer is elegant, but it only works for the first batches dispatched. In its example it works because every task fits into the initial dispatch (n_tasks < 2*n_jobs, matching joblib's default pre_dispatch='2 * n_jobs'), so no further batch ever has to be scheduled. For this approach to work in general, the callback originally passed to apply_async must also be called. That callback is an instance of BatchCompletionCallBack, which schedules the next batch of tasks to be processed.

One possible solution is to wrap up arbitrary callbacks in a callable object, like this (tested in joblib==0.11, py36):

from joblib._parallel_backends import MultiprocessingBackend
from joblib import register_parallel_backend, parallel_backend
from joblib import Parallel, delayed
import time

class MultiCallback:
    def __init__(self, *callbacks):
        self.callbacks = [cb for cb in callbacks if cb]

    def __call__(self, out):
        for cb in self.callbacks:
            cb(out)

class ImmediateResultBackend(MultiprocessingBackend):
    def callback(self, result):
        print("\tImmediateResult function %s" % result)

    def apply_async(self, func, callback=None):
        cbs = MultiCallback(callback, self.callback)
        return super().apply_async(func, cbs)

register_parallel_backend('custom', ImmediateResultBackend)

def hello(n):
    time.sleep(1)
    print("Inside function", n)
    return n

with parallel_backend('custom'):
    res = Parallel(n_jobs=2)(delayed(hello)(y) for y in range(6))

Output

Inside function 0
Inside function 1
    ImmediateResult function [0]
    ImmediateResult function [1]
Inside function 3
Inside function 2
    ImmediateResult function [3]
    ImmediateResult function [2]
Inside function 4
    ImmediateResult function [4]
Inside function 5
    ImmediateResult function [5]
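The following toy model of the dispatch loop, in plain Python, shows why the original callback has to be kept. It is a sketch under simplifying assumptions and not joblib's scheduler. Each batch is a single task, and the pool is pre-filled with 2 * n_jobs tasks, as with joblib's default pre_dispatch. schedule_next, report and simulate are hypothetical stand-ins for BatchCompletionCallBack, the custom callback and the backend:

```python
from collections import deque


class MultiCallback:
    """Same idea as the answer: call every non-None callback in order."""

    def __init__(self, *callbacks):
        self.callbacks = [cb for cb in callbacks if cb]

    def __call__(self, out):
        for cb in self.callbacks:
            cb(out)


def simulate(n_tasks, n_jobs, keep_joblib_callback):
    """Toy model of batch dispatching; not joblib's real scheduler."""
    pending = deque(range(n_tasks))
    running = deque()
    reported = []

    def schedule_next(_result):
        # Plays the role of BatchCompletionCallBack: dispatch one more task.
        if pending:
            running.append(pending.popleft())

    def report(result):
        # Plays the role of ImmediateResultBackend.callback.
        reported.append(result)

    # Pre-dispatch up to 2 * n_jobs tasks, like joblib's default pre_dispatch.
    for _ in range(min(2 * n_jobs, n_tasks)):
        running.append(pending.popleft())

    if keep_joblib_callback:
        on_done = MultiCallback(schedule_next, report)  # this answer's approach
    else:
        on_done = report  # joblib's callback is replaced, as in stovfl's answer

    # "Complete" the running tasks one at a time, oldest first.
    while running:
        on_done(running.popleft())
    return reported
```

simulate(6, 2, True) reports all six results, while simulate(6, 2, False) reports only the four pre-dispatched ones. With 3 tasks and n_jobs=2, as in stovfl's example, dropping joblib's callback goes unnoticed because everything was dispatched up front.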
Carlos Santos answered Oct 24 '22 18:10


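To get immediate results from joblib, you can override apply_async in a custom backend so that each finished batch calls your own callback, for instance: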

import random
import time

import joblib
from joblib import delayed
from joblib._parallel_backends import MultiprocessingBackend

def hello(n):
    # Random delay so the processes finish at different times
    time.sleep(n * random.randrange(1, 5))
    print("Inside function", n)
    return n

class ImmediateResult_Backend(MultiprocessingBackend):
    def callback(self, result):
        print("\tImmediateResult function %s" % (result))

    # Override apply_async and pass self.callback instead of joblib's callback
    def apply_async(self, func, callback=None):
        applyResult = super().apply_async(func, self.callback)
        return applyResult

joblib.register_parallel_backend('custom', ImmediateResult_Backend, make_default=True)

with joblib.Parallel(n_jobs=2) as parallel:
    func = parallel(delayed(hello)(y) for y in range(3))
    for f in func:
        print("Outside function %s" % (f))

Output:
Note: I use time.sleep(n * random.randrange(1,5)) in def hello(...), so the processes finish at different times.

Inside function 0
Inside function 1
ImmediateResult function [0]
Inside function 2
ImmediateResult function [1]
ImmediateResult function [2]
Outside function 0
Outside function 1
Outside function 2

Tested with Python:3.4.2 - joblib:0.11

stovfl answered Oct 24 '22 18:10