I have a simple function that I want to run in parallel. If the function is defined directly in the main script, everything works nicely. But if the very same function is called from a separate Python file (created to contain a series of helper functions), the code fails with the error:
A task has failed to un-serialize. Please ensure that the arguments of the function are all picklable.
I have tried to run this code:
from joblib import Parallel, delayed
import multiprocessing
import otherFile as of
inputs = range(10)
def processInput(i):
    return i * i
num_cores = multiprocessing.cpu_count()
results1 = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in inputs) # this works
results2 = Parallel(n_jobs=num_cores)(delayed(of.processInput)(i) for i in inputs) # this fails
To call processInput() from the of module, I simply copied the same function into that .py file:
def processInput(i):
    return i * i
How can I make the parallelization work if the function I need to call is in a separate .py file?
This is the full error:
results = Parallel(n_jobs=num_cores)(delayed(of.processInput)(i) for i in inputs)
Traceback (most recent call last):
  File "<ipython-input-387-d8dd1dc361a6>", line 1, in <module>
    results = Parallel(n_jobs=num_cores)(delayed(of.processInput)(i) for i in inputs)
  File "C:\Users\xxxxx\AppData\Local\Continuum\anaconda3\lib\site-packages\joblib\parallel.py", line 934, in __call__
    self.retrieve()
  File "C:\Users\xxxxx\AppData\Local\Continuum\anaconda3\lib\site-packages\joblib\parallel.py", line 833, in retrieve
    self._output.extend(job.get(timeout=self.timeout))
  File "C:\Users\xxxxx\AppData\Local\Continuum\anaconda3\lib\site-packages\joblib\_parallel_backends.py", line 521, in wrap_future_result
    return future.result(timeout=timeout)
  File "C:\Users\xxxxx\AppData\Local\Continuum\anaconda3\lib\concurrent\futures\_base.py", line 432, in result
    return self.__get_result()
  File "C:\Users\xxxxx\AppData\Local\Continuum\anaconda3\lib\concurrent\futures\_base.py", line 384, in __get_result
    raise self._exception
BrokenProcessPool: A task has failed to un-serialize. Please ensure that the arguments of the function are all picklable.
We can also run the same function in parallel with different parameters using the Pool class. For parallel mapping, we first initialize a multiprocessing.Pool() object. The first argument is the number of worker processes; if it is not given, it defaults to the number of CPUs on the machine.
Multiprocessing in Python lets the interpreter use multiple cores of a CPU to run tasks/processes in parallel.
The general way to parallelize an operation is to take the function that should be run multiple times and run it in parallel on different processors. To do this, you initialize a Pool with n worker processes and pass the function you want to parallelize to one of Pool's parallelization methods, as in the sketch below.
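For example, a minimal sketch of that pattern, assuming a toy worker function called square, could look like this:
from multiprocessing import Pool

def square(x):
    # toy stand-in for the function you actually want to parallelize
    return x * x

if __name__ == "__main__":
    inputs = range(10)
    # Pool(4) starts 4 worker processes; Pool() with no argument defaults to the CPU count
    with Pool(4) as pool:
        results = pool.map(square, inputs)  # one task per element of inputs
    print(results)
The if __name__ == "__main__": guard matters on Windows, where worker processes re-import the main module on startup.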
You can also use the joblib module to parallelize a for loop in Python. joblib uses multiprocessing to spread the loop iterations over multiple CPU cores and provides a lightweight pipeline that makes this kind of parallel computation easy and straightforward.
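A rough sketch of that approach (the helper name slow_square and n_jobs=4 are only illustrative choices):
from joblib import Parallel, delayed

def slow_square(x):
    # stand-in for a more expensive per-item computation
    return x * x

if __name__ == "__main__":
    # one delayed task per loop iteration, spread over 4 worker processes
    results = Parallel(n_jobs=4)(delayed(slow_square)(x) for x in range(10))
    print(results)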
Have you checked whether the imported function of.processInput works without using multiprocessing? If it doesn't, that could be the elephant in the room that others have not pointed out. Maybe you're missing an
__init__.py
file, or maybe the directory is not being seen by Python's import machinery. To add the directory you can do:
import sys; sys.path.append("path/to/otherFile/")
Though I am not sure if the error message you get is even remotely related to this issue.
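If the import path does turn out to be the problem, a minimal sketch of the layout could look like this (the path and file names are only assumptions based on your snippet):
# otherFile.py -- lives in path/to/otherFile/
def processInput(i):
    return i * i

# main script
import sys
sys.path.append("path/to/otherFile/")  # make otherFile.py importable, also for the worker processes

import multiprocessing
from joblib import Parallel, delayed
import otherFile as of

if __name__ == "__main__":
    num_cores = multiprocessing.cpu_count()
    results = Parallel(n_jobs=num_cores)(delayed(of.processInput)(i) for i in range(10))
    print(results)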