
Multiprocessing causes Python to crash with the error "may have been in progress in another thread when fork() was called"

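This error occurs because macOS High Sierra and later versions added a safety check around fork(): the Objective-C runtime crashes a forked child process if a class was being initialized in another thread when fork() was called. I know this answer is a bit late, but I solved the problem using the following method: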

Set an environment variable in .bash_profile (or .zshrc on recent macOS, where zsh is the default shell) to disable this check for applications or scripts that fork from a multithreaded process.

Open a terminal:

$ nano ~/.bash_profile

Add the following line to the end of the file:

export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES

Save and exit, then close and reopen the terminal. Check that the environment variable is now set:

$ env

You will see output similar to:

TERM_PROGRAM=Apple_Terminal
SHELL=/bin/bash
TERM=xterm-256color
TMPDIR=/var/folders/pn/vasdlj3ojO#OOas4dasdffJq/T/
Apple_PubSub_Socket_Render=/private/tmp/com.apple.launchd.E7qLFJDSo/Render
TERM_PROGRAM_VERSION=404
TERM_SESSION_ID=NONE
OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES

You should now be able to run your Python script with multiprocessing.
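
The `env` check above only shows what the shell sees. A minimal sketch for confirming that the Python process itself inherited the variable follows; `fork_safety_disabled` is a hypothetical helper name, and the comparison against `YES` simply mirrors the value used in this answer.

```python
import os


def fork_safety_disabled(environ=os.environ):
    """Return True if OBJC_DISABLE_INITIALIZE_FORK_SAFETY is set to YES."""
    return environ.get("OBJC_DISABLE_INITIALIZE_FORK_SAFETY") == "YES"


if __name__ == "__main__":
    # Prints True only when the variable was exported by the parent shell.
    print(fork_safety_disabled())
```

If this prints False after editing the profile file, the line was probably added without `export` or the terminal was not restarted.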


I'm on a Mac using zsh, and I had to add the following to my .zshrc file:

export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES

and then in the command line:

source ~/.zshrc

Then it worked.


The other answers tell you to set OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES, but don't do this! You're just putting sticky tape over the warning light. You may need it on a case-by-case basis for some legacy software, but certainly do not set it in your .bash_profile!
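
For the case-by-case situation, one option is to set the variable for a single child process instead of the whole shell. The following is only a sketch under that assumption: `run_with_fork_safety_disabled` is a hypothetical helper, and the probe command stands in for whatever legacy script actually needs the override.

```python
import os
import subprocess
import sys


def run_with_fork_safety_disabled(command):
    """Run one command with the override set only in that child's environment."""
    env = dict(os.environ, OBJC_DISABLE_INITIALIZE_FORK_SAFETY="YES")
    return subprocess.run(command, env=env, check=True,
                          capture_output=True, text=True)


if __name__ == "__main__":
    # Stand-in for a legacy script: a child Python that echoes the variable.
    probe = "import os; print(os.environ['OBJC_DISABLE_INITIALIZE_FORK_SAFETY'])"
    result = run_with_fork_safety_disabled([sys.executable, "-c", probe])
    print(result.stdout.strip())  # prints YES
```

The parent process and every other program started from the shell keep the default fork-safety check.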

This is fixed in https://bugs.python.org/issue33725 (Python 3.8+, where spawn became the default start method on macOS), but it's best practice to use

import multiprocessing

with multiprocessing.get_context("spawn").Pool() as pool:
    pool.map(annotate, img_urls)
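
A slightly fuller sketch of the spawn approach follows. Spawned workers start a fresh interpreter and re-import the main module, so the pool should be created under an `if __name__ == "__main__":` guard. `map_with_spawn` is a hypothetical helper name, and the built-in `abs` stands in for `annotate` so that the block is self-contained.

```python
import multiprocessing


def map_with_spawn(func, items, processes=2):
    """Map func over items using a pool of spawned (not forked) workers."""
    ctx = multiprocessing.get_context("spawn")
    with ctx.Pool(processes=processes) as pool:
        return pool.map(func, items)


if __name__ == "__main__":
    # func must be picklable, for example a built-in or a module-level function.
    print(map_with_spawn(abs, [-3, 1, -2]))  # prints [3, 1, 2]
```

Because no worker is created with fork(), the Objective-C fork-safety check is never triggered, at the cost of a slower worker start-up.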

The solution that works for me without the OBJC_DISABLE_INITIALIZE_FORK_SAFETY flag in the environment is to initialize the multiprocessing.Pool class right after the main() program starts.

This is most likely not the fastest solution possible, and I am not sure it works in all situations. However, pre-heating the worker processes early enough, before my program starts, does not result in any ... may have been in progress in another thread when fork() was called errors, and I do get a significant performance boost compared to non-parallelized code.

I have created a convenience class, Parallelizer, which I start very early and then use throughout the lifecycle of my program. The full version can be found here.

# entry point to my program
def main():
    parallelizer = Parallelizer()
    ...

Then whenever you want to have parallelization:

# this function is parallelized. it is run by each child process.
def processing_function(input):
    ...
    return output

...
inputs = [...]
results = parallelizer.map(
    inputs,
    processing_function
)

And the parallelizer class:

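import multiprocessing


class Parallelizer:
    def __init__(self):
        self.input_queue = multiprocessing.Queue()
        self.output_queue = multiprocessing.Queue()
        # The pool's initializer never returns: each worker loops in _run,
        # pulling work from input_queue for the lifetime of the pool.
        self.pool = multiprocessing.Pool(multiprocessing.cpu_count(),
                                         Parallelizer._run,
                                         (self.input_queue, self.output_queue,))

    def map(self, contents, processing_func):
        # processing_func travels through the queue, so it must be picklable
        # (for example, a module-level function).
        size = 0
        for content in contents:
            self.input_queue.put((content, processing_func))
            size += 1
        # Results are collected in completion order, which may differ
        # from the order of contents.
        results = []
        while size > 0:
            result = self.output_queue.get(block=True)
            results.append(result)
            size -= 1
        return results

    @staticmethod
    def _run(input_queue, output_queue):
        # Worker loop: runs in each child process until the pool is terminated.
        while True:
            content, processing_func = input_queue.get(block=True)
            result = processing_func(content)
            output_queue.put(result)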
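
Note that map() collects results as workers finish, so the returned list is not guaranteed to follow the order of the inputs. If order matters, one possible approach (not part of the original class) is to tag each input with its index and sort on the way out. The helpers below are hypothetical and only sketch the bookkeeping, with out-of-order completion simulated rather than produced by real workers.

```python
def tag_with_index(contents):
    """Pair each input with its position so results can be matched up later."""
    return list(enumerate(contents))


def restore_input_order(tagged_results):
    """Sort (index, result) pairs by index and drop the index."""
    return [result for _, result in sorted(tagged_results, key=lambda pair: pair[0])]


if __name__ == "__main__":
    tagged = tag_with_index(["a", "b", "c"])
    # Simulate workers finishing out of order: the last input completes first.
    completed = [(index, value.upper()) for index, value in reversed(tagged)]
    print(restore_input_order(completed))  # prints ['A', 'B', 'C']
```

Wired into the class, map() would put (index, content, processing_func) on the input queue and _run would put (index, result) on the output queue.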

One caveat: the parallelized code can be difficult to debug, so I have also prepared a non-parallelizing version of my class, which I enable when something goes wrong in the child processes:

class NullParallelizer:
    @staticmethod
    def map(contents, processing_func):
        results = []
        for content in contents:
            results.append(processing_func(content))
        return results
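
Since both classes expose the same map(contents, processing_func) interface, switching between them can be reduced to a single decision at start-up. The sketch below assumes a hypothetical PARALLELIZER_DEBUG environment variable and a hypothetical choose_parallelizer helper; neither comes from the original answer.

```python
import os


def choose_parallelizer(parallel_cls, serial_cls, environ=os.environ):
    """Return a serial_cls instance when PARALLELIZER_DEBUG=1, else parallel_cls.

    PARALLELIZER_DEBUG is an assumed variable name used for illustration only.
    """
    if environ.get("PARALLELIZER_DEBUG") == "1":
        return serial_cls()
    return parallel_cls()
```

In main() this would be called as choose_parallelizer(Parallelizer, NullParallelizer), so the rest of the program does not need to know which version it received.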