Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to cancel an infinite stream from within the stream itself?

I'm trying to cancel an interval (interval_timer) after emptying a queue but not sure what is the right strategy.

let mut some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];
let interval_timer = tokio_timer::Timer::default();

let timer = interval_timer
    .interval(Duration::from_millis(1000))
    .map_err(|_| {
        println!("Errored out");
    });

let s = timer.for_each(move |_| {
    println!("Woke up");
    let item = some_vars.pop().unwrap();

    let f = futures::future::ok(item).map(|x| {
        println!("{:?}", x);
    });
    tokio::spawn(f)
});

tokio::run(s);

I tried drop as suggested in gitter but that ended up with an error:

let mut some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];
let mut interval_timer = tokio_timer::Timer::default();

let timer = interval_timer
    .interval(Duration::from_millis(1000))
    .map_err(|_| {
        println!("Errored out");
    });

let s = timer.for_each(move |_| {
    println!("Woke up");
    if some_vars.len() == 1 {
        drop(interval_timer);
    }

    let item = some_vars.pop().unwrap();

    let f = futures::future::ok(item).map(|x| {
        println!("{:?}", x);
    });
    tokio::spawn(f)
});

tokio::run(s);

The error:

error[E0507]: cannot move out of captured outer variable in an `FnMut` closure
--> src/main.rs:72:22
   |
60 |     let mut interval_timer = tokio_timer::Timer::default();
   |         ------------------ captured outer variable
...
72 |                 drop(interval_timer);
   |                      ^^^^^^^^^^^^^^ cannot move out of captured outer variable in an `FnMut` closure
like image 580
opensourcegeek Avatar asked Mar 12 '18 12:03

opensourcegeek


1 Answers

For cases where you want to cancel a stream from outside of the stream, see stream-cancel.


For your specific case, it's easiest to convert your collection into a stream and zip it together with the interval timer. This way, the resulting stream naturally stops when the collection is empty:

use futures::{future, stream, Stream}; // 0.1.29
use std::time::Duration;
use tokio; // 0.1.22
use tokio_timer::Interval; // 0.2.11

fn main() {
    tokio::run({
        let some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];

        let timer =
            Interval::new_interval(Duration::from_millis(100)).map_err(|e| panic!("Error: {}", e));

        let some_vars = stream::iter_ok(some_vars.into_iter().rev());
        let combined = timer.zip(some_vars);

        combined.for_each(move |(_, item)| {
            eprintln!("Woke up");

            tokio::spawn(future::lazy(move || {
                println!("{:?}", item);
                Ok(())
            }));

            Ok(())
        })
    });
}

Otherwise, you can stop the stream by using and_then to both remove the value from the collection and control if the stream should continue:

use futures::{future, Stream}; // 0.1.29
use std::time::Duration;
use tokio; // 0.1.22
use tokio_timer::Interval; // 0.2.11

fn main() {
    tokio::run({
        let mut some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];

        let timer =
            Interval::new_interval(Duration::from_millis(100)).map_err(|e| panic!("Error: {}", e));

        let limited = timer.and_then(move |_| {
            if some_vars.len() <= 4 {
                Err(())
            } else {
                some_vars.pop().ok_or(())
            }
        });

        limited.for_each(move |item| {
            eprintln!("Woke up");

            tokio::spawn(future::lazy(move || {
                println!("{:?}", item);
                Ok(())
            }));

            Ok(())
        })
    });
}
like image 113
Shepmaster Avatar answered Nov 10 '22 21:11

Shepmaster