 

How to cheaply send a delayed message?

My requirement is simple and comes up in many programs: send a specified message to my channel after a specified delay.

I've looked through Tokio's facilities for delays, intervals, and timeouts, but none of them seems to do this directly.

What I've come up with so far is to spawn an asynchronous task, sleep for the given duration, and then send the message.

But, obviously, spawning an asynchronous task is a relatively heavy operation. Is there a better solution?

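use std::time::Duration;
use tokio::{sync::mpsc, time};

async fn my_handler(sender: mpsc::Sender<i32>, dur: Duration) {
    // `move` gives the spawned task ownership of `sender` and `dur`.
    tokio::spawn(async move {
        time::sleep(dur).await;
        // `send` only fails if the receiver was dropped; ignore that case.
        let _ = sender.send(0).await;
    });
}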
asked Dec 19 '25 16:12 by progquester


1 Answer

You could try adding a second channel and a continuously running task that buffers each message until the time it is due to be received. Implementing this is more involved than it sounds, and I can only hope I'm handling cancellation correctly here:

use std::cmp::Reverse;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::time::{sleep_until, timeout_at, Instant};

fn make_timed_channel<T: Ord + Send + Sync + 'static>() -> (Sender<(Instant, T)>, Receiver<T>) {
    // `Ord` is an unnecessary requirement that arises from storing both the Instant and the T in the BinaryHeap.
    // You could drop this requirement by using the priority_queue crate instead.

    let (sender1, receiver1) = mpsc::channel::<(Instant, T)>(42);
    let (sender2, receiver2) = mpsc::channel::<T>(42);
    let mut receiver1 = Some(receiver1);
    tokio::spawn(async move {
        let mut buf = std::collections::BinaryHeap::<Reverse<(Instant, T)>>::new();
        loop {
            // Pretend we're a bounded channel or exit if the upstream closed
            if buf.len() >= 42 || receiver1.is_none() {
                match buf.pop() {
                    Some(Reverse((time, element))) => {
                        sleep_until(time).await;
                        if sender2.send(element).await.is_err() {
                            break;
                        }
                    }
                    None => break,
                }
            }
            // We have some deadline to send a message at
            else if let Some(Reverse((then, _))) = buf.peek() {
                if let Ok(recv) = timeout_at(*then, receiver1.as_mut().unwrap().recv()).await {
                    match recv {
                        Some(recv) => buf.push(Reverse(recv)),
                        None => receiver1 = None,
                    }
                } else {
                    if sender2.send(buf.pop().unwrap().0 .1).await.is_err() {
                        break;
                    }
                }
            }
            // We're empty, wait around
            else {
                match receiver1.as_mut().unwrap().recv().await {
                    Some(recv) => buf.push(Reverse(recv)),
                    None => receiver1 = None,
                }
            }
        }
    });
    (sender1, receiver2)
}
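
The comment about `Ord` in the function above can also be addressed without an extra crate. The following is a minimal sketch, not part of the original answer: a hypothetical `Timed<T>` wrapper whose ordering looks only at the deadline (plus an insertion counter as a tie-breaker), so the payload type needs no `Ord`. It uses `std::time::Instant` to stay dependency-free; the names `Timed`, `Message`, and `drain_in_deadline_order` are made up for illustration.

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::time::{Duration, Instant};

/// Hypothetical heap entry: ordered by deadline only, so `T` needs no `Ord`.
struct Timed<T> {
    deadline: Instant,
    seq: u64, // insertion counter, breaks ties between equal deadlines in FIFO order
    item: T,
}

impl<T> PartialEq for Timed<T> {
    fn eq(&self, other: &Self) -> bool {
        self.deadline == other.deadline && self.seq == other.seq
    }
}
impl<T> Eq for Timed<T> {}

impl<T> PartialOrd for Timed<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl<T> Ord for Timed<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        // Reversed comparison: BinaryHeap is a max-heap, and the earliest
        // deadline should come out first.
        other
            .deadline
            .cmp(&self.deadline)
            .then_with(|| other.seq.cmp(&self.seq))
    }
}

/// A payload type that deliberately does not implement `Ord`.
#[derive(Debug, PartialEq)]
struct Message(f64);

/// Pushes three messages out of order and drains them by deadline.
fn drain_in_deadline_order() -> Vec<Message> {
    let base = Instant::now();
    let mut heap = BinaryHeap::new();
    for (seq, &(ms, value)) in [(30, 3.0), (10, 1.0), (20, 2.0)].iter().enumerate() {
        heap.push(Timed {
            deadline: base + Duration::from_millis(ms),
            seq: seq as u64,
            item: Message(value),
        });
    }
    let mut out = Vec::new();
    while let Some(entry) = heap.pop() {
        out.push(entry.item);
    }
    out
}
```

With a wrapper like this, the heap in `make_timed_channel` could presumably hold `Timed<T>` instead of `Reverse<(Instant, T)>`, and the `Ord` bound on `T` could be removed.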


Whether this is more efficient than spawning tasks is something you'd have to benchmark. I doubt it: if I recall correctly, Tokio uses something much fancier than a BinaryHeap to decide when to wake up for the next timeout.

One optimization you could make if you don't need an actual Receiver<T>, but just something you can await the next message on: drop the second channel and maintain the BinaryHeap inside a custom receiver.
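
As a rough sketch of the bookkeeping such a custom receiver would need, the buffer below keeps pending messages in a min-heap and hands one out only once its deadline has passed. This is an illustration under assumptions, not the answer's implementation: `DelayBuffer` and its method names are hypothetical, it uses `std::time::Instant` rather than Tokio's, and the async part (waiting until `next_deadline()` while still polling the incoming channel, for instance with `timeout_at` as in the code above) is left out.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::time::Instant;

/// Hypothetical buffer a custom receiver could own instead of a second channel.
struct DelayBuffer<T: Ord> {
    // `Reverse` turns the max-heap into a min-heap on (deadline, item).
    heap: BinaryHeap<Reverse<(Instant, T)>>,
}

impl<T: Ord> DelayBuffer<T> {
    fn new() -> Self {
        Self { heap: BinaryHeap::new() }
    }

    /// Stores `item` until `deadline`.
    fn push(&mut self, deadline: Instant, item: T) {
        self.heap.push(Reverse((deadline, item)));
    }

    /// Earliest pending deadline, i.e. the point the receiver would sleep until.
    fn next_deadline(&self) -> Option<Instant> {
        self.heap.peek().map(|Reverse((deadline, _))| *deadline)
    }

    /// Removes and returns the earliest item if its deadline is at or before `now`.
    fn pop_due(&mut self, now: Instant) -> Option<T> {
        if self.next_deadline()? <= now {
            self.heap.pop().map(|Reverse((_, item))| item)
        } else {
            None
        }
    }
}
```

A receiver built on this would call `push` for every `(Instant, T)` it reads from the channel and `pop_due` whenever its timer fires, which removes the hop through the second channel.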

answered Dec 21 '25 17:12 by Caesar