The following is taken from David Beazley's slides on generators (here for anybody interested).
A Task class is defined which wraps a generator that yields futures. The Task class, in full (without error handling), follows:
class Task:
    def __init__(self, gen):
        self._gen = gen

    def step(self, value=None):
        try:
            fut = self._gen.send(value)
            fut.add_done_callback(self._wakeup)
        except StopIteration as exc:
            pass

    def _wakeup(self, fut):
        result = fut.result()
        self.step(result)
In an example, the following recursive function is also defined:
from concurrent.futures import ThreadPoolExecutor
import time

pool = ThreadPoolExecutor(max_workers=8)

def recursive(n):
    yield pool.submit(time.sleep, 0.001)
    print("Tick :", n)
    Task(recursive(n+1)).step()
The following two cases play out:
From the Python REPL, if we define these (or import them if we place them in a file) and then jump-start the recursion with:
Task(recursive(0)).step()
it starts printing away, seemingly past the point where the recursion limit would have been exceeded. It obviously doesn't exceed it, though: printing the stack depth shows that it stays constant throughout execution. Something else is going on which I don't quite understand.
NOTE: You'll need to kill the python process if you execute it like this.
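For reference, here is a bounded sketch of how the constant stack depth can be checked without having to kill the process. The names stack_depth and run_ticks, the tick limit, and the delay parameter are introduced here for illustration and are not part of the slides; the depth counter relies on sys._getframe, so it assumes CPython.

```python
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class Task:
    """Same Task as in the question: drives a generator that yields futures."""

    def __init__(self, gen):
        self._gen = gen

    def step(self, value=None):
        try:
            fut = self._gen.send(value)
            fut.add_done_callback(self._wakeup)
        except StopIteration:
            pass

    def _wakeup(self, fut):
        self.step(fut.result())


def stack_depth():
    """Count the Python frames on the current thread's stack (CPython only)."""
    depth = 0
    frame = sys._getframe()
    while frame is not None:
        depth += 1
        frame = frame.f_back
    return depth


def run_ticks(limit, delay=0.01):
    """Run `limit` ticks (hypothetical helper) and return the depth at each one."""
    depths = []
    done = threading.Event()
    pool = ThreadPoolExecutor(max_workers=8)

    def recursive(n):
        yield pool.submit(time.sleep, delay)
        depths.append(stack_depth())
        if n + 1 < limit:
            Task(recursive(n + 1)).step()
        else:
            done.set()  # stop after `limit` ticks instead of running forever

    Task(recursive(0)).step()
    done.wait(timeout=30)  # keep the main thread alive until the last tick
    pool.shutdown(wait=True)
    return depths


if __name__ == "__main__":
    depths = run_ticks(50)
    print("ticks:", len(depths))
    print("min/max depth:", min(depths), max(depths))
    print("recursion limit:", sys.getrecursionlimit())
```

If each tick really nested inside the previous one, the reported depth would grow by a few frames per tick. Raising the limit above sys.getrecursionlimit() is one way to convince yourself that it does not.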
If we put all contents (Task, recursive) in a file along with:
if __name__ == "__main__":
    Task(recursive(0)).step()
and then run it with python myfile.py, it stops ticking at 7 (the number of max_workers, it seems).
My question is how does it seemingly surpass the recursion limit and why does it behave differently based on how you execute it?
The behavior appears on both Python 3.6.2 and Python 3.5.4 (and I'd guess others in the 3.6 and 3.5 family too).
Let's start with the number 7. As you already mentioned, it corresponds to the number of workers, labeled [0..7]. The Task class needs to be passed recursive as a function identifier:
Task(recursive).step(n)
instead of
Task(recursive(n)).step()
This is because the recursive function needs to be called inside the pool environment, while in the current code recursive is evaluated in the main thread itself. time.sleep is the only function in the current code that is evaluated in the pool.
The main issue in the code is the recursion. Each thread in the pool depends on the inner function, which caps execution at the number of available workers. A function that cannot finish prevents a new one from executing, so the program terminates long before the recursion limit is reached.
The recursive generator you show isn't actually recursive in a way that would cause a problem with the system recursion limit.
To understand why, you need to pay attention to when the recursive generator's code runs. Unlike a normal function, just calling recursive(0) doesn't cause it to immediately run its code and make additional recursive calls. Instead, calling recursive(0) immediately returns a generator object. Only when you send() to the generator does the code run, and only after you send() to it a second time does it kick off another call.
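A minimal sketch of that behavior, using a throwaway generator (demo and its log argument are names made up for this illustration, not part of the original code):

```python
def demo(log):
    log.append("body started")  # runs only on the first send()/next()
    received = yield "paused at yield"
    log.append("resumed with %r" % (received,))
    # Falling off the end raises StopIteration in the caller of send().


log = []
gen = demo(log)  # creates a generator object; no body code has run yet
assert log == []

first = gen.send(None)  # first send: runs the body up to the yield
assert first == "paused at yield"
assert log == ["body started"]

try:
    gen.send("hello")  # second send: runs the code after the yield
except StopIteration:
    log.append("generator finished")

print(log)
# ['body started', "resumed with 'hello'", 'generator finished']
```

In recursive(), the code after the yield is the part that creates the next Task, which is why nothing new is scheduled until the second send().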
Let's examine the call stack as the code runs. At the top level, we run Task(recursive(0)).step(). That does three things in sequence:

1. recursive(0): This call returns a generator object immediately.
2. Task(_): The Task object is created, and its __init__ method stores a reference to the generator object created in the first step.
3. _.step(): A method on the task gets called. This is where the action really starts! Let's look at what happens inside the call:
   - fut = self._gen.send(value): Here we actually start the generator running, by sending it a value. Let's go deeper and see the generator code run:
     - yield pool.submit(time.sleep, 0.001): This schedules something to be done in another thread. We don't wait for it to happen, though. Instead, we get a Future that we can use to get notified when it's complete. We yield the future immediately back to the previous level of code.
   - fut.add_done_callback(self._wakeup): Here we ask for our _wakeup() method to be called when the future is ready. This always returns immediately!
   - The step method ends now. That's right, we're done (for the moment)! This is important for the second part of your question, which I'll discuss more later.
4. The call we made ended, so control flow returns to the REPL if we're running interactively. If we're running as a script, the interpreter will instead reach the end of the script and start shutting down (I'll discuss this more below). However, the other threads controlled by the thread pool are still running, and at some point, one of them is going to do some stuff we care about! Let's see what that is.
5. When the scheduled function (time.sleep) has finished running, the thread it was running in will call the callback we set on the Future object. That is, it will call Task._wakeup() on the Task object we created earlier (which we don't have a reference to anymore at the top level, but the Future kept a reference so it's still alive). Let's look at the method:
   - result = fut.result(): Store the result of the deferred call. This is irrelevant in this case since we never look at the results (it's None anyway).
   - self.step(result): Step again! Now we're back to the code we care about. Let's see what it does this time:
     - fut = self._gen.send(value): Send to the generator again, so it takes over. It already yielded once, so this time we start just after the yield:
       - print("Tick :", n): This is pretty simple.
       - Task(recursive(n+1)).step(): This is where things get interesting. This line is just like what we started with. So, like before, this is going to run the logic 1-4 I listed above (including their substeps). But instead of returning to the REPL or ending the script, when the step() method returns, it comes back here.
       - The recursive() generator (the original one, not the new one we just created) has reached its end. So, like any generator that reaches the end of its code, it raises StopIteration.
     - The StopIteration is caught and ignored by the try/except block, and the step() method ends.
   - The _wakeup() method ends too, so the callback is done.
6. The callback for the Task created in the earlier callback will be called as well. So we go back and repeat step 5, over and over, forever (if we're running interactively).

The call stack above explains why the interactive case prints forever. The main thread returns to the REPL (and you can do other stuff with it if you can see past the output from the other threads). But in the pool, each thread schedules another job from the callback of its own job. When the next job finishes, its callback schedules another job and so on.
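The walkthrough above can be checked with a small bounded sketch that records which thread runs each tick. The helper trace_threads, its limit and delay parameters, and the events list are introduced here for illustration only; the delay is deliberately long so that step() is expected to return before the first future completes.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class Task:
    """Same Task as in the question: drives a generator that yields futures."""

    def __init__(self, gen):
        self._gen = gen

    def step(self, value=None):
        try:
            fut = self._gen.send(value)
            fut.add_done_callback(self._wakeup)
        except StopIteration:
            pass

    def _wakeup(self, fut):
        self.step(fut.result())


def trace_threads(limit=3, delay=0.1):
    """Hypothetical helper: return (event, n, thread name) tuples in order."""
    events = []
    done = threading.Event()
    pool = ThreadPoolExecutor(max_workers=8)

    def recursive(n):
        yield pool.submit(time.sleep, delay)
        events.append(("tick", n, threading.current_thread().name))
        if n + 1 < limit:
            Task(recursive(n + 1)).step()
        else:
            done.set()  # bounded, unlike the original

    Task(recursive(0)).step()
    # step() has returned; with a delay this long no tick is expected yet.
    events.append(("step returned", None, threading.current_thread().name))
    done.wait(timeout=30)
    pool.shutdown(wait=True)
    return events


if __name__ == "__main__":
    for event in trace_threads():
        print(event)
```

The "step returned" entry should come first and carry the calling thread's name, while every tick should be attributed to a pool worker thread, since each tick runs inside a future's done-callback.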
So why do you get only 8 printouts when you run the code as a script? The answer is hinted at in step 4 above. When running non-interactively, the main thread runs off the end of the script after the first call to Task.step returns. This prompts the interpreter to try to shut down.
The concurrent.futures.thread module (where ThreadPoolExecutor is defined) has some fancy logic that tries to clean up nicely when the program shuts down while an executor is still active. It's supposed to stop any idle threads, and signal any that are still running to stop when their current job is complete.
The exact implementation of that cleanup logic interacts with our code in a very odd way (which may or may not be buggy). The effect is that the first thread keeps giving itself more jobs to do, while additional worker threads that are spawned keep exiting immediately after they are spawned. The first worker finally quits when the executor has started as many threads as it wanted to be using (8 in our case).
Here's the sequence of events, as I understand it.
- Our code imports the concurrent.futures.thread module, which uses atexit to tell the interpreter to run a function named _python_exit just before the interpreter shuts down.
- We create a ThreadPoolExecutor with a maximum thread count of 8. It doesn't spawn its worker threads immediately, but will create one each time a job is scheduled until it has all 8.
- The sleep call in the first job takes much longer than the rest of the steps, so the thread running it is going to be stuck here for a bit.
- The _python_exit function gets called by the interpreter, since the interpreter wants to shut down. The function sets a global _shutdown variable in the module, and sends a None to the internal queue of the executor (it sends one None per thread, but there's just the one thread created so far, so it just sends one None). It then blocks the main thread until the thread it knows about has quit. This delays the interpreter shutdown.
- The first worker thread's call to time.sleep returns. It calls the callback function that is registered with its job's Future, which schedules another job.
- A newly spawned worker thread gets the None value from step 7 (the one _python_exit put on the queue), which is a signal that it may be done. It sees that the _shutdown global is set, and so it quits. Before it does, though, it adds another None to the queue.
- Each additional worker thread that gets spawned takes a None off the job queue instead of an actual job to complete. The first worker thread ends up doing all the actual work.
- Eventually, the None that's at the head of the internal job queue gets picked up by the first worker (instead of an actual job). That means it shuts down, rather than doing more work.
- The main thread unblocks and the _python_exit function completes. This lets the interpreter shut down completely. We're done!

This explains the output we see! We get 8 outputs, all coming from the same worker thread (the first one spawned).
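The lazy thread creation mentioned in the sequence above is easy to observe from outside the executor. This is only a sketch: worker_counts is a name made up here, and it relies on threading.active_count() rather than on any executor internals, so it assumes no unrelated threads start or stop while it runs.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def worker_counts():
    """Hypothetical helper: thread counts before/after creating and using a pool."""
    before = threading.active_count()

    pool = ThreadPoolExecutor(max_workers=8)
    after_create = threading.active_count()  # no worker threads started yet

    pool.submit(time.sleep, 0.01).result()  # first job: one worker is started
    after_first_job = threading.active_count()

    pool.shutdown(wait=True)
    return before, after_create, after_first_job


if __name__ == "__main__":
    print(worker_counts())
```

Creating the executor should leave the thread count unchanged, and the first submitted job should add exactly one thread. How later submissions behave (for example, whether an idle worker is reused instead of a new thread being spawned) differs between Python versions, so the sketch stops after the first job.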
I think there may be a race condition in that code, however. If step 11 (the first worker finishing its callback and looking for its next job) happens before step 10 (the newly spawned worker taking an item off the queue), things might break. If the first worker got a None off the queue and the other newly spawned worker got the real job, they'd swap roles (the first worker would die, and the other one would do the rest of the work, barring more race conditions in the later versions of those steps). However, the main thread would be unblocked as soon as the first worker died. Since it doesn't know about the other threads (they didn't exist when it made its list of the threads to wait on), it will close the interpreter prematurely.
I'm not sure if this race is ever likely to happen. I'd guess it's pretty unlikely, since the length of the code path between the new thread starting and it grabbing a job from the queue is much shorter than the path for the existing thread to finish the callback (the part after it queued up the new job) and then look for another job in the queue.
I suspect that it's a bug that the ThreadPoolExecutor lets us exit cleanly when we run our code as a script. The logic for queuing up a new job should probably check the global _shutdown flag in addition to the executor's own self._shutdown attribute. If it did, trying to queue up another job after the main thread had finished would raise an exception.
You can replicate what I think would be saner behavior by creating the ThreadPoolExecutor in a with statement:
# create the pool below the definition of recursive()
with ThreadPoolExecutor(max_workers=8) as pool:
    Task(recursive(0)).step()
This will crash soon after the main thread returns from the step() call. It will look something like this:
exception calling callback for <Future at 0x22313bd2a20 state=finished returned NoneType>
Traceback (most recent call last):
  File "S:\python36\lib\concurrent\futures\_base.py", line 324, in _invoke_callbacks
    callback(self)
  File ".\task_coroutines.py", line 21, in _wakeup
    self.step(result)
  File ".\task_coroutines.py", line 14, in step
    fut = self._gen.send(value)
  File ".\task_coroutines.py", line 30, in recursive
    Task(recursive(n+1)).step()
  File ".\task_coroutines.py", line 14, in step
    fut = self._gen.send(value)
  File ".\task_coroutines.py", line 28, in recursive
    yield pool.submit(time.sleep, 1)
  File "S:\python36\lib\concurrent\futures\thread.py", line 117, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
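The error at the bottom of that traceback can be reproduced in isolation, without Task or recursive at all. In this sketch, submit_after_shutdown is a name invented for the illustration; it only shows that submit() refuses new work once the with block has shut the executor down.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def submit_after_shutdown():
    """Hypothetical helper: return the error message from a late submit()."""
    with ThreadPoolExecutor(max_workers=8) as pool:
        pool.submit(time.sleep, 0.001)
    # Leaving the with block calls pool.shutdown(wait=True).
    try:
        pool.submit(time.sleep, 0.001)
    except RuntimeError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    print(submit_after_shutdown())
```

In the Task example the late submit() happens inside a done-callback on a worker thread, which is why the message shows up as "exception calling callback" instead of propagating to the main thread.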