
ThreadPoolExecutor with Context Manager "cannot schedule new futures after shutdown"

I'm creating a thread manager class that executes tasks as threads and passes the results to the next process step. The flow works the first time a task is received, but the second execution fails with the following error:

...python3.8/concurrent/futures/thread.py", line 179, in submit
raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown

The tasks come from Cmd.cmdloop user input, so the script is persistent and not meant to shut down. Instead, run will be called multiple times as input is received from the user.

I've implemented a ThreadPoolExecutor to handle the workload and am trying to gather the results with concurrent.futures.as_completed, so that each item is passed to the next step in order of completion.

The run method below works perfectly for the first execution, but raises the error on the second execution of the same task (which succeeded the first time).

  def run ( self, _executor=None, _futures={}, ) -> bool :
     
     task = self.pipeline.get( )
     
     with _executor or self.__default_executor as executor :
         
         _futures = { executor.submit ( task.target.execute, ), }

         for future in concurrent.futures.as_completed ( _futures, ) :
                     
             print( future.result ( ) )
                             
     return True

So, the idea is that each call to run will create and tear down the executor with the context manager. But the error suggests the executor was shut down properly after the first execution and cannot be reopened or recreated when run is called a second time. What is this error pointing to? What am I missing?
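The behavior described above can be reproduced in isolation. The sketch below is only an illustration under one assumption: that the default executor is a single ThreadPoolExecutor instance created once and reused on every call (the question does not show how self.__default_executor is built). The Runner class and its method names are hypothetical. Leaving a with block calls shutdown() on the executor object, so a reused instance rejects the next submit, whereas building a new executor per call does not.

```python
import concurrent.futures


class Runner:
    """Hypothetical stand-in for the thread manager class in the question."""

    def __init__(self):
        # Assumption: the default executor is created once and reused.
        self._default_executor = concurrent.futures.ThreadPoolExecutor()

    def run_shared(self, fn):
        # Exiting the with block calls shutdown() on the shared instance,
        # so a second call fails at submit().
        with self._default_executor as executor:
            return executor.submit(fn).result()

    def run_fresh(self, fn, executor=None):
        # A new pool is built on each call when none is passed in,
        # so shutting it down on exit does not affect later calls.
        with executor or concurrent.futures.ThreadPoolExecutor() as ex:
            futures = {ex.submit(fn)}
            return [f.result() for f in concurrent.futures.as_completed(futures)]


runner = Runner()
first = runner.run_shared(lambda: "ok")

try:
    runner.run_shared(lambda: "ok")
    second_error = None
except RuntimeError as exc:
    second_error = str(exc)

fresh_results = [runner.run_fresh(lambda: "ok") for _ in range(2)]

print(first)
print(second_error)
print(fresh_results)
```

Note that an executor passed in explicitly to run_fresh would still be shut down when the with block exits, so a caller who wants to keep one pool alive across calls would need to skip the with statement and call shutdown() once at program exit.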

Any help would be great - thanks in advance.

sadmicrowave asked Jul 25 '21 04:07



1 Answer

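The simplest fix is to use the thread pool from the multiprocessing library instead of submitting futures to a ThreadPoolExecutor inside a context manager:

    from multiprocessing.pool import ThreadPool

    pool = ThreadPool(50)  # pool of 50 worker threads
    pool.starmap(test_function, zip(array1, array2))  # add further iterables to zip() as needed
    pool.close()  # stop accepting new work
    pool.join()   # wait for the worker threads to exit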

Here (array1[0], array2[0]) are the arguments passed to the first call of test_function, (array1[1], array2[1]) to the second call, and so on. The pool distributes these calls across its worker threads, and starmap returns the results in input order.

ALUFTW answered Nov 12 '22 18:11
