
Lazy ProcessPoolExecutor in Python?

I have a large number of tasks that I want to execute and make the results available via a generator. However, using a ProcessPoolExecutor and as_completed will evaluate the results greedily and store them all in memory. Is there a way to block after a certain number of results are stored in the generator?

Akababa asked Jan 07 '18



1 Answer

The idea is to split what you want to process into chunks. I'll be using almost the same example as in the ProcessPoolExecutor documentation:

import concurrent.futures
import math
import itertools as it

PRIMES = [
    293,
    171,
    293,
    773,
    99,
    5419,
    293,
    171,
    293,
    773,
    99,
    5419,
    293,
    171,
    293,
    773,
    99,
    5419]


def is_prime(n):
    # Guard the edge cases the trial-division loop below doesn't cover.
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

def main_lazy():
    chunks = map(lambda x: it.islice(PRIMES, x, x+4), range(0, len(PRIMES), 4))
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = zip(PRIMES, 
                      it.chain.from_iterable(map(lambda x: executor.map(is_prime, x), 
                                                 chunks)))
        for number, prime in (next(results) for _ in range(4)):
            print('%d is prime: %s' % (number, prime))

if __name__ == "__main__":
    main_lazy()

Notice the differences between main and main_lazy; let's walk through them:

Instead of mapping the executor over the whole list at once, I split the input into chunks of size 4 (itertools.islice is useful for this) and map the executor over one chunk at a time. Python 3's built-in map is lazy, so the executor.map call for a chunk only happens once that chunk is requested. executor.map itself is not lazy: a chunk is evaluated in full as soon as it is requested, but the remaining chunks are never submitted until you ask for them. As you can see, I only request the first 4 elements of the results, and because itertools.chain.from_iterable is also lazy, that only consumes the first chunk; the rest of the iterable is never computed. A reusable version of this pattern is sketched below.
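
If you want to reuse the pattern outside this example, the chunking plus lazy mapping can be wrapped in a small generator. This is a minimal sketch of my own; the lazy_executor_map name and its signature are assumptions, not part of the original answer:

import itertools as it

def lazy_executor_map(executor, fn, iterable, chunk_size=4):
    # Hypothetical helper: feed the executor one chunk at a time.
    iterator = iter(iterable)
    while True:
        chunk = list(it.islice(iterator, chunk_size))
        if not chunk:
            return
        # executor.map submits this whole chunk at once, but the next
        # chunk is not submitted until these results have been consumed.
        yield from executor.map(fn, chunk)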

So, since you wanted a generator, it is as easy as yielding the results from within the with block of main_lazy (returning them directly would exit the with block and shut the executor down before the remaining chunks were submitted). You can even abstract the chunk size (you would probably want a proper function to build the chunks, but that is out of scope):

def main_lazy(chunk_size):
    chunks = map(lambda x: it.islice(PRIMES, x, x+chunk_size), range(0, len(PRIMES), chunk_size))
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = zip(PRIMES,
                      it.chain.from_iterable(map(lambda x: executor.map(is_prime, x),
                                                 chunks)))
        # yield instead of return: returning would exit the with block and
        # shut down the executor before the remaining chunks are submitted.
        yield from results
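
With yield from in place, calling main_lazy really does give you a generator that you can consume incrementally. As a small usage sketch (my own illustration, assuming the function above), pulling a few results only submits the chunks needed to produce them:

import itertools as it

results = main_lazy(chunk_size=4)
# Only the chunks needed for these 4 results are submitted to the pool;
# later chunks are submitted as you keep pulling from the generator.
for number, prime in it.islice(results, 4):
    print('%d is prime: %s' % (number, prime))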

Netwave answered Sep 18 '22