
`ProcessPoolExecutor` works on Ubuntu, but fails with `BrokenProcessPool` when running Jupyter 5.0.0 notebook with Python 3.5.3 on Windows 10

I'm running Jupyter 5.0.0 notebook with Python 3.5.3 on Windows 10. The following example code fails to run:

from concurrent.futures import as_completed, ProcessPoolExecutor
import time
import numpy as np

def do_work(idx1, idx2):
    time.sleep(0.2)
    return np.mean([idx1, idx2])

with ProcessPoolExecutor(max_workers=4) as executor:
    futures = set()
    for idx in range(32):
        future = executor.submit(do_work, idx, idx * 2)
        futures.add(future)

    for future in as_completed(futures):
        print(future.result())

... and throws BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

The code works perfectly fine on Ubuntu 14.04.

I understand that Windows doesn't have os.fork, so multiprocessing is handled differently there and doesn't always play nicely with interactive mode and Jupyter.

What are some workarounds to make ProcessPoolExecutor work in this case?

There are some similar questions, but they relate to multiprocessing.Pool:

  • multiprocessing.Pool in jupyter notebook works on linux but not windows
André C. Andersen asked May 07 '17 21:05


1 Answer

Closer inspection shows that a Jupyter notebook can run external Python modules that are parallelized using ProcessPoolExecutor. So, a solution is to put the parallelizable part of your code in a module and call it from the Jupyter notebook.
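As a minimal sketch of that approach, the worker function and the pool can live together in a regular module. The file name `parallel_work.py` and the function `run_all` are hypothetical names chosen for illustration, and plain arithmetic stands in for `np.mean` so the module depends only on the standard library:

```python
# parallel_work.py (hypothetical file name), saved next to the notebook
from concurrent.futures import ProcessPoolExecutor, as_completed
import time


def do_work(idx1, idx2):
    # Module-level function: worker processes can import it by name.
    time.sleep(0.2)
    return (idx1 + idx2) / 2


def run_all(count, max_workers=4):
    # Submit one task per index and collect results as they complete.
    results = []
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(do_work, idx, idx * 2) for idx in range(count)
        ]
        for future in as_completed(futures):
            results.append(future.result())
    return results
```

From the notebook, `import parallel_work` followed by `parallel_work.run_all(32)` should then collect the 32 results in completion order, because the workers import `do_work` from the module rather than from the notebook.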

That said, this can be generalized as a utility. The following can be stored as a module, say, winprocess.py, and imported from the Jupyter notebook.

import inspect
import types


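def execute_source(callback_imports, callback_name, callback_source, args):
    # Re-create the notebook's imports in this module's globals so the
    # rebuilt function can resolve names such as `np` or `time`.
    for callback_import in callback_imports:
        exec(callback_import, globals())
    # Define the function in an explicit namespace instead of relying on
    # locals(), which exec() is not guaranteed to update inside a function.
    namespace = {}
    exec(callback_source, globals(), namespace)
    callback = namespace[callback_name]
    return callback(*args)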


def submit(executor, callback, *args):
    callback_source = inspect.getsource(callback)
    callback_imports = list(imports(callback.__globals__))
    callback_name = callback.__name__
    future = executor.submit(
        execute_source,
        callback_imports, callback_name, callback_source, args
    )
    return future


def imports(callback_globals):
    for name, val in list(callback_globals.items()):
        if isinstance(val, types.ModuleType) and val.__name__ != 'builtins' and val.__name__ != __name__:
            import_line = 'import ' + val.__name__
            if val.__name__ != name:
                import_line += ' as ' + name
            yield import_line

Here is how you would use this:

from concurrent.futures import as_completed, ProcessPoolExecutor
import time
import numpy as np
import winprocess

def do_work(idx1, idx2):
    time.sleep(0.2)
    return np.mean([idx1, idx2])

with ProcessPoolExecutor(max_workers=4) as executor:
    futures = set()
    for idx in range(32):
        future = winprocess.submit(
            executor, do_work, idx, idx * 2
        )
        futures.add(future)

    for future in as_completed(futures):
        print(future.result())

Notice that the call to executor.submit(...) has been replaced with winprocess.submit(...), and the original executor is passed to that function as its first argument.

What happens here is that the source code of the notebook function and its imports are extracted as text and passed to the module for execution. The code is not executed until it is safely in a new process, so it avoids the problems caused by trying to create a new process based on the Jupyter notebook itself.
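The worker-side step can be sketched without a process pool. The helper name `rebuild_and_call` below is hypothetical, and the function source is written as a string literal in place of what `inspect.getsource` would return:

```python
def rebuild_and_call(import_lines, name, source, args):
    # Fresh namespace standing in for the worker process's module globals.
    namespace = {}
    for line in import_lines:
        exec(line, namespace)  # e.g. "import math"
    exec(source, namespace)  # defines the function inside the namespace
    return namespace[name](*args)


# Source text of a function, as it would travel to the worker process.
source = "def do_work(idx1, idx2):\n    return math.fsum([idx1, idx2]) / 2\n"
result = rebuild_and_call(["import math"], "do_work", source, (2, 4))
print(result)
```

Only strings and a tuple of arguments are handed over here, which is why the worker does not need access to the notebook's own namespace.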

Imports are handled in such a way as to maintain aliases. The import magic can be removed if you make sure to import everything needed for the function being executed inside the function itself.
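To illustrate the alias handling, here is a small standalone sketch. The name `import_lines` and the example dictionary are made up for the demonstration, and the filter is simplified to the module check and alias logic only:

```python
import math
import os
import types


def import_lines(namespace):
    # Yield one import statement per module found in the namespace,
    # adding "as <alias>" when the bound name differs from the module name.
    for name, val in namespace.items():
        if isinstance(val, types.ModuleType):
            line = "import " + val.__name__
            if val.__name__ != name:
                line += " as " + name
            yield line


# Stand-in for a function's __globals__: two modules and one plain value.
example_globals = {"m": math, "os": os, "threshold": 0.5}
lines = sorted(import_lines(example_globals))
print(lines)
```

Non-module values such as `threshold` are skipped, which is the reason variables have to reach the function as arguments instead.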

Also, this solution only works if you pass all necessary variables as arguments to the function. The function should be static, so to speak (it must not depend on notebook state), but I think that's a requirement of ProcessPoolExecutor as well. Finally, make sure the function does not call other functions defined elsewhere in the notebook. Only external modules are imported, so other notebook functions won't be available in the worker process.
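That last limitation can be reproduced in isolation. In this sketch the function names `outer` and `helper` are invented, and the source strings stand in for functions defined in notebook cells:

```python
# A function whose source refers to a helper defined elsewhere.
source = "def outer(x):\n    return helper(x) + 1\n"

namespace = {}
exec(source, namespace)  # only `outer` is defined; `helper` never arrives

try:
    namespace["outer"](1)
    outcome = "ok"
except NameError:
    outcome = "NameError"

# Defining the helper inside the function keeps the source self-contained.
fixed_source = (
    "def outer(x):\n"
    "    def helper(y):\n"
    "        return y * 2\n"
    "    return helper(x) + 1\n"
)
fixed_namespace = {}
exec(fixed_source, fixed_namespace)
fixed_result = fixed_namespace["outer"](1)
print(outcome, fixed_result)
```

Moving the helper into an external module and importing it in the notebook is another option, since module imports are carried over.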

André C. Andersen answered Sep 24 '22 19:09