Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Why doesn't concurrent.futures make a copy of arguments?

My understanding was that concurrent.futures relied on pickling arguments to get them running in different processes (or threads). Shouldn't pickling create a copy of the argument? On Linux it does not seem to be doing so, i.e., I have to explicitly pass a copy.

I'm trying to make sense of the following results:

<0> rands before submission: [17, 72, 97, 8, 32, 15, 63, 97, 57, 60]
<1> rands before submission: [97, 15, 97, 32, 60, 17, 57, 72, 8, 63]
<2> rands before submission: [15, 57, 63, 17, 97, 97, 8, 32, 60, 72]
<3> rands before submission: [32, 97, 63, 72, 17, 57, 97, 8, 15, 60]
in function 0 [97, 15, 97, 32, 60, 17, 57, 72, 8, 63]
in function 1 [97, 32, 17, 15, 57, 97, 63, 72, 60, 8]
in function 2 [97, 32, 17, 15, 57, 97, 63, 72, 60, 8]
in function 3 [97, 32, 17, 15, 57, 97, 63, 72, 60, 8]

Here's the code:

from __future__ import print_function
import time
import random
try:
    from concurrent import futures
except ImportError:
    import futures


def work_with_rands(i, rands):
    print('in function', i, rands)


def main():
    random.seed(1)
    rands = [random.randrange(100) for _ in range(10)]

    # sequence 1 and sequence 2 should give the same results but they don't
    # only difference is that one uses a copy of rands (i.e., rands.copy())
    # sequence 1
    with futures.ProcessPoolExecutor() as ex:
        for i in range(4):
            print("<{}> rands before submission: {}".format(i, rands))
            ex.submit(work_with_rands, i, rands)
            random.shuffle(rands)

    print('-' * 30)
    random.seed(1)
    rands = [random.randrange(100) for _ in range(10)]
    # sequence 2
    print("initial sequence: ", rands)
    with futures.ProcessPoolExecutor() as ex:
        for i in range(4):
            print("<{}> rands before submission: {}".format(i, rands))
            ex.submit(work_with_rands, i, rands[:])
            random.shuffle(rands)

if __name__ == "__main__":
    main()

Where on earth is [97, 32, 17, 15, 57, 97, 63, 72, 60, 8] coming from? That's not even one of the sequences passed to submit.

The results differ slightly under Python 2.

like image 298
ariddell Avatar asked Oct 01 '22 01:10

ariddell


2 Answers

Basically, ProcessPoolExecutor.submit() method put function and its arguments to some "Work Items" dict (without any pickling), that is shared with another thread (_queue_management_worker), and that thread passes WorkItems from that dict to queue that is read by actual worker process.

There is a comment in source code, describing the concurrent module architecture: http://hg.python.org/cpython/file/16207b8495bf/Lib/concurrent/futures/process.py#l6

It turns out, that there is not enough time for _queue_management_worker to get notified about new items between submit calls.

So, that thread waits here all the time: (http://hg.python.org/cpython/file/16207b8495bf/Lib/concurrent/futures/process.py#l226) and only wakes on ProcessPoolExecutor.shutdown (on exit from ProcessPoolExecutor context).

If you put some delay in your first sequence, like that:

with futures.ProcessPoolExecutor() as ex:
    for i in range(4):
        print("<{}> rands before submission: {}".format(i, rands))
        ex.submit(work_with_rands, i, rands)
        random.shuffle(rands)
        time.sleep(0.01)

you will see, that _queue_management_worker will wake up and pass calls to worker processes, and work_with_rands will print different values.

like image 91
psl Avatar answered Oct 13 '22 10:10

psl


you share the same list on all threads and its mutated. its hard to debug because when you add a print it will behave differently. but this [97, 32, 17, 15, 57, 97, 63, 72, 60, 8] must be a state inside shuffle. shuffle holds the list (the same list that exists in all threads) and changes it more than once. at the time the threads are called the state is [97, 32, 17, 15, 57, 97, 63, 72, 60, 8]. The values don't get copied immanently, they are copied in another thread so you can't guarantee when they will be copied.

An example of what shuffle produces before the shuffle is done:

[31, 64, 88, 7, 68, 85, 69, 3, 15, 47] # initial value (rands)
# ex.submit() is called here
# shuffle() is called here
# shuffle starts changing rand to:
[31, 64, 88, 47, 68, 85, 69, 3, 15, 7]
[31, 64, 15, 47, 68, 85, 69, 3, 88, 7]
[31, 64, 15, 47, 68, 85, 69, 3, 88, 7]
[31, 64, 69, 47, 68, 85, 15, 3, 88, 7]
[31, 64, 85, 47, 68, 69, 15, 3, 88, 7] # threads may be called here
[31, 64, 85, 47, 68, 69, 15, 3, 88, 7] # or here
[31, 64, 85, 47, 68, 69, 15, 3, 88, 7] # or here
[31, 85, 64, 47, 68, 69, 15, 3, 88, 7]
[85, 31, 64, 47, 68, 69, 15, 3, 88, 7] # value when the shuffle has finished

shuffle source code:

def shuffle(self, x, random=None):
    if random is None:
        randbelow = self._randbelow
        for i in reversed(range(1, len(x))):
            # pick an element in x[:i+1] with which to exchange x[i]
            j = randbelow(i+1)
            x[i], x[j] = x[j], x[i]
            # added this print here. that's what prints the output above
            # your threads are probably being called when this is still pending
            print(x) 
     ... other staff here

so if your input is [17, 72, 97, 8, 32, 15, 63, 97, 57, 60] and your output is [97, 15, 97, 32, 60, 17, 57, 72, 8, 63] the shuffle has "steps in the middle between that". your threads get called in the "steps in the middle"

An example without mutation, in general try to avoid sharing data between threads because its really hard to get it right:

def work_with_rands(i, rands):
    print('in function', i, rands)


def foo(a):
    random.seed(random.randrange(999912)/9)
    x = [None]*len(a)
    for i in a:
        _rand = random.randrange(len(a))

        while x[_rand] is not None:
            _rand = random.randrange(len(a))

        x[_rand] = i
    return x

def main():
    rands = [random.randrange(100) for _ in range(10)]
    with futures.ProcessPoolExecutor() as ex:
        for i in range(4):
            new_rands = foo(rands)
            print("<{}> rands before submission: {}".format(i, new_rands ))
            ex.submit(work_with_rands, i, new_rands )


<0> rands before submission: [84, 12, 93, 47, 40, 53, 74, 38, 52, 62]
<1> rands before submission: [74, 53, 93, 12, 38, 47, 52, 40, 84, 62]
<2> rands before submission: [84, 12, 93, 38, 62, 52, 53, 74, 47, 40]
<3> rands before submission: [53, 62, 52, 12, 84, 47, 93, 40, 74, 38]
in function 0 [84, 12, 93, 47, 40, 53, 74, 38, 52, 62]
in function 1 [74, 53, 93, 12, 38, 47, 52, 40, 84, 62]
in function 2 [84, 12, 93, 38, 62, 52, 53, 74, 47, 40]
in function 3 [53, 62, 52, 12, 84, 47, 93, 40, 74, 38]
like image 28
Foo Bar User Avatar answered Oct 13 '22 12:10

Foo Bar User