
Python multiprocessing pool apply_async error

I'm trying to evaluate a number of processes in a multiprocessing pool but keep running into errors and I can't work out why... There's a simplified version of the code below:

from multiprocessing import Pool

class Object_1():

    def add_good_spd_column(self):

        def calculate_correlations(arg1, arg2, arg3):
            return {'a': 1}

        processes = {}
        pool = Pool(processes=6)
        for i in range(0, 10):
            processes[i] = pool.apply_async(calculate_correlations,
                                            args=(arg1, arg2, arg3,))

        correlations = {}
        for i in range(0, 10):
            correlations[i] = processes[i].get()

This returns the following error:

Traceback (most recent call last):
  File "./02_results.py", line 116, in <module>
    correlations[0] = processes[0].get()
  File "/opt/anaconda3/lib/python3.5/multiprocessing/pool.py", line 608, in get
    raise self._value
  File "/opt/anaconda3/lib/python3.5/multiprocessing/pool.py", line 385, in _handle_tasks
    put(task)
  File "/opt/anaconda3/lib/python3.5/multiprocessing/connection.py", line 206, in send
    self._send_bytes(ForkingPickler.dumps(obj))
  File "/opt/anaconda3/lib/python3.5/multiprocessing/reduction.py", line 50, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'SCADA.add_good_spd_column.<locals>.calculate_correlations'

When I call processes[0].successful() instead, I get the following error:

Traceback (most recent call last):
  File "./02_results.py", line 116, in <module>
    print(processes[0].successful())
  File "/opt/anaconda3/lib/python3.5/multiprocessing/pool.py", line 595, in successful
    assert self.ready()
AssertionError

Is this because the process isn't actually finished before the .get() is called? The function being evaluated just returns a dictionary which should definitely be pickle-able...

Cheers,

asked Jun 08 '26 08:06 by WRJ


1 Answer

The error is occurring because pickling a function nested in another function is not supported, and multiprocessing.Pool needs to pickle the function you pass as an argument to apply_async in order to execute it in a worker process. You have to move the function to the top level of the module, or make it an instance method of the class. Keep in mind that if you make it an instance method, the instance of the class itself must also be picklable.
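As a rough sketch of the first option (moving the function to the top level of the module), something like the following should work. The method signature, the placeholder arguments 1, 2, 3, and the helpers make_nested and is_picklable are assumptions added purely for illustration; they are not part of the original code. The helper only shows the difference between a nested and a module-level function when pickled directly.

```python
import pickle
from multiprocessing import Pool


def calculate_correlations(arg1, arg2, arg3):
    # Module-level function: pickle stores it by reference (module + name),
    # so a worker process can look it up again.
    return {'a': 1}


def make_nested():
    # Hypothetical helper: returns a function defined inside another
    # function, like the one in the question.
    def nested(arg1, arg2, arg3):
        return {'a': 1}
    return nested


def is_picklable(obj):
    # Hypothetical helper: True if pickle.dumps accepts the object.
    # Older Python 3 versions raise AttributeError for local objects;
    # newer ones may raise pickle.PicklingError instead.
    try:
        pickle.dumps(obj)
    except (AttributeError, pickle.PicklingError):
        return False
    return True


class Object_1():

    def add_good_spd_column(self, arg1, arg2, arg3):
        # Submit ten tasks, then collect results while the pool is still open.
        with Pool(processes=6) as pool:
            processes = {
                i: pool.apply_async(calculate_correlations,
                                    args=(arg1, arg2, arg3))
                for i in range(10)
            }
            return {i: res.get() for i, res in processes.items()}


if __name__ == "__main__":
    print(is_picklable(make_nested()))  # nested function: not picklable
    correlations = Object_1().add_good_spd_column(1, 2, 3)
    print(correlations[0])
```

The `if __name__ == "__main__":` guard matters on platforms where worker processes are started by re-importing the main module; without it the pool creation could be re-executed in each worker.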

And yes, the assertion error when calling successful() occurs because you're calling it before a result is ready. From the docs:

successful()

Return whether the call completed without raising an exception. Will raise AssertionError if the result is not ready.
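To avoid that error, one option is to block with wait() (or call get(), which also blocks) before asking whether the call succeeded. The sketch below is only an illustration: slow_task and check_result are made-up names, and it uses multiprocessing.pool.ThreadPool purely so it runs without a `__main__` guard; apply_async on a regular Pool returns the same kind of AsyncResult object. Note also that from Python 3.7 onwards, successful() raises ValueError rather than AssertionError when the result is not ready.

```python
import time
from multiprocessing.pool import ThreadPool


def slow_task(delay):
    # Hypothetical stand-in for calculate_correlations that takes a while.
    time.sleep(delay)
    return {'a': 1}


def check_result():
    with ThreadPool(processes=1) as pool:
        result = pool.apply_async(slow_task, args=(0.2,))

        # Calling result.successful() here would most likely raise,
        # because the task has not finished yet.
        ready_before_wait = result.ready()

        result.wait()  # block until the task has finished

        # Now ready() is True and successful() is safe to call.
        return ready_before_wait, result.ready(), result.successful(), result.get()


if __name__ == "__main__":
    print(check_result())
```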

answered Jun 10 '26 13:06 by dano


