Python Multiprocessing And Argument Passing Help Needed

Hi, I am trying to run the multiprocessing example in the docs (http://docs.python.org/3.4/library/concurrent.futures.html), the one using prime numbers, but with a small difference.

I want to be able to call a function with multiple arguments. I am matching small pieces of text (a list of around 30k of them) against a much larger piece of text and returning the positions in the larger string where the smaller strings start.

I can do this serially like this:

matchList = []
for pattern in patterns:

    # Approximate pattern matching
    patternStartingPositions = processPattern(pattern, numMismatchesAllowed, transformedText, charToIndex, countMatrix, firstOccurrence, suffixArray)

    # Now add each starting position found onto our master list.
    for startPos in patternStartingPositions:
        matchList.append(startPos)

But I want to do this to speed things up:

matchList = []
with concurrent.futures.ProcessPoolExecutor() as executor:
    for pattern, res in zip(patterns, executor.map(processPattern(pattern, numMismatchesAllowed, transformedText, charToIndex, countMatrix, firstOccurrence, suffixArray), patterns)):
        print('%d is starts at: %s' % (pattern, res))

At this stage I've just got the print call there because I can't get the line above, the one that invokes the processes, to work.

The only real difference between what I want to do and the example code is that my function takes 7 arguments, and I have no idea how to do that. I've spent half the day on it.

The call above generates this error:

UnboundLocalError: local variable 'pattern' referenced before assignment.

Which makes sense.

But then if I leave out that first argument, which is the one that changes with each call, and leave out the first parameter to the processPattern function:

matchList = []
with concurrent.futures.ProcessPoolExecutor() as executor:
    for pattern, res in zip(patterns, executor.map(processPattern(numMismatchesAllowed, transformedText, charToIndex, countMatrix, firstOccurrence, suffixArray), patterns)):
        print('%d is starts at: %s' % (pattern, res))

Then I get this error:

TypeError: processPattern() missing 1 required positional argument: 'suffixArray'.

I don't know how to get the pattern argument in the call!

davo36 asked Mar 20 '23 13:03


2 Answers

To get the data into the right shape, build the (pattern, future) pairs with a list comprehension (no need for zip at all) and use submit rather than map:

[(pattern, executor.submit(processPattern, pattern, ...)) for pattern in patterns]

To ensure that everything gets executed on the pool (instead of immediately), do not invoke the processPattern function as you are doing in your example, but instead pass it in as the first argument to .submit. Use a list rather than a lazy generator expression, so that every task is submitted before the loop starts waiting on the first result; with a generator, each result() call would block before the next task had been submitted, and the work would run one task at a time. The fixed version of your code would be:

with concurrent.futures.ProcessPoolExecutor() as executor:
    futures = [(pattern, executor.submit(processPattern, pattern, numMismatchesAllowed, transformedText, charToIndex, countMatrix, firstOccurrence, suffixArray)) for pattern in patterns]
    for pattern, res in futures:
        # %s rather than %d, because each pattern is a piece of text
        print('%s starts at: %s' % (pattern, res.result()))
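As a rough sketch of how the submit approach could fill the matchList from the question, the snippet below submits one task per pattern and flattens the results as they complete. The names find_starts, collect_matches and executor_cls are hypothetical and not from the original post: find_starts is a simple exact-match stand-in for processPattern, and executor_cls only exists so the same code can be tried with a thread pool.

```python
import concurrent.futures


def find_starts(pattern, text):
    """Hypothetical stand-in for processPattern: exact-match start positions."""
    starts = []
    i = text.find(pattern)
    while i != -1:
        starts.append(i)
        i = text.find(pattern, i + 1)
    return starts


def collect_matches(patterns, text,
                    executor_cls=concurrent.futures.ProcessPoolExecutor):
    """Submit one task per pattern and flatten all results into one list."""
    match_list = []
    with executor_cls() as executor:
        # Submit everything first so the workers can run concurrently.
        futures = [executor.submit(find_starts, p, text) for p in patterns]
        # as_completed yields each future as soon as its task finishes.
        for fut in concurrent.futures.as_completed(futures):
            match_list.extend(fut.result())
    # Completion order is not deterministic, so sort for a stable result.
    return sorted(match_list)


if __name__ == "__main__":
    print(collect_matches(["an", "na"], "banana"))
```

Because as_completed yields futures in completion order, the link between a pattern and its positions is lost here; keep the (pattern, future) pairs if that link matters.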
Sean Vieira answered Apr 26 '23 18:04


Other posters have covered possible solutions, but to explain your error: you should pass the function and its arguments to executor.map as separate objects. Here is the example from the docs:

with concurrent.futures.ProcessPoolExecutor() as executor:
    # is_prime is the function, PRIMES are the arguments
    for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)): 
        print('%d is prime: %s' % (number, prime))

Your code calls processPattern itself and passes its return value to executor.map:

with concurrent.futures.ProcessPoolExecutor() as executor:
    for pattern, res in zip(patterns, executor.map(processPattern(numMis... # <- BAD
        print('%d is starts at: %s' % (pattern, res))

Instead it should be:

with concurrent.futures.ProcessPoolExecutor() as executor:
    for pattern, res in zip(patterns, executor.map(processPattern, <stuff>)):
        print('%s starts at: %s' % (pattern, res))

Where <stuff> stands for the iterables that supply the arguments: one iterable per parameter of processPattern, consumed in parallel so that each call takes one item from each.

Or, seeing as the other args stay fixed, create a function that takes just the one parameter you are iterating over, and pass in patterns as the iterable (as @uhbif19 suggests).
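One way to build such a one-parameter function is functools.partial, binding the fixed arguments by keyword so that pattern remains the only free parameter. This is only a sketch under assumptions: process_pattern below is a hypothetical three-parameter stand-in (a naive mismatch count, not the asker's algorithm), and run and executor_cls are illustrative names. A partial of a module-level function can be pickled, which a process pool requires; a lambda cannot.

```python
import concurrent.futures
from functools import partial


def process_pattern(pattern, num_mismatches_allowed, text):
    """Hypothetical stand-in for processPattern: naive approximate matching."""
    hits = []
    for start in range(len(text) - len(pattern) + 1):
        window = text[start:start + len(pattern)]
        mismatches = sum(a != b for a, b in zip(pattern, window))
        if mismatches <= num_mismatches_allowed:
            hits.append(start)
    return hits


def run(patterns, text, executor_cls=concurrent.futures.ProcessPoolExecutor):
    # Bind the fixed arguments by keyword; pattern stays the only free parameter.
    worker = partial(process_pattern, num_mismatches_allowed=1, text=text)
    with executor_cls() as executor:
        # map calls worker(pattern) for each pattern, preserving input order.
        return list(executor.map(worker, patterns))


if __name__ == "__main__":
    print(run(["abc"], "abcabd"))
```

Binding by keyword matters here because partial fills positional arguments from the left, and the varying argument (pattern) is the first parameter.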

EDIT:

To expand on how to create the <stuff> iterable: you need one iterable for each argument required by your function (processPattern in this case). You already have patterns as the iterable for the first argument. If the others are constant, then itertools.repeat may be helpful:

from itertools import repeat
args = (patterns,
        repeat(numMismatchesAllowed, len(patterns)),
        repeat(transformedText, len(patterns)),
        repeat(charToIndex, len(patterns)),
        <etc...>
        )

Then:

for pattern, res in zip(patterns, executor.map(processPattern, *args)):

I include this for the sake of understanding, but you can see how messy this is. The other answers offer better solutions.
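For completeness, here is a small runnable sketch of the repeat approach. It assumes a hypothetical three-parameter worker named process; run_with_repeat and executor_cls are also illustrative names. Since executor.map zips its iterables and stops at the shortest one, repeat can be called without a count.

```python
import concurrent.futures
from itertools import repeat


def process(pattern, suffix, times):
    """Hypothetical stand-in for a worker that takes several arguments."""
    return (pattern + suffix) * times


def run_with_repeat(patterns,
                    executor_cls=concurrent.futures.ProcessPoolExecutor):
    with executor_cls() as executor:
        # One iterable per parameter; the finite patterns list ends the map,
        # so the endless repeat() iterables need no explicit length.
        return list(executor.map(process, patterns, repeat("-"), repeat(2)))


if __name__ == "__main__":
    print(run_with_repeat(["a", "b"]))
```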

EDIT 2:

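Here's an example that better illustrates the use of submit vs map:

import concurrent.futures

def process(a, b):
    return a.upper() + b

# The guard is needed on platforms that start worker processes by spawning
if __name__ == '__main__':
    # submit: one call per task, each returning a Future straight away
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for c, fut in [(c, executor.submit(process, c, 'b')) for c in 'testing']:
            print(c, fut.result())

    # map: one iterable per parameter, results returned in input order
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for c, res in zip('testing', executor.map(process, 'testing', 'bbbbbbb')):
            print(c, res)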
Peter Gibson answered Apr 26 '23 20:04