Seemingly infinite recursion with generator based coroutines

The following is taken from David Beazley's slides on generators (here for anybody interested).

A Task class is defined which wraps a generator that yields futures. The Task class, in full (without error handling), follows:

class Task:
    def __init__(self, gen):
        self._gen = gen

    def step(self, value=None):
        try:
            # Advance the generator; it yields a future representing pending work.
            fut = self._gen.send(value)
            # Resume this task (via _wakeup) once that future completes.
            fut.add_done_callback(self._wakeup)
        except StopIteration:
            pass  # the generator is exhausted, so the task is finished

    def _wakeup(self, fut):
        result = fut.result()
        self.step(result)

In an example, the following recursive function is also defined:

from concurrent.futures import ThreadPoolExecutor
import time

pool = ThreadPoolExecutor(max_workers=8)

def recursive(n):
    yield pool.submit(time.sleep, 0.001)
    print("Tick :", n)
    Task(recursive(n+1)).step()

The following two cases play out:

  1. From the Python REPL, if we define these (or import them if we place them in a file) and then jump-start the recursion with:

    Task(recursive(0)).step()
    

    it starts printing away, seemingly to a point where the recursion limit should long since have been exceeded. It obviously doesn't exceed it, though: printing the stack level shows that it stays constant throughout execution (see the instrumented sketch after this list). Something else is going on which I don't quite understand.

    NOTE: You'll need to kill the python process if you execute it like this.

  2. If we put all contents (Task, recursive) in a file along with:

    if __name__ == "__main__":
        Task(recursive(0)).step()
    

    and then run it with python myfile.py, it stops ticking at 7 (one tick per worker, it seems: max_workers is 8 and the ticks are numbered from 0).
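As an aside, one way to check the stack level mentioned in case 1 is to instrument recursive with inspect.stack(). This instrumented variant is my own addition for illustration, not from Beazley's slides:

import inspect

def recursive(n):
    yield pool.submit(time.sleep, 0.001)
    # If real recursion were happening, this depth would grow on every tick.
    print("Tick :", n, "| stack depth:", len(inspect.stack()))
    Task(recursive(n+1)).step()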


My question is: how does it seemingly surpass the recursion limit, and why does it behave differently based on how you execute it?

The behavior appears on both Python 3.6.2 and Python 3.5.4 (and I'd guess others in the 3.6 and 3.5 family too).

asked Oct 02 '17 by Dimitris Fasarakis Hilliard

2 Answers

Let's start with the number 7. As you mentioned already, that is the number of workers, labeled from [0..7]. The Task class needs to be passed recursive in the form of a function identifier:

Task(recursive).step(n) 

instead of

Task(recursive(n)).step()

This is because the recursive function needs to be called inside the pool environment, while in the current case recursive is evaluated in the main thread itself; time.sleep is the only function in the current code that is evaluated in the task pool.

The other place where the code has a major issue is recursion. Each thread in the pool depends on an inner function, which puts an upper limit on execution equal to the number of workers available. The functions are not able to finish, so new ones cannot execute; thus the program terminates well before the recursion limit is reached.
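If you want to see for yourself which thread executes each piece, you can log threading.current_thread() at the relevant points. This diagnostic variant is an addition for illustration, not part of the original code:

import threading

def recursive(n):
    # Shows which thread is running the generator body itself; only the
    # time.sleep call submitted below runs on a pool worker as a job.
    print("body running in:", threading.current_thread().name)
    yield pool.submit(time.sleep, 0.001)
    print("Tick :", n)
    Task(recursive(n+1)).step()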

answered Sep 22 '22 by Supreet Sethi


The recursive generator you show isn't actually recursive in a way that would cause a problem with the system recursion limit.

To understand why, you need to pay attention to when the recursive generator's code runs. Unlike a normal function, just calling recursive(0) doesn't cause it to immediately run its code and make additional recursive calls. Instead, calling recursive(0) immediately returns a generator object. Only when you send() to the generator does its code run, and only after you send() to it a second time does it kick off another call.
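A minimal demonstration of that laziness, independent of the Task machinery:

def gen():
    print("first chunk")
    yield 1
    print("second chunk")

g = gen()       # no output yet; we only get a generator object
g.send(None)    # runs up to the first yield and prints "first chunk"
g.send(None)    # resumes after the yield, prints "second chunk", raises StopIteration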

Let's examine the call stack as the code runs. At the top level, we run Task(recursive(0)).step(). That does three things in sequence:

  1. recursive(0) This call returns a generator object immediately.
  2. Task(_) The Task object is created, and its __init__ method stores a reference to the generator object created in the first step.
  3. _.step() A method on the task gets called. This is where the action really starts! Let's look at what happens inside the call:

    • fut = self._gen.send(value) Here we actually start the generator running, by sending it a value. Let's go deeper and see the generator code run:
      • yield pool.submit(time.sleep, 0.001) This schedules something to be done in another thread. We don't wait for it to happen though. Instead, we get a Future that we can use to get notified when it's complete. We yield the future immediately back to the previous level of code.
    • fut.add_done_callback(self._wakeup) Here we ask for our _wakeup() method to be called when the future is ready. This always returns immediately!
    • The step method ends now. That's right, we're done (for the moment)! This is important for the second part of your question, which I'll discuss more later.
  4. The call we made ended, so control flow returns to the REPL if we're running interactively. If we're running as a script, the interpreter will instead reach the end of the script and start shutting down (I'll discuss this more below). However, the other threads controlled by the thread pool are still running, and at some point, one of them is going to do some stuff we care about! Let's see what that is.

  5. When the scheduled function (time.sleep) has finished running, the thread it was running in will call the callback we set on the Future object. That is, it will call Task._wakeup() on the Task object we created earlier (which we don't have a reference to anymore at the top level, but the Future kept a reference, so it's still alive). Let's look at the method:

    • result = fut.result() Store the result of the deferred call. This is irrelevant in this case since we never look at the results (it's None anyway).
    • self.step(result) Step again! Now we're back to the code we care about. Let's see what it does this time:
      • fut = self._gen.send(value) Send to the generator again, so it takes over. It already yielded once, so this time we start just after the yield:
        • print("Tick :", n) This is pretty simple.
        • Task(recursive(n+1)).step() This is where things get interesting. This line is just like what we started with. So, like before, this is going to run through the logic of steps 1-4 listed above (including their substeps). But instead of returning to the REPL or ending the script, when the step() method returns, it comes back here.
        • The recursive() generator (the original one, not the new one we just created) has reached its end. So, like any generator that reaches the end of its code, it raises StopIteration.
      • The StopIteration is caught and ignored by the try/except block, and the step() method ends.
    • The _wakeup() method ends too, so the callback is done.
  6. Eventually the callback for the Task created in the earlier callback will be called as well. So we go back and repeat step 5, over and over, forever (if we're running interactively).

The call stack above explains why the interactive case prints forever. The main thread returns to the REPL (and you can do other stuff with it if you can see past the output from the other threads). But in the pool, each thread schedules another job from the callback of its own job. When the next job finishes, its callback schedules another job and so on.
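The self-sustaining pattern can be reduced to a few lines. The sketch below is my own condensation of it, with a counter added so the demo stops on its own:

from concurrent.futures import ThreadPoolExecutor
import threading
import time

pool = ThreadPoolExecutor(max_workers=8)

def wakeup(fut, n):
    if n >= 3:  # stop after a few rounds so the demo terminates
        return
    print("callback ran in:", threading.current_thread().name)
    # Schedule the next job from inside this job's callback, just as
    # Task._wakeup() does via step().
    pool.submit(time.sleep, 0.001).add_done_callback(lambda f: wakeup(f, n + 1))

pool.submit(time.sleep, 0.001).add_done_callback(lambda f: wakeup(f, 0))
time.sleep(1)  # give the worker threads time to finish before exiting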

So why do you get only 8 printouts when you run the code as a script? The answer is hinted at in step 4 above. When running non-interactively, the main thread runs off the end of the script after the first call to Task.step returns. This prompts the interpreter to try to shut down.

The concurrent.futures.thread module (where ThreadPoolExecutor is defined) has some fancy logic that tries to clean up nicely when the program shuts down while an executor is still active. It's supposed to stop any idle threads, and signal any that are still running to stop when their current job is complete.
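A simplified paraphrase of that cleanup hook, as it appears in CPython 3.5/3.6's Lib/concurrent/futures/thread.py (written from memory, so consult the real source for exact details):

import atexit
import weakref

_threads_queues = weakref.WeakKeyDictionary()  # every worker thread -> its work queue
_shutdown = False

def _python_exit():
    global _shutdown
    _shutdown = True
    items = list(_threads_queues.items())
    for t, q in items:
        q.put(None)     # one None "poison pill" per known worker thread
    for t, q in items:
        t.join()        # block interpreter shutdown until those workers exit

atexit.register(_python_exit)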

The exact implementation of that cleanup logic interacts with our code in a very odd way (which may or may not be buggy). The effect is that the first thread keeps giving itself more jobs to do, while each additional worker thread exits immediately after it is spawned. The first worker finally quits when the executor has started as many threads as it wanted to be using (8 in our case).
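Two more pieces of that module explain the one-busy-thread behavior, continuing the paraphrase above (again approximate, not quoted exactly). Each submit() call spawns a new worker until the pool is full, and a worker exits when it pulls a None off the shared queue while shutdown is in progress:

def _adjust_thread_count(self):
    # Called from submit(): lazily start workers, one per submission,
    # until max_workers threads exist.
    if len(self._threads) < self._max_workers:
        t = threading.Thread(target=_worker,
                             args=(weakref.ref(self), self._work_queue))
        t.daemon = True
        t.start()
        self._threads.add(t)
        _threads_queues[t] = self._work_queue

def _worker(executor_reference, work_queue):
    while True:
        work_item = work_queue.get(block=True)
        if work_item is not None:
            work_item.run()
            continue
        # Got a None: exit if the interpreter or the executor is shutting down.
        executor = executor_reference()
        if _shutdown or executor is None or executor._shutdown:
            work_queue.put(None)  # pass the poison pill to the next worker
            return
        del executor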

Here's the sequence of events, as I understand it.

  1. We import (indirectly) the concurrent.futures.thread module, which uses atexit to tell the interpreter to run a function named _python_exit just before the interpreter shuts down.
  2. We create a ThreadPoolExecutor with a maximum thread count of 8. It doesn't spawn its worker threads immediately, but will create one each time a job is scheduled until it has all 8.
  3. We schedule our first job (in the deeply nested part of step 3 from the previous list).
  4. The executor adds the job to its internal queue, then notices it doesn't have the maximum number of worker threads and starts a new one.
  5. The new thread pops the job off the queue and begins running it. However, the sleep call takes much longer than the rest of the steps, so the thread is going to be stuck here for a bit.
  6. The main thread finishes (it's reached step 4 in the previous list).
  7. The _python_exit function gets called by the interpreter, since the interpreter wants to shut down. The function sets a global _shutdown variable in the module, and sends a None to the internal queue of the executor (it sends one None per thread, but there's just the one thread created so far, so it just sends one None). It then blocks the main thread until the thread it knows about has quit. This delays the interpreter shutdown.
  8. The worker thread's call to time.sleep returns. It calls the callback function that is registered with its job's Future, which schedules another job.
  9. Like in step 4 of this list, the executor queues up the job, and starts another thread, since it doesn't have the desired number yet.
  10. The new thread tries to grab a job off the internal queue, but gets the None value from step 7 which is a signal that it may be done. It sees that the _shutdown global is set and so it quits. Before it does though, it adds another None to the queue.
  11. The first worker thread finishes its callback. It looks for a new job, and finds the one it queued up itself in step 8. It starts running that job, which, as in step 5, takes a while.
  12. Nothing else happens, though, since the first worker is the only active thread at the moment (the main thread is blocked waiting on the first worker to die, and the other worker shut itself down).
  13. We now repeat steps 8-12 several times. The first worker thread queues up the third through eighth jobs, and the executor spawns a corresponding thread each time, since it doesn't have a full set. However, each thread dies immediately, since it gets a None off the job queue instead of an actual job to complete. The first worker thread ends up doing all the actual work.
  14. Finally, after the 8th job, something works differently. This time, when the callback schedules another job, no additional thread is spawned, since the executor knows it's started the requested 8 threads already (it doesn't know that 7 have shut down).
  15. So this time, the None that's at the head of the internal job queue gets picked up by the first worker (instead of an actual job). That means it shuts down, rather than doing more work.
  16. When the first worker shuts down, the main thread (which has been waiting for it to quit) can finally unblock and the _python_exit function completes. This lets the interpreter shut down completely. We're done!

This explains the output we see! We get 8 outputs, all coming from the same worker thread (the first one spawned).

I think there may be a race condition in that code, however. If step 11 happens before step 10, things might break. If the first worker got a None off the queue and the other newly spawned worker got the real job, they'd swap roles (the first worker would die, and the other one would do the rest of the work, barring more race conditions in the later iterations of those steps). However, the main thread would be unblocked as soon as the first worker died. Since it doesn't know about the other threads (they didn't exist when it made its list of the threads to wait on), it would close the interpreter prematurely.

I'm not sure if this race is ever likely to happen. I'd guess it's pretty unlikely, since the length of the code path between the new thread starting and it grabbing a job from the queue is much shorter than the path for the existing thread to finish the callback (the part after it queued up the new job) and then look for another job in the queue.

I suspect that it's a bug that the ThreadPoolExecutor lets us exit cleanly when we run our code as a script. The logic for queuing up a new job should probably check the global _shutdown flag in addition to the executor's own self._shutdown attribute. If it did, trying to queue up another job after the main thread had finished would raise an exception.
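Concretely, the check would look something like this hypothetical patch to ThreadPoolExecutor.submit(); the surrounding lines paraphrase the existing method from concurrent.futures.thread, and the marked line is the proposed addition:

def submit(self, fn, *args, **kwargs):
    with self._shutdown_lock:
        if self._shutdown or _shutdown:  # proposed: also consult the module-level flag
            raise RuntimeError('cannot schedule new futures after shutdown')
        f = _base.Future()
        w = _WorkItem(f, fn, args, kwargs)
        self._work_queue.put(w)
        self._adjust_thread_count()
        return f

(Later Python versions did, in fact, add a check along these lines.)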

You can replicate what I think would be saner behavior by creating the ThreadPoolExecutor in a with statement:

# create the pool below the definition of recursive()
with ThreadPoolExecutor(max_workers=8) as pool:
    Task(recursive(0)).step()

This will crash soon after the main thread returns from the step() call. It will look something like this:

exception calling callback for <Future at 0x22313bd2a20 state=finished returned NoneType>
Traceback (most recent call last):
  File "S:\python36\lib\concurrent\futures\_base.py", line 324, in _invoke_callbacks
    callback(self)
  File ".\task_coroutines.py", line 21, in _wakeup
    self.step(result)
  File ".\task_coroutines.py", line 14, in step
    fut = self._gen.send(value)
  File ".\task_coroutines.py", line 30, in recursive
    Task(recursive(n+1)).step()
  File ".\task_coroutines.py", line 14, in step
    fut = self._gen.send(value)
  File ".\task_coroutines.py", line 28, in recursive
    yield pool.submit(time.sleep, 1)
  File "S:\python36\lib\concurrent\futures\thread.py", line 117, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
answered Sep 24 '22 by Blckknght