Hi, I am trying to run the multiprocessing example in the docs (http://docs.python.org/3.4/library/concurrent.futures.html), the one using prime numbers, but with a small difference.
I want to be able to call a function with multiple arguments. What I am doing is matching small pieces of text (in a list around 30k long) against a much larger piece of text and returning where in the larger string the smaller strings start.
I can do this serially like this:
matchList = []
for pattern in patterns:
    # Approximate pattern matching
    patternStartingPositions = processPattern(pattern, numMismatchesAllowed, transformedText, charToIndex, countMatrix, firstOccurrence, suffixArray)
    # Now add each starting position found onto our master list.
    for startPos in patternStartingPositions:
        matchList.append(startPos)
But I want to do this to speed things up:
matchList = []
with concurrent.futures.ProcessPoolExecutor() as executor:
    for pattern, res in zip(patterns, executor.map(processPattern(pattern, numMismatchesAllowed, transformedText, charToIndex, countMatrix, firstOccurrence, suffixArray), patterns)):
        print('%d is starts at: %s' % (pattern, res))
At this stage I've just got the print call there because I can't get the line above, the one that invokes the processes, to work.
The only real difference between what I want to do and the example code is that my function takes 7 arguments, and I have no idea how to pass them. I have spent half the day on it.
The call above generates this error:
UnboundLocalError: local variable 'pattern' referenced before assignment.
Which makes sense.
But then if I leave out that first argument, which is the one that changes with each call, from the call to the processPattern function:
matchList = []
with concurrent.futures.ProcessPoolExecutor() as executor:
    for pattern, res in zip(patterns, executor.map(processPattern(numMismatchesAllowed, transformedText, charToIndex, countMatrix, firstOccurrence, suffixArray), patterns)):
        print('%d is starts at: %s' % (pattern, res))
Then I get this error:
TypeError: processPattern() missing 1 required positional argument: 'suffixArray'.
I don't know how to get the pattern argument into the call!
To get the data into the right shape, simply use a generator expression (no need for zip at all) and use submit rather than map:
((pattern, executor.submit(processPattern, pattern, ...)) for pattern in patterns)
To ensure that everything gets executed on the pool (instead of immediately), do not call the processPattern function yourself as you are doing in your example; instead, pass it as the first argument to .submit, followed by its arguments. The fixed version of your code would be:
with concurrent.futures.ProcessPoolExecutor() as executor:
    for pattern, res in ((pattern, executor.submit(processPattern, pattern, numMismatchesAllowed, transformedText, charToIndex, countMatrix, firstOccurrence, suffixArray)) for pattern in patterns):
        # pattern is text, so format it with %s rather than %d
        print('%s starts at: %s' % (pattern, res.result()))
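For anyone who wants to run the submit pattern end to end, here is a minimal sketch. find_positions is a hypothetical stand-in for processPattern (it only does exact matching), and the sample text and patterns are made up; only the shape of the loop is meant to carry over.

```python
import concurrent.futures


def find_positions(pattern, text):
    """Hypothetical stand-in for processPattern: exact-match start positions."""
    positions = []
    start = text.find(pattern)
    while start != -1:
        positions.append(start)
        start = text.find(pattern, start + 1)
    return positions


def match_all(patterns, text):
    """Submit one task per pattern and flatten the results into one list."""
    match_list = []
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # Pass the function and its arguments separately; do not call it here.
        futures = [(p, executor.submit(find_positions, p, text)) for p in patterns]
        for pattern, future in futures:
            positions = future.result()  # blocks until this task has finished
            print('%s starts at: %s' % (pattern, positions))
            match_list.extend(positions)
    return match_list


if __name__ == '__main__':
    match_all(['an', 'na', 'xyz'], 'banana')
```

All futures are created before any result is requested, so the tasks can run in parallel while the loop waits on them in order.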
Other posters have covered possible solutions, but to explain your error: you should be passing the function and its arguments as separate objects to executor.map. Here is the example from the docs:
with concurrent.futures.ProcessPoolExecutor() as executor:
    # is_prime is the function, PRIMES are the arguments
    for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
        print('%d is prime: %s' % (number, prime))
Your code is calling the processPattern function right away and passing its result to executor.map:
with concurrent.futures.ProcessPoolExecutor() as executor:
    for pattern, res in zip(patterns, executor.map(processPattern(numMis... # <- BAD
        print('%d is starts at: %s' % (pattern, res))
Instead it should be:
with concurrent.futures.ProcessPoolExecutor() as executor:
    for pattern, res in zip(patterns, executor.map(processPattern, <stuff>)):
        print('%s starts at: %s' % (pattern, res))
Where <stuff> stands for the iterables of arguments to pass to processPattern on each subsequent call.
Or, seeing as the other args stay fixed, create a function that just takes the one parameter you are iterating over and pass in patterns as the iterable (as @uhbif19 suggests).
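One way to build such a one-parameter function is functools.partial from the standard library. The sketch below assumes a hypothetical three-argument process_pattern (a simple mismatch counter, not the original seven-argument function) purely to show the binding; partial objects can be sent to worker processes as long as the wrapped function is defined at module level.

```python
import concurrent.futures
from functools import partial


def process_pattern(pattern, max_mismatches, text):
    """Hypothetical stand-in: starts where pattern fits with few mismatches."""
    hits = []
    for start in range(len(text) - len(pattern) + 1):
        window = text[start:start + len(pattern)]
        mismatches = sum(a != b for a, b in zip(pattern, window))
        if mismatches <= max_mismatches:
            hits.append(start)
    return hits


def match_all(patterns, max_mismatches, text):
    # Bind the constant arguments by keyword; pattern is the only one left free.
    worker = partial(process_pattern, max_mismatches=max_mismatches, text=text)
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # map now needs a single iterable, exactly like the docs example.
        return list(zip(patterns, executor.map(worker, patterns)))


if __name__ == '__main__':
    for pattern, positions in match_all(['ana', 'bxn'], 1, 'banana'):
        print('%s starts at: %s' % (pattern, positions))
```

Binding by keyword keeps the varying argument in the first position, which is where map supplies it.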
EDIT:
To expand on how to create the <stuff> iterables: you need one iterable for each argument required by your function (processPattern in this case). You already have patterns as the iterable for the first argument; if the others are constant, then itertools.repeat may be helpful:
from itertools import repeat
args = (patterns,
        repeat(numMismatchesAllowed, len(patterns)),
        repeat(transformedText, len(patterns)),
        repeat(charToIndex, len(patterns)),
        <etc...>
        )
Then
for pattern, res in zip(patterns, executor.map(processPattern, *args)):
I include this for the sake of understanding, but you can see how messy this is. The other answers offer better solutions.
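As a side note on the repeat approach: the count passed to repeat is optional. Like the built-in map, executor.map stops as soon as its shortest iterable runs out, so an endless repeat(value) paired with a finite list should behave the same. A small sketch, using a toy process function and made-up inputs:

```python
import concurrent.futures
from itertools import repeat


def process(a, b):
    """Toy function with one varying and one constant argument."""
    return a.upper() + b


def run(letters, suffix):
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # repeat(suffix) never ends; map stops once letters is exhausted.
        return list(executor.map(process, letters, repeat(suffix)))


if __name__ == '__main__':
    print(run('abc', 'b'))
```

This only works when at least one of the iterables is finite; with nothing but repeat objects, map would never finish collecting its arguments.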
EDIT 2:
Here's an example that better illustrates the use of submit vs map:
import concurrent.futures

def process(a, b):
    return a.upper() + b

if __name__ == '__main__':  # needed on platforms that spawn worker processes
    # submit: one future per call, arguments passed individually
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for c, fut in [(c, executor.submit(process, c, 'b')) for c in 'testing']:
            print(c, fut.result())

    # map: one iterable per parameter of process
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for c, res in zip('testing', executor.map(process, 'testing', 'bbbbbbb')):
            print(c, str(res))