Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Manually polling streams in future implementation

I'm in the process of migrating to futures 0.3 and tokio 0.2, and there is one recurring pattern I can't manage to re-use. I'm not sure whether this pattern became obsolete or whether I'm doing something wrong wrt to Pin.

Usually I have one type that holds a socket and a few channel receivers. The Future implementation for such structs consists in polling the streams repeatedly until they return Pending (NotReady in the 0.1 ecosystem).

However, in futures 0.3, Future::poll and Stream::poll_next take self instead of &mut self, and this pattern does not work anymore:

use futures::{
    stream::Stream,
    task::{Context, Poll},
    Future,
};
use std::pin::Pin;
use tokio::sync::mpsc::{Receiver, Sender};

/// Dummy structure that represent some state we update when we
/// receive data or events.
struct State;

impl State {
    fn update(&mut self, _data: Vec<u8>) {
        println!("updated state");
    }
    fn handle_event(&mut self, _event: u32) {
        println!("handled event");
    }
}

/// The future I want to implement.
struct MyFuture {
    state: State,
    data: Receiver<Vec<u8>>,
    events: Receiver<Vec<u8>>,
}

impl MyFuture {
    fn poll_data(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
        use Poll::*;

        let MyFuture {
            ref mut data,
            ref mut state,
            ..
        } = self.get_mut();

        loop {
            // this breaks, because Pin::new consume the mutable
            // reference on the first iteration of the loop.
            match Pin::new(data).poll_next(cx) {
                Ready(Some(vec)) => state.update(vec),
                Ready(None) => return Ready(()),
                Pending => return Pending,
            }
        }
    }

    // unimplemented, but we basically have the same problem than with
    // `poll_data()`
    fn poll_events(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
        unimplemented!()
    }
}

impl Future for MyFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        use Poll::*;
        if let Ready(_) = self.poll_data(cx) {
            return Ready(());
        }

        // This does not work because self was consumed when
        // self.poll_data() was called.
        if let Ready(_) = self.poll_events(cx) {
            return Ready(());
        }
        return Pending;
    }
}

Is there a way to fix that code? If not, what pattern could I use to implement the same logic?

like image 476
little-dude Avatar asked Mar 06 '26 01:03

little-dude


1 Answers

You can use Pin::as_mut to avoid consuming the Pin.

impl MyFuture {
    fn poll_data(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
        use Poll::*;

        let MyFuture {
            ref mut data,
            ref mut state,
            ..
        } = self.get_mut();

        let mut data = Pin::new(data); // Move pin here
        loop {
            match data.as_mut().poll_next(cx) {   // Use in loop by calling `as_mut()`
                Ready(Some(vec)) => state.update(vec),
                Ready(None) => return Ready(()),
                Pending => return Pending,
            }
        }
    }
}

and in Future impl:

impl Future for MyFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        use Poll::*;
        // `as_mut()` here to avoid consuming
        if let Ready(_) = self.as_mut().poll_data(cx) { 
            return Ready(());
        }

        // can consume here as this is the last invocation
        if let Ready(_) = self.poll_events(cx) {
            return Ready(());
        }
        return Pending;
    }
}

EDIT:

Tip: Try to use Pin only when necessary. In your case, you don't really need Pinned pointer in poll_data function. &mut self is just fine, which reduces Pin usage a little:

impl MyFuture {
    fn poll_data(&mut self, cx: &mut Context) -> Poll<()> {
        use Poll::*;

        loop {
            match Pin::new(&mut self.data).poll_next(cx) {
                Ready(Some(vec)) => self.state.update(vec),
                Ready(None) => return Ready(()),
                Pending => return Pending,
            }
        }
    }
}

and Future impl:

impl Future for MyFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        use Poll::*;
        if let Ready(_) = self.poll_data(cx) {
            return Ready(());
        }

        if let Ready(_) = self.poll_events(cx) {
            return Ready(());
        }
        return Pending;
    }
}
like image 103
Gurwinder Singh Avatar answered Mar 07 '26 21:03

Gurwinder Singh