Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to implement a Future or Stream that polls an async fn?

I have a struct Test I want to implement std::future::Future that would poll function:

use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

struct Test;

impl Test {
    async fn function(&mut self) {}
}

impl Future for Test {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.function() {
            Poll::Pending => Poll::Pending,
            Poll::Ready(_) => Poll::Ready(()),
        }
    }
}

That didn't work:

error[E0308]: mismatched types
  --> src/lib.rs:17:13
   |
10 |     async fn function(&mut self) {}
   |                                  - the `Output` of this `async fn`'s expected opaque type
...
17 |             Poll::Pending => Poll::Pending,
   |             ^^^^^^^^^^^^^ expected opaque type, found enum `Poll`
   |
   = note: expected opaque type `impl Future`
                     found enum `Poll<_>`

error[E0308]: mismatched types
  --> src/lib.rs:18:13
   |
10 |     async fn function(&mut self) {}
   |                                  - the `Output` of this `async fn`'s expected opaque type
...
18 |             Poll::Ready(_) => Poll::Ready(()),
   |             ^^^^^^^^^^^^^^ expected opaque type, found enum `Poll`
   |
   = note: expected opaque type `impl Future`
                     found enum `Poll<_>`

I understand that function must be called once, the returned Future must be stored somewhere in the struct, and then the saved future must be polled. I tried this:

struct Test(Option<Box<Pin<dyn Future<Output = ()>>>>);

impl Test {
    async fn function(&mut self) {}
    fn new() -> Self {
        let mut s = Self(None);
        s.0 = Some(Box::pin(s.function()));
        s
    }
}

That also didn't work:

error[E0277]: the size for values of type `(dyn Future<Output = ()> + 'static)` cannot be known at compilation time
   --> src/lib.rs:7:13
    |
7   | struct Test(Option<Box<Pin<dyn Future<Output = ()>>>>);
    |             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ doesn't have a size known at compile-time
    |
    = help: the trait `Sized` is not implemented for `(dyn Future<Output = ()> + 'static)`

After I call function() I have taken a &mut reference of Test, because of that I can't change the Test variable, and therefore can't store the returned Future inside the Test.

I did get an unsafe solution (inspired by this)

struct Test<'a>(Option<BoxFuture<'a, ()>>);

impl Test<'_> {
    async fn function(&mut self) {
        println!("I'm alive!");
    }

    fn new() -> Self {
        let mut s = Self(None);
        s.0 = Some(unsafe { &mut *(&mut s as *mut Self) }.function().boxed());
        s
    }
}

impl Future for Test<'_> {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.0.as_mut().unwrap().poll_unpin(cx)
    }
}

I hope that there is another way.

like image 357
Rijenkii Avatar asked Apr 18 '20 19:04

Rijenkii


2 Answers

Though there are times when you may want to do things similar to what you're trying to accomplish here, they are a rarity. So most people reading this, maybe even OP, may wish to restructure such that struct state and data used for a single async execution are different objects.

To answer your question, yes it is somewhat possible. Unless you want to absolutely resort to unsafe code you will need to use Mutex and Arc. All fields you wish to manipulate inside the async fn will have to be wrapped inside a Mutex and the function itself will accept an Arc<Self>.

I must stress, however, that this is not a beautiful solution and you probably don't want to do this. Depending on your specific case your solution may vary, but my guess of what OP is trying to accomplish while using Streams would be better solved by something similar to this gist that I wrote.

use std::{
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex},
};

struct Test {
    state: Mutex<Option<Pin<Box<dyn Future<Output = ()>>>>>,
    // if available use your async library's Mutex to `.await` locks on `buffer` instead
    buffer: Mutex<Vec<u8>>,
}

impl Test {
    async fn function(self: Arc<Self>) {
        for i in 0..16u8 {
            let data: Vec<u8> = vec![i]; // = fs::read(&format("file-{}.txt", i)).await.unwrap();
            let mut buflock = self.buffer.lock().unwrap();
            buflock.extend_from_slice(&data);
        }
    }
    pub fn new() -> Arc<Self> {
        let s = Arc::new(Self {
            state: Default::default(),
            buffer: Default::default(),
        });

        {
            // start by trying to aquire a lock to the Mutex of the Box
            let mut lock = s.state.lock().unwrap();
            // create boxed future
            let b = Box::pin(s.clone().function());
            // insert value into the mutex
            *lock = Some(b);
        } // block causes the lock to be released

        s
    }
}

impl Future for Test {
    type Output = ();
    fn poll(
        self: std::pin::Pin<&mut Self>,
        ctx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<<Self as std::future::Future>::Output> {
        let mut lock = self.state.lock().unwrap();
        let fut: &mut Pin<Box<dyn Future<Output = ()>>> = lock.as_mut().unwrap();
        Future::poll(fut.as_mut(), ctx)
    }
}
like image 133
Tim Lundqvist Avatar answered Oct 22 '22 17:10

Tim Lundqvist


I'm not sure what you want to achieve and why, but I suspect that you're trying to implement Future for Test based on some ancient tutorial or misunderstanding and just overcomplicating things.

You don't have to implement Future manually. An async function

async fn function(...) {...}

is really just syntax sugar translated behind the scenes into something like

fn function(...) -> Future<()> {...}

All you have to do is to use the result of the function the same way as any future, e.g. use await on it or call block a reactor until it's finished. E.g. based on your first version, you can simply call:

let mut test = Test{};
test.function().await;

UPDATE1

Based on your descriptions I still think you're trying to overcomplicate this minimal working snippet without the need to manually implement Future for anything:

async fn asyncio() { println!("Doing async IO"); }

struct Test {
    count: u32,
}

impl Test {
    async fn function(&mut self) {
        asyncio().await;
        self.count += 1;
    }
}

#[tokio::main]
async fn main() {
    let mut test = Test{count: 0};
    test.function().await;
    println!("Count: {}", test.count);
}
like image 36
Zólyomi István Avatar answered Oct 22 '22 16:10

Zólyomi István