 

Parallel code not working when function to parallelize is in a different file

I have a simple function that I want to run in parallel. If the function is defined directly in the main script, everything works. But if the very same function is imported from a separate Python file (created to hold a series of helper functions), the code fails with the error:

A task has failed to un-serialize. Please ensure that the arguments of the function are all picklable.

I have tried to run this code:

from joblib import Parallel, delayed
import multiprocessing
import otherFile as of

inputs = range(10) 
def processInput(i):
    return i * i

num_cores = multiprocessing.cpu_count()

results1 = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in inputs) # this works
results2 = Parallel(n_jobs=num_cores)(delayed(of.processInput)(i) for i in inputs) # this fails

For the call that fails, I simply copied the same function into that .py file (otherFile.py, imported as of):

def processInput(i):
    return i * i

How can I make the parallelization work if the function I need to call is in a separate .py file?

This is the full error:

results = Parallel(n_jobs=num_cores)(delayed(of.processInput)(i) for i in inputs)
Traceback (most recent call last):

  File "<ipython-input-387-d8dd1dc361a6>", line 1, in <module>
    results = Parallel(n_jobs=num_cores)(delayed(of.processInput)(i) for i in inputs)

  File "C:\Users\xxxxx\AppData\Local\Continuum\anaconda3\lib\site-packages\joblib\parallel.py", line 934, in __call__
    self.retrieve()

  File "C:\Users\xxxxx\AppData\Local\Continuum\anaconda3\lib\site-packages\joblib\parallel.py", line 833, in retrieve
    self._output.extend(job.get(timeout=self.timeout))

  File "C:\Users\xxxxx\AppData\Local\Continuum\anaconda3\lib\site-packages\joblib\_parallel_backends.py", line 521, in wrap_future_result
    return future.result(timeout=timeout)

  File "C:\Users\xxxxx\AppData\Local\Continuum\anaconda3\lib\concurrent\futures\_base.py", line 432, in result
    return self.__get_result()

  File "C:\Users\xxxxx\AppData\Local\Continuum\anaconda3\lib\concurrent\futures\_base.py", line 384, in __get_result
    raise self._exception

BrokenProcessPool: A task has failed to un-serialize. Please ensure that the arguments of the function are all picklable.
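The "failed to un-serialize" message comes from the worker side: each worker process receives its task as a pickle and has to rebuild the function from it. A minimal sketch using only the standard pickle module (not joblib itself) shows that a module-level function is pickled by reference, as a module name plus a function name, so unpickling only works where that module can be imported. The module name helpers_demo is a hypothetical stand-in for otherFile.py, built in memory so the example is self-contained. Joblib's default backend is generally understood to send functions defined in the main script by value (via cloudpickle) but functions from importable modules by reference, which would fit the fact that only the of.processInput call fails; treat that as a likely explanation, not a confirmed diagnosis.

```python
import pickle
import sys
import types

# Hypothetical stand-in for otherFile.py, created in memory for this sketch.
helpers = types.ModuleType("helpers_demo")
exec("def processInput(i):\n    return i * i\n", helpers.__dict__)
sys.modules["helpers_demo"] = helpers  # registered, like a module after import

# pickle stores a module-level function by reference: the payload contains
# the module name and the function name, not the function's code.
payload = pickle.dumps(helpers.processInput)
print(b"helpers_demo" in payload, b"processInput" in payload)  # True True

# Where the module can be imported, unpickling returns the same function.
print(pickle.loads(payload) is helpers.processInput)  # True

# Simulate a worker process that cannot import the module.
del sys.modules["helpers_demo"]
error = None
try:
    pickle.loads(payload)
except ModuleNotFoundError as exc:
    error = exc
print(type(error).__name__, error)  # ModuleNotFoundError No module named 'helpers_demo'
```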
asked May 10 '19 13:05 by opt



People also ask

How do you execute the same function in parallel in Python?

We can also run the same function in parallel with different parameters using the Pool class. For parallel mapping, first create a multiprocessing.Pool() object. Its first argument is the number of worker processes; if it is not given, it defaults to the number of available CPUs.
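A minimal sketch of the Pool approach, using only the standard library; process_input is a hypothetical function that mirrors the question's processInput(). The if __name__ == "__main__": guard matters on platforms that start workers with the spawn method (such as Windows), because each worker re-imports the main module.

```python
from multiprocessing import Pool

def process_input(i):
    # Same work as processInput() in the question: square one input.
    return i * i

if __name__ == "__main__":
    # With no argument, Pool starts one worker process per available CPU.
    with Pool() as pool:
        # map() splits the inputs among the workers and returns the
        # results in the same order as the inputs.
        results = pool.map(process_input, range(10))
    print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```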

How do I run two parallel programs in Python?

Multiprocessing in Python lets the computer use multiple CPU cores to run tasks or processes in parallel. To run two pieces of work at the same time, start each one in its own multiprocessing.Process and then wait for both with join().
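A minimal sketch of that idea: two different, hypothetical functions each get their own Process, and a multiprocessing.Queue carries their results back to the parent. The order in which results arrive is not fixed, which is why they are collected into a dict keyed by task name.

```python
from multiprocessing import Process, Queue

def total(n, out):
    # First task (hypothetical): sum of 0..n-1.
    out.put(("total", sum(range(n))))

def sum_of_squares(n, out):
    # Second, different task (hypothetical): sum of squares of 0..n-1.
    out.put(("sum_of_squares", sum(i * i for i in range(n))))

if __name__ == "__main__":
    out = Queue()
    workers = [
        Process(target=total, args=(10, out)),
        Process(target=sum_of_squares, args=(10, out)),
    ]
    for p in workers:
        p.start()  # both tasks now run at the same time
    # Read one result per worker before joining, so no worker is left
    # blocked on data it has not yet flushed to the queue.
    results = dict(out.get() for _ in workers)
    for p in workers:
        p.join()
    print(results)
```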

How do you parallelize a function?

The general way to parallelize an operation is to take a function that must be run many times and run those calls in parallel on different processors. To do this, initialize a Pool with n worker processes and pass the function you want to parallelize to one of the Pool's parallelization methods, such as map.
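Besides map, Pool offers other parallelization methods. A short sketch, assuming a hypothetical two-argument function scale: starmap() fans out many calls with several arguments each, and apply_async() schedules a single call and returns a handle whose result is fetched later.

```python
from multiprocessing import Pool

def scale(x, factor):
    # Hypothetical function to parallelize; takes two arguments.
    return x * factor

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        # starmap() unpacks each tuple into positional arguments:
        # scale(1, 10), scale(2, 10), scale(3, 10).
        many = pool.starmap(scale, [(1, 10), (2, 10), (3, 10)])
        # apply_async() returns an AsyncResult; get() must be called
        # before the with block exits, because exiting terminates the pool.
        one = pool.apply_async(scale, (7, 3)).get(timeout=30)
    print(many, one)  # [10, 20, 30] 21
```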

Can you parallelize a for loop?

Yes. The joblib module can parallelize a for loop: by default it runs the loop iterations in separate worker processes, so they can use multiple CPU cores. joblib also provides lightweight pipelining, such as transparent disk caching of function results (the memoize pattern), and easy, straightforward parallel computing.


1 Answer

Have you checked whether the imported function of.processInput works without multiprocessing? If it doesn't, that could be the elephant in the room that others have not pointed out. Maybe you're missing an __init__.py file, or maybe the directory containing otherFile.py is not being seen by Python's import system. To add the directory, you can do:

import sys
sys.path.append("path/to/otherFile/")

Though I am not sure if the error message you get is even remotely related to this issue.
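A self-contained sketch of the sys.path fix, assuming the problem really is that the worker processes cannot import otherFile. It swaps joblib for the standard library's concurrent.futures.ProcessPoolExecutor so it runs without extra packages, and it writes a stand-in otherFile.py into a temporary directory; in a real project you would append the directory that already contains otherFile.py. The helper name run_with_helper_module is hypothetical.

```python
import importlib
import sys
import tempfile
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path

# Hypothetical stand-in for the question's otherFile.py.
HELPER_SOURCE = "def processInput(i):\n    return i * i\n"

def run_with_helper_module():
    with tempfile.TemporaryDirectory() as tmp:
        Path(tmp, "otherFile.py").write_text(HELPER_SOURCE)
        # Put the directory that CONTAINS otherFile.py on the import path.
        sys.path.append(tmp)
        try:
            importlib.invalidate_caches()
            of = importlib.import_module("otherFile")
            # Workers rebuild of.processInput by importing otherFile; this works
            # because the parent's sys.path is inherited (fork) or sent (spawn).
            with ProcessPoolExecutor() as pool:
                return list(pool.map(of.processInput, range(10)))
        finally:
            sys.path.remove(tmp)

if __name__ == "__main__":
    results = run_with_helper_module()
    print(results)
```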

answered Oct 20 '22 08:10 by Solomon Vimal
