
combining functools.lru_cache with multiprocessing.Pool

I have a rather complex recursive function with many parameters (the Obara-Saika scheme, in case anyone is wondering) that I would like to evaluate more efficiently. As a first step I applied @functools.lru_cache. As a second step, I now want to use multiprocessing.Pool to asynchronously evaluate a long list of input parameters.

Adapting the second example from the functools Python docs and adding a pool of workers I have:

from multiprocessing import Pool
from functools import lru_cache

@lru_cache(maxsize=10)
def fibonacci(n):
    print('calculating fibonacci(%i)' %n)
    if n < 2:
        return n
    return fibonacci(n-1)+fibonacci(n-2)

with Pool(processes=4) as pool:
    for i in range(10):
        res = pool.apply_async(fibonacci, (i,))
        print(res.get())

print(fibonacci.cache_info())

Question 1

How do I get the cache to be shared across the different workers? Another question (How to share a cache?) asks a similar thing, but I could not get it working. Here are my two failed approaches.

Using multiprocessing.Pool:

from multiprocessing import Pool
from functools import lru_cache
import time

@lru_cache(maxsize=10)
def fibonacci(n):
    print('calculating fibonacci(%i)' %n)   # log whether the function gets called
    if n < 2:
        return n
    return fibonacci(n-1)+fibonacci(n-2)

res = []
with Pool(processes=4) as pool:

    # submit first task
    res.append(pool.apply_async(fibonacci, (5,)).get())

    # give fibonacci() some time to fill its cache
    time.sleep(1)

    # submit second task
    res.append(pool.apply_async(fibonacci, (3,)).get())

print(res)

Using concurrent.futures:

import concurrent.futures
from functools import lru_cache

import time

@lru_cache(maxsize=10)
def fibonacci(n):
    print('calculating fibonacci(%i)' %n)   # log whether the function gets called
    if n < 2:
        return n
    return fibonacci(n-1)+fibonacci(n-2)

with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:

    @lru_cache(maxsize=10)
    def fib_async(n):
        print('calculating fib_async(%i)' %n)
        if n < 2:
            return n
        return fibonacci(n-1) + fibonacci(n-2)

    res = []

    # submit first task
    res.append(executor.submit(fib_async, 5))

    # give fib_async() some time to fill its cache
    time.sleep(1)

    # submit second task
    res.append(executor.submit(fib_async, 3))


res = [e.result() for e in res]

print(res)

Both produce basically the same output, showing that the second task recalculates fibonacci(2), although the first task already had to calculate it. How do I get the cache shared?

This should speed things up a little, but still has a problem if duplicate calls are badly timed: a call currently evaluated by worker1 is not yet cached and worker2 might start to evaluate the same thing. Which brings me to:

Question 2

Calculating Fibonacci numbers is rather linear in its recursion, i.e. there is only one parameter being decremented. My function is more complex and I could use something that manages not only what input parameters have already been calculated, but keeps track of what is currently being calculated.

To be clear: I want to make many parallel calls to the recursive function which will spawn many new calls to the recursive function.

A tricky part might be to avoid assigning each call directly to a worker, since this would cause deadlocks when the recursion depth exceeds the number of workers.

Is there already such a thing I could use, or do I need to build something on my own? I stumbled upon multiprocessing.managers and concurrent.futures.ProcessPoolExecutor, which might be helpful, but I could use some help getting started.

Feodoran asked Apr 19 '17 12:04



1 Answer

Since your task is CPU-bound, you are right to choose multiprocessing for it.

The @lru_cache decorator uses an in-memory cache. Each Python process has its own memory space, therefore every worker process ends up with its own independent cache, and nothing cached in one worker is visible to the others.
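To make the per-process caches visible, here is a minimal sketch (not taken from the question) in which every task reports the cache statistics of the worker that ran it. The helper name worker_report is hypothetical, and the pool size and input values are arbitrary.

```python
import os
from functools import lru_cache
from multiprocessing import Pool


@lru_cache(maxsize=None)
def fibonacci(n):
    return n if n < 2 else fibonacci(n - 1) + fibonacci(n - 2)


def worker_report(n):
    """Hypothetical helper: compute in a worker and report that worker's cache stats."""
    value = fibonacci(n)
    info = fibonacci.cache_info()
    return os.getpid(), value, info.hits, info.misses


if __name__ == "__main__":
    with Pool(processes=2) as pool:
        for pid, value, hits, misses in pool.map(worker_report, [20] * 6):
            print(f"worker {pid}: fibonacci(20)={value} hits={hits} misses={misses}")

    # The parent never called fibonacci itself, so its own cache is still empty.
    print("parent:", fibonacci.cache_info())
```

The hit and miss counters are tracked per process ID, and the parent's cache_info() stays at zero because all the work happened in the workers.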

Should you want to synchronize those caches, you would need some sort of synchronization mechanism, such as locks. The standard lru_cache does not support sharing across processes, but you can implement a shared cache on your own quite easily.

Simply use a shared dictionary to hold the cached items, and wrap access to that dict with a lock. This way you can share the dict across processes while keeping the access safe.
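One way this could look, as a rough sketch rather than a drop-in replacement for lru_cache: the shared dictionary and lock come from multiprocessing.Manager and are handed to each worker through the pool initializer. The names init_worker, shared_cache and shared_lock are my own, and unlike lru_cache this cache is unbounded and evicts nothing.

```python
from multiprocessing import Manager, Pool

shared_cache = None  # set in every worker by init_worker
shared_lock = None


def init_worker(cache, lock):
    """Store the shared dict and lock in module globals of the worker."""
    global shared_cache, shared_lock
    shared_cache, shared_lock = cache, lock


def fibonacci(n):
    # Look the value up under the lock first.
    with shared_lock:
        if n in shared_cache:
            return shared_cache[n]
    # Compute outside the lock so other workers are not blocked meanwhile.
    result = n if n < 2 else fibonacci(n - 1) + fibonacci(n - 2)
    with shared_lock:
        shared_cache[n] = result
    return result


if __name__ == "__main__":
    with Manager() as manager:
        cache = manager.dict()
        lock = manager.Lock()
        with Pool(processes=4, initializer=init_worker,
                  initargs=(cache, lock)) as pool:
            print(pool.map(fibonacci, range(10)))
        # Every value computed by any worker ends up in the one shared dict.
        print(sorted(cache.items()))
```

Because the lock is released during the computation, two workers can still evaluate the same argument at the same time, so this does not address Question 2. Every lookup is also a round trip to the manager process, which only pays off when a single evaluation is expensive.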

Den1al answered Oct 07 '22 18:10
