How to fix BrokenProcessPool: error for concurrent.futures ProcessPoolExecutor

Using concurrent.futures.ProcessPoolExecutor, I am trying to run the first piece of code to execute the function "Calculate_Forex_Data_Derivatives(data,grid_spacing)" in parallel. When retrieving the results with executors_list[i].result(), I get "BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending." I have tried submitting multiple calls of the function to the process pool as well as submitting only one call, and both result in the error.

I have also tested the structure of the code with a simpler piece of code (the second code block below) that takes the same types of input, and it works fine. The only difference I can see between the two is that the first one calls the function "FinDiff(axis,grid_spacing,derivative_order)" from the 'findiff' module. This function, along with "Calculate_Forex_Data_Derivatives(data,grid_spacing)", works perfectly on its own when run normally in series.

I am using an Anaconda environment, the Spyder editor, and Windows.

Any help would be appreciated.

#code that returns "BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending."

import pandas as pd
import numpy as np
from findiff import FinDiff
import multiprocessing
import concurrent.futures

def Calculate_Forex_Data_Derivatives(forex_data,dt):  #function to run in parallel
    try:
        dClose_dt = FinDiff(0,dt,1)(forex_data)[-1]
    except IndexError:
        dClose_dt = np.nan

    try:   
        d2Close_dt2 = FinDiff(0,dt,2)(forex_data)[-1]
    except IndexError:
        d2Close_dt2 = np.nan

    try:
        d3Close_dt3 = FinDiff(0,dt,3)(forex_data)[-1]
    except IndexError:
        d3Close_dt3 = np.nan

    return dClose_dt, d2Close_dt2, d3Close_dt3

#input for function
#forex_data is pandas dataframe, forex_data['Close'].values is numpy array
#dt is numpy array
#input_1 and input_2 are each a list of numpy arrays

input_1 = []
input_2 = []
for forex_data_index,data_point in enumerate(forex_data['Close'].values[:1]):
    input_1.append(forex_data['Close'].values[:forex_data_index+1])
    input_2.append(dt[:forex_data_index+1])


def multi_processing():
    executors_list = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
        for index in range(len(input_1)):
            executors_list.append(executor.submit(Calculate_Forex_Data_Derivatives,input_1[index],input_2[index]))

    return executors_list

if __name__ == '__main__':
    print('calculating derivatives')
    executors_list = multi_processing()

for output in executors_list:
    print(output.result()) #returns "BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending."


##############################################################


#simple example that runs fine

def function(x,y):  #function to run in parallel
    try:
        asdf
    except NameError:
        a = (x*y)[0]
        b = (x+y)[0]

    return  a,b

x=[np.array([0,1,2]),np.array([3,4,5])]    #function inputs, list of numpy arrays
y=[np.array([6,7,8]),np.array([9,10,11])]

def multi_processing():    
    executors_list = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
        for index,_ in enumerate(x):
            executors_list.append(executor.submit(function,x[index],y[index]))

    return executors_list

if __name__ == '__main__':
    executors_list = multi_processing()

for output in executors_list:   #prints as expected
    print(output.result())      #(0, 6)
                                #(27, 12)
asked Jul 14 '19 21:07 by ZachV


2 Answers

I know three typical ways to break the Pipe of a ProcessPoolExecutor:

OS kill/termination

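Your system runs into limits, most likely memory, and starts killing processes. Windows has no fork: each worker is a freshly spawned interpreter that re-imports your main module and receives its arguments as pickled copies, so memory use grows with the number of workers. This is not unlikely when working with large DataFrames.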

How to identify

  • Check memory consumption in your task manager.
  • Unless your DataFrames occupy half of your memory, the error should disappear with max_workers=1. This is not conclusive, however.
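A minimal sketch of the max_workers=1 check, assuming you replace the hypothetical square function with your own top-level task function (for example Calculate_Forex_Data_Derivatives). It only reports whether the pool breaks at a given worker count; watching memory in Task Manager is still needed to confirm that memory pressure is the cause.

```python
import concurrent.futures
from concurrent.futures.process import BrokenProcessPool


def square(x):
    # Hypothetical stand-in for the real task (e.g. Calculate_Forex_Data_Derivatives).
    return x * x


def run_with_workers(func, inputs, max_workers):
    """Run func over inputs in a fresh pool; return (results, pool_broke)."""
    try:
        with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
            futures = [executor.submit(func, item) for item in inputs]
            # result() re-raises BrokenProcessPool if a worker died abruptly.
            return [future.result() for future in futures], False
    except BrokenProcessPool:
        return None, True


def broken_by_worker_count(func, inputs, worker_counts):
    """Map each max_workers value to True if the pool broke with that setting."""
    return {n: run_with_workers(func, inputs, n)[1] for n in worker_counts}


if __name__ == "__main__":
    # If only the larger worker counts break, memory pressure is a likely suspect.
    print(broken_by_worker_count(square, list(range(10)), [1, 2, 4]))
```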

Self-Termination of the Worker

The Python instance of the subprocess terminates due to some error that does not raise a proper Exception. One example would be a segfault in an imported C extension module.

How to identify

As your code runs properly without the ProcessPoolExecutor (PPE), the only scenario I can think of is that some module is not multiprocessing-safe. This case also has a chance to disappear with max_workers=1. It might also be possible to induce the error in the main process by calling the function manually right after the workers are created (the line after the for-loop that calls executor.submit). Otherwise it could be really hard to identify, but in my opinion it is the least likely case.
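To see what this failure mode looks like, the sketch below imitates a worker that dies without raising a Python exception by calling os._exit(1). That is only a stand-in for a real segfault in a C extension, and the helper names (dies_abruptly, raises_value_error, well_behaved, classify) are hypothetical. An ordinary exception travels back to the parent as itself, whereas the abrupt exit surfaces as BrokenProcessPool.

```python
import concurrent.futures
import os
from concurrent.futures.process import BrokenProcessPool


def dies_abruptly(x):
    # os._exit stops the worker immediately without raising a Python exception,
    # loosely imitating a segfault inside a C extension module.
    os._exit(1)


def raises_value_error(x):
    raise ValueError(f"bad input: {x}")


def well_behaved(x):
    return x + 1


def classify(func, arg):
    """Run one call in a single-worker pool and describe how it ended."""
    with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
        future = executor.submit(func, arg)
        try:
            future.result()
            return "ok"
        except BrokenProcessPool:
            # Must come first: BrokenProcessPool is itself an Exception subclass.
            return "broken pool"
        except Exception as exc:
            return f"exception: {type(exc).__name__}"


if __name__ == "__main__":
    for func in (well_behaved, raises_value_error, dies_abruptly):
        print(func.__name__, "->", classify(func, 0))
```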

Exception in PPE Code

The subprocess side of the pipe (i.e. the code handling the communication) may crash, which results in a proper Exception that unfortunately cannot be communicated to the main process.

How to identify

As the code is (hopefully) well tested, the prime suspect is the return data. It must be pickled and sent back through a pipe, and both steps can fail. So you have to check:

  • is the return data picklable?
  • is the pickled object small enough to be sent (about 2GB)?

So you can either return some simple dummy data instead, or check the two conditions explicitly:

    import pickle  # at the top of the module

    if len(pickle.dumps((dClose_dt, d2Close_dt2, d3Close_dt3))) > 2 * 10 ** 9:
        raise RuntimeError('return data can not be sent!')
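The explicit check can also be wrapped in a small reusable helper. This is a sketch: check_sendable and MAX_SENDABLE_BYTES are hypothetical names, and the 2 * 10 ** 9 limit is the rough figure quoted above rather than an exact constant.

```python
import pickle

# Rough limit quoted in the answer (about 2 GB); an assumption, not an exact constant.
MAX_SENDABLE_BYTES = 2 * 10 ** 9


def check_sendable(obj, limit=MAX_SENDABLE_BYTES):
    """Return the pickled size of obj, or raise RuntimeError if it cannot be sent."""
    try:
        payload = pickle.dumps(obj)
    except Exception as exc:  # the error type depends on the object (TypeError, PicklingError, ...)
        raise RuntimeError(f"return data is not picklable: {exc!r}") from exc
    if len(payload) > limit:
        raise RuntimeError(f"return data cannot be sent ({len(payload)} bytes)")
    return len(payload)


if __name__ == "__main__":
    print(check_sendable((0.5, -0.25, float("nan"))))
```

Inside the worker you would call check_sendable((dClose_dt, d2Close_dt2, d3Close_dt3)) just before returning, so a bad result becomes an ordinary RuntimeError raised in the worker instead of a failure in the communication code.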

Since Python 3.7 this problem is fixed, and the worker sends the exception back to the main process.
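A quick way to observe this is to return something that cannot be pickled, such as a threading.Lock. The function names below are hypothetical; on a recent Python, future.result() should raise the pickling error itself (a TypeError for a lock) rather than BrokenProcessPool.

```python
import concurrent.futures
import threading
from concurrent.futures.process import BrokenProcessPool


def returns_unpicklable(_):
    # A lock cannot be pickled, so sending it back to the parent fails in the worker.
    return threading.Lock()


def error_seen_by_parent(func, arg):
    """Return the name of the exception future.result() raises, or None."""
    with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
        future = executor.submit(func, arg)
        try:
            future.result()
        except BrokenProcessPool:
            return "BrokenProcessPool"
        except Exception as exc:
            return type(exc).__name__
    return None


if __name__ == "__main__":
    # On Python 3.7+ this should print the pickling error's type, not BrokenProcessPool.
    print(error_seen_by_parent(returns_unpicklable, 0))
```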

answered Oct 23 '22 03:10 by Matthias Huschle


I found this in the official documentation:

"The __main__ module must be importable by worker subprocesses. This means that ProcessPoolExecutor will not work in the interactive interpreter. Calling Executor or Future methods from a callable submitted to a ProcessPoolExecutor will result in deadlock."

Have you tried moving the result loop inside the if __name__ == '__main__': block? The following works for me:

if __name__ == '__main__':
    executors_list = multi_processing()
    for output in executors_list:
        print(output.result())
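Applied to the question, this means keeping the worker function at module top level and putting both the submission and the result loop under the guard. The sketch below uses a hypothetical last_step_change function in place of the findiff-based Calculate_Forex_Data_Derivatives so it runs without extra dependencies, and it assumes the file is run as a script rather than pasted into an interactive interpreter.

```python
import concurrent.futures
import math
import multiprocessing


def last_step_change(values):
    # Hypothetical stand-in for Calculate_Forex_Data_Derivatives: the last
    # first difference of the series, or NaN when there are fewer than 2 points.
    if len(values) < 2:
        return math.nan
    return values[-1] - values[-2]


def multi_processing(inputs):
    with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
        futures = [executor.submit(last_step_change, item) for item in inputs]
        # Collect results while the executor is still open.
        return [future.result() for future in futures]


def main():
    closes = [1.0, 1.5, 1.25, 2.0]
    # Growing prefixes of the series, mirroring input_1 in the question.
    inputs = [closes[: i + 1] for i in range(len(closes))]
    return multi_processing(inputs)


if __name__ == "__main__":
    # Pool creation and result handling live under this guard, so worker
    # processes that re-import this module do not execute them.
    for output in main():
        print(output)
```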
answered Oct 23 '22 03:10 by 莫昌钦