I would like concurrent.futures.ProcessPoolExecutor.map() to call a function that takes 2 or more arguments. In the example below, I have resorted to using a lambda function and defining ref as a list of the same size as numberlist, with every element set to the same value.
1st Question: Is there a better way of doing this? numberlist can hold millions to billions of elements, and ref would have to match its size, so this approach wastes precious memory, which I would like to avoid. I did this because I read that the map function stops mapping as soon as the shortest iterable is exhausted.
import concurrent.futures as cf

nmax = 10
numberlist = range(nmax)
ref = [5, 5, 5, 5, 5, 5, 5, 5, 5, 5]
workers = 3

def _findmatch(listnumber, ref):
    print('def _findmatch(listnumber, ref):')
    x = ''
    listnumber = str(listnumber)
    ref = str(ref)
    print('listnumber = {0} and ref = {1}'.format(listnumber, ref))
    if ref in listnumber:
        x = listnumber
    print('x = {0}'.format(x))
    return x

a = map(lambda x, y: _findmatch(x, y), numberlist, ref)
for n in a:
    print(n)
    if str(ref[0]) in n:
        print('match')

with cf.ProcessPoolExecutor(max_workers=workers) as executor:
    #for n in executor.map(_findmatch, numberlist):
    for n in executor.map(lambda x, y: _findmatch(x, ref), numberlist, ref):
        print(type(n))
        print(n)
        if str(ref[0]) in n:
            print('match')
Running the code above, I found that the built-in map function achieved my desired outcome. However, when I passed the same arguments to concurrent.futures.ProcessPoolExecutor.map(), Python 3.5 failed with this error:
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/queues.py", line 241, in _feed
    obj = ForkingPickler.dumps(obj)
  File "/usr/lib/python3.5/multiprocessing/reduction.py", line 50, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function <lambda> at 0x7fd2a14db0d0>: attribute lookup <lambda> on __main__ failed
Question 2: Why did this error occur and how do I get concurrent.futures.ProcessPoolExecutor.map() to call a function with more than 1 argument?
To answer your second question first: you are getting an exception because a lambda function like the one you're using is not picklable. Python uses the pickle protocol to serialize the data passed between the main process and the ProcessPoolExecutor's worker processes, and pickle stores a function as a reference to its name in its module. A lambda has no name that can be looked up, which is what "attribute lookup <lambda> on __main__ failed" in the traceback is reporting. It's not clear why you are using a lambda at all. The lambda you had takes two arguments, just like the original function. You could use _findmatch directly instead of the lambda and it should work.
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
    for n in executor.map(_findmatch, numberlist, ref):
        ...
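As for the first issue about passing the second, constant argument without creating a giant list, you could solve this in several ways. One approach might be to use itertools.repeat to create an iterable object that repeats the same value forever when iterated on. Because map stops at the shortest iterable, the infinite iterator is cut off when numberlist runs out, and it never stores more than the one value.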
But a better approach would probably be to write an extra function that passes the constant argument for you. (Perhaps this is why you were trying to use a lambda function?) It should work if the function you use is accessible at the module's top-level namespace:
def _helper(x):
    return _findmatch(x, 5)

with cf.ProcessPoolExecutor(max_workers=workers) as executor:
    for n in executor.map(_helper, numberlist):
        ...