Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Why is `Future::poll` not called repeatedly after returning `NotReady`?

Consider the following code

extern crate futures; // v0.1 (old)

use std::sync::{atomic, Arc};
use futures::*;

struct F(Arc<atomic::AtomicBool>);

impl Future for F {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        println!("Check if flag is set");
        if self.0.load(atomic::Ordering::Relaxed) {
            Ok(Async::Ready(()))
        } else {
            Ok(Async::NotReady)
        }
    }
}

fn main() {
    let flag = Arc::new(atomic::AtomicBool::new(false));
    let future = F(flag.clone());
    ::std::thread::spawn(move || {
        ::std::thread::sleep_ms(10);
        println!("set flag");
        flag.store(true, atomic::Ordering::Relaxed);
    });
    // ::std::thread::sleep_ms(20);
    let result = future.wait();
    println!("result: {:?}", result);
}

The spawned thread sets a flag, which the future waits for. We also sleep the spawned thread, so the initial .poll() call from .wait() is before the flag is set. This causes .wait() to block (seemingly) indefinitely. If we uncomment the other thread::sleep_ms, .wait() returns, and prints out the result (()).

I would expect the current thread to try to resolve the future by calling poll multiple times, since we're blocking the current thread. However, this is not happening.

I have tried to read some docs, and it seems like the problem is that the thread is parked after getting NotReady from the poll the first time. However, it is not clear to me why this is, or how it is possible to work around this.

What am I missing?

like image 727
MartinHaTh Avatar asked Apr 19 '17 18:04

MartinHaTh


1 Answers

Why would you need to park a waiting future instead of polling it repeatedly? The answer is rather obvious, IMHO. Because at the end of the day it's faster and more efficient!

To repeatedly poll a future (which might be dubbed "busy-waiting") the library would have to decide whether to do it often or seldom and neither answer is satisfactory. Do it often and you're wasting the CPU cycles, do it seldom and the code is slow to react.

So yeah, you need to park the task when you're waiting for something and then unpark it when you've done waiting. Like this:

#![allow(deprecated)]

extern crate futures;

use std::sync::{Arc, Mutex};
use futures::*;
use futures::task::{park, Task};

struct Status {
    ready: bool,
    task: Option<Task>,
}

#[allow(dead_code)]
struct F(Arc<Mutex<Status>>);

impl Future for F {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        println!("Check if flag is set");
        let mut status = self.0.lock().expect("!lock");
        if status.ready {
            Ok(Async::Ready(()))
        } else {
            status.task = Some(park());
            Ok(Async::NotReady)
        }
    }
}

#[test]
fn test() {
    let flag = Arc::new(Mutex::new(Status {
                                       ready: false,
                                       task: None,
                                   }));
    let future = F(flag.clone());
    ::std::thread::spawn(move || {
        ::std::thread::sleep_ms(10);
        println!("set flag");
        let mut status = flag.lock().expect("!lock");
        status.ready = true;
        if let Some(ref task) = status.task {
            task.unpark()
        }
    });
    let result = future.wait();
    println!("result: {:?}", result);
}

Note that Future::poll is doing several things here: it's checking for an external condition and it's parking the task, so it's possible to have a race, like when:

  1. the poll checks the variable and finds it to be false;
  2. the outer code sets the variable to true;
  3. the outer code checks if the task is parked and finds that it's not;
  4. the poll parks the task, but boom! it is too late, nobody is going to unpark it any longer.

In order to avoid any races, I've used a Mutex to synchronize these interactions.

P.S. If all you need is to wrap a thread result into a Future then consider using the oneshot channel: it has the Receiver that implements the Future interface already.

like image 123
ArtemGr Avatar answered Nov 11 '22 10:11

ArtemGr