
How do I spawn many cancellable timers using Tokio?

How do I use Tokio to implement a fixed number of timers that are regularly reset and canceled across threads? When a timer expires, a callback will be executed.

An API similar to that of Go's time.AfterFunc is essentially what I desire:

package main

import (
    "fmt"
    "time"
)

func main() {
    t := time.AfterFunc(time.Hour, func() {
        // happens every 2 seconds with 1 second delay
        fmt.Println("fired")
    })

    for {
        t.Reset(time.Second)
        time.Sleep(time.Second * 2)
    }
}

The only crate I've found that implements a (sufficiently) similar API is timer and it does so in a very naive fashion, by spawning 2 threads. This quickly becomes prohibitive when the timers are reset often.

The obvious answer is to use Tokio; the question is how to do so elegantly.

One option is to spawn a new green thread every time a timer is updated and cancel the previous timer using an atomic, by conditioning the execution of the callback on this atomic, such as this pseudo-Rust:

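use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::prelude::*;
use tokio::timer::Delay;

fn main() {
    tokio::run(future::lazy(|| {
        // for every timer: spawn a task with a new "cancel" atomic
        let cancelled = Arc::new(AtomicBool::new(false));
        let flag = cancelled.clone(); // `cancelled` stays with whoever may cancel
        tokio::spawn(
            Delay::new(Instant::now() + Duration::from_millis(1000))
                .map_err(|e| panic!("timer failed; err={:?}", e))
                .and_then(move |_| {
                    // run the callback only if nobody cancelled in the meantime
                    if !flag.load(Ordering::Acquire) {
                        println!("fired");
                    }
                    Ok(())
                }),
        );
        Ok(())
    }));
}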

The problem is that I maintain state for timers which are already canceled, potentially for minutes. In addition, it does not seem elegant.

Besides tokio::time::Delay, tokio::time::DelayQueue also seemed applicable, in particular because timers can be reset and canceled by referencing them with the Key returned from insert.

However, it is unclear how to use it safely in a multi-threaded application, given this note in the documentation:

The return value represents the insertion and is used at an argument to remove and reset. Note that Key is token and is reused once value is removed from the queue either by calling poll after when is reached or by calling remove. At this point, the caller must take care to not use the returned Key again as it may reference a different item in the queue.

This would create a race condition between a task canceling the timer by its key and the task consuming timer events from the DelayQueue stream, resulting in a panic or in the cancellation of an unrelated timer.
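To make the stale-key hazard concrete, here is a minimal sketch of one common guard: pairing each handle with a generation number, so a cancel that arrives after the slot has been reused is rejected instead of hitting an unrelated timer. `TimerTable` and `TimerHandle` are hypothetical names, not part of Tokio, and the sketch uses only the standard library. It assumes the table and the queue it shadows would be guarded by the same lock, so that expiry and cancellation are serialized.

```rust
use std::collections::HashMap;

/// Hypothetical handle: a slot index plus the generation it was issued for.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct TimerHandle {
    slot: usize,
    generation: u64,
}

/// Hypothetical table recording which generation currently occupies each slot.
#[derive(Default)]
struct TimerTable {
    live: HashMap<usize, u64>, // slot -> generation of the live timer
    next_generation: u64,
}

impl TimerTable {
    /// Register a timer in `slot`, replacing whatever was there.
    fn arm(&mut self, slot: usize) -> TimerHandle {
        self.next_generation += 1;
        self.live.insert(slot, self.next_generation);
        TimerHandle { slot, generation: self.next_generation }
    }

    /// Called by the consumer when the timer in `slot` expires.
    fn expire(&mut self, slot: usize) {
        self.live.remove(&slot);
    }

    /// Cancel only if the handle still names the live timer.
    fn cancel(&mut self, handle: TimerHandle) -> bool {
        if self.live.get(&handle.slot) == Some(&handle.generation) {
            self.live.remove(&handle.slot);
            true
        } else {
            false // stale handle: the slot expired or was reused
        }
    }
}

fn main() {
    let mut table = TimerTable::default();
    let old = table.arm(0);
    table.expire(0); // the consumer saw the timer fire
    let new = table.arm(0); // slot 0 is reused for an unrelated timer
    println!("stale cancel accepted: {}", table.cancel(old)); // false
    println!("fresh cancel accepted: {}", table.cancel(new)); // true
}
```

The generation check turns the race into a harmless no-op, but it does not remove the need for synchronization around the queue itself.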

Asked by Cshark, Oct 18 '25 06:10


1 Answer

You can use the select combinator from futures-rs with Tokio. It resolves as soon as either future completes; the other future is handed back unfinished and, once dropped, is never polled again.

For the second future, we can use the receiver of a oneshot::channel: sending on the channel completes the combined future early, which cancels the delayed call.

use futures::sync::oneshot;
use futures::*;
use std::thread;
use std::time::{Duration, Instant};
use tokio::timer::Delay;

fn main() {
    let (interrupter, interrupt_handler) = oneshot::channel::<()>();

    // Signal to cancel the delayed call.
    thread::spawn(move || {
        // Raise this above 1000 ms to let the delayed call run instead.
        thread::sleep(Duration::from_millis(500));
        interrupter
            .send(())
            .expect("Not able to cancel delayed future");
    });

    let delayed = Delay::new(Instant::now() + Duration::from_millis(1000))
        .map_err(|e| panic!("timer failed; err={:?}", e))
        .and_then(|_| {
            println!("Delayed Call Executed!");

            Ok(())
        });

    tokio::run(delayed.select(interrupt_handler).then(|_| Ok(())));
}
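The same race between a delay and a signal can be extended to cover Reset as well as cancel, which is closer to the time.AfterFunc API in the question. The sketch below is an illustration only: it swaps Tokio for the standard library (`mpsc::Receiver::recv_timeout` plays the role of select) so that it runs without dependencies, which means it costs one OS thread per timer. `after_func` and `Command` are hypothetical names. In Tokio the same loop would presumably live in a spawned task instead of a thread.

```rust
use std::sync::mpsc::{self, RecvTimeoutError, Sender};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical commands accepted by the timer thread.
enum Command {
    Reset(Duration),
    Cancel,
}

/// Start a one-shot timer. Returns a command sender and a join handle
/// whose result says whether the callback fired.
fn after_func<F>(delay: Duration, callback: F) -> (Sender<Command>, JoinHandle<bool>)
where
    F: FnOnce() + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    let worker = thread::spawn(move || {
        let mut wait = delay;
        loop {
            // Race the delay against the command channel.
            match rx.recv_timeout(wait) {
                Ok(Command::Reset(d)) => wait = d, // restart the wait
                // An explicit cancel, or every sender dropped: stop silently.
                Ok(Command::Cancel) | Err(RecvTimeoutError::Disconnected) => return false,
                Err(RecvTimeoutError::Timeout) => {
                    callback();
                    return true;
                }
            }
        }
    });
    (tx, worker)
}

fn main() {
    let (timer, worker) = after_func(Duration::from_secs(3600), || println!("fired"));
    timer.send(Command::Reset(Duration::from_millis(50))).unwrap();
    println!("fired: {}", worker.join().unwrap()); // true

    let (timer, worker) = after_func(Duration::from_secs(3600), || println!("never printed"));
    timer.send(Command::Cancel).unwrap();
    println!("fired: {}", worker.join().unwrap()); // false
}
```

Because each timer owns its own channel, there is no shared key that could be reused, so a late cancel can only ever reach the timer it was created for.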


Answered by Ömer Erden, Oct 20 '25 09:10


