After half a year of Rust coding, today I am for the first time truly baffled by a compiler error. Here is the condensed version of a library concept I was working on:
use futures::{Stream, StreamExt, pin_mut, stream::Next};

pub trait Stream2<T>: Stream<Item = T> {
    fn next2(&mut self) -> Next<'_, Self>
    where
        Self: Unpin,
    {
        self.next()
    }
}
pub struct WrappedStream<S>(S);

//impl<S: Unpin> Unpin for WrappedStream<S> {}

impl<T, S> Stream for WrappedStream<S>
where
    S: Stream<Item = T>,
{
    type Item = T;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // The implementation here is kind of irrelevant since our issue is at compile time
        std::task::Poll::Ready(None)
    }
}

impl<T, S> Stream2<T> for WrappedStream<S> where S: Stream<Item = T> {}
#[tokio::main]
async fn main() {
    let stream = WrappedStream(futures::stream::once(async move { 1 }));
    pin_mut!(stream);
    // This compiles
    while let Some(item) = stream.next().await {}

    let stream = WrappedStream(futures::stream::once(async move { 1 }));
    pin_mut!(stream);
    // This doesn't compile??
    while let Some(item) = stream.next2().await {}
}
I am highly confused by the fact that StreamExt::next and Stream2::next2 have seemingly the exact same signature and yet they behave differently. I played around with this a lot; one observation is that we can pin the inner stream before assigning it to the WrappedStream newtype, but in practice this would make the API less generic. I am really curious to fully understand the underlying issue here.
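A minimal sketch of that observation, using std-only stand-ins so it runs without futures or tokio. Wrapped, Next2 and NotUnpin are hypothetical names, not part of the original code, and the method body is a placeholder; only the Unpin bound matters here. Boxing and pinning the inner value first makes the wrapper itself Unpin, so the bounded method can be called directly:

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

// Hypothetical stand-in for the WrappedStream newtype.
#[allow(dead_code)]
struct Wrapped<S>(S);

// Hypothetical stand-in for Stream2: the method is only callable when Self: Unpin.
trait Next2 {
    fn next2(&mut self) -> Option<u32>
    where
        Self: Unpin,
    {
        None // placeholder result; only the trait bounds matter here
    }
}

impl<S> Next2 for Wrapped<S> {}

// Stand-in for a !Unpin inner stream, such as one built from an async block.
#[allow(dead_code)]
struct NotUnpin(PhantomPinned);

fn boxed_inner_works() -> Option<u32> {
    // Pin<Box<NotUnpin>> is Unpin, so Wrapped<Pin<Box<NotUnpin>>> is Unpin too.
    let mut wrapped: Wrapped<Pin<Box<NotUnpin>>> =
        Wrapped(Box::pin(NotUnpin(PhantomPinned)));
    wrapped.next2()
}

fn main() {
    assert_eq!(boxed_inner_works(), None);
    // By contrast, this would not compile, because Wrapped<NotUnpin> is !Unpin:
    // Wrapped(NotUnpin(PhantomPinned)).next2();
}
```

The cost is visible in the type: every caller now has to hand over an already pinned (here also heap-allocated) inner value.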
I figured out a fix, but I don't know how it works. I noticed that Stream has some extra implementations for DerefMut + Unpin (e.g. Pin, Box as I understand). Adding an analogous impl lets this compile:
use std::ops::{Deref, DerefMut};
use std::pin::Pin;

impl<T, P> Stream2<T> for Pin<P>
where
    P: DerefMut + Unpin,
    <P as Deref>::Target: Stream2<T>,
{
}
//impl<S, T> Stream2<T> for &mut S where S: Stream2<T> + Unpin + ?Sized {}
Now my question is really, how does this magic work? I assume there is some auto-resolution of the next() call for Pin<&mut Self>? I would really like to understand exactly how these two calls are resolved.
When you call stream.next2(), the compiler doesn't find a Stream2<_> implementation for Pin<&mut WrappedStream<_>>, which is what stream is at that point (after pin_mut!). However, Pin implements Deref (and DerefMut when the target is Unpin), so method resolution auto-dereferences the receiver: the compiler finds the impl<T, S> Stream2<T> for WrappedStream<S> implementation and tries to call next2 on a &mut WrappedStream<_>. That fails because next2 requires Self: Unpin, and the S in WrappedStream<S> is not Unpin in this case, since it contains the future created by the async block.
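The lookup described above can be sketched with std-only stand-ins (Wrapped and Next2 are hypothetical names, and the returned string is just a marker). With an Unpin inner type the call through Pin resolves to the wrapper's impl via auto-deref; with a !Unpin inner type the same lookup picks the same impl and is then rejected:

```rust
use std::pin::{pin, Pin};

// Hypothetical stand-in for the WrappedStream newtype.
#[allow(dead_code)]
struct Wrapped<S>(S);

// Hypothetical stand-in for Stream2, implemented for Wrapped<S> only.
trait Next2 {
    fn next2(&mut self) -> &'static str
    where
        Self: Unpin,
    {
        "Wrapped impl"
    }
}

impl<S> Next2 for Wrapped<S> {}

fn call_through_pin() -> &'static str {
    // u8 is Unpin, so Wrapped<u8> is Unpin as well.
    let mut pinned: Pin<&mut Wrapped<u8>> = pin!(Wrapped(0u8));
    // No impl exists for Pin<&mut Wrapped<u8>>, so the compiler dereferences the
    // receiver and selects the Wrapped<u8> impl; `Self: Unpin` holds here.
    pinned.next2()
}

fn main() {
    assert_eq!(call_through_pin(), "Wrapped impl");

    // With a !Unpin inner type the same call is rejected at compile time:
    // let mut pinned = pin!(Wrapped(std::marker::PhantomPinned));
    // pinned.next2(); // error along the lines of "`PhantomPinned` cannot be unpinned"
}
```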
You need to provide an implementation for Pin as well, which can be a blanket one:
use std::ops::DerefMut;
use std::pin::Pin;

impl<T, S> Stream2<T> for Pin<S>
where
    S: DerefMut + Unpin,
    S::Target: Stream2<T>,
{
}
Note that futures::stream::Stream has such an implementation, which is why stream.next() does not cause an error in your code.
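To see both pieces together, here is a rough std-only sketch. Poll2 plays the role of Stream and Next2 the role of Stream2; both names, Wrapped, and the value 7 are hypothetical. The forwarding impl of Poll2 for Pin<P> is written by hand here, modelled on the one futures provides for Stream, and the empty Next2 impl corresponds to the blanket impl above:

```rust
use std::marker::PhantomPinned;
use std::ops::DerefMut;
use std::pin::{pin, Pin};

// Hypothetical stand-in for Stream: polling needs a pinned receiver.
trait Poll2 {
    fn poll2(self: Pin<&mut Self>) -> u32;
}

// Hypothetical stand-in for Stream2: callable on `&mut Self` only if Self: Unpin.
trait Next2: Poll2 {
    fn next2(&mut self) -> u32
    where
        Self: Unpin,
    {
        Pin::new(self).poll2()
    }
}

#[allow(dead_code)]
struct Wrapped<S>(S);

impl<S> Poll2 for Wrapped<S> {
    fn poll2(self: Pin<&mut Self>) -> u32 {
        7 // placeholder value
    }
}
impl<S> Next2 for Wrapped<S> {}

// Forwarding impl for pinned pointers, in the spirit of the Stream impl for Pin<P>.
impl<P> Poll2 for Pin<P>
where
    P: DerefMut + Unpin,
    P::Target: Poll2,
{
    fn poll2(self: Pin<&mut Self>) -> u32 {
        // Pin<P> is Unpin because P is, so it can be unwrapped and re-pinned to its target.
        self.get_mut().as_mut().poll2()
    }
}

// The blanket impl from the answer, transplanted onto the stand-in trait.
impl<P> Next2 for Pin<P>
where
    P: DerefMut + Unpin,
    P::Target: Next2,
{
}

fn call_on_pinned() -> u32 {
    // Wrapped<PhantomPinned> is !Unpin, like a wrapper around an async block.
    let mut pinned = pin!(Wrapped(PhantomPinned));
    // Resolves to the Pin<&mut Wrapped<_>> impl before any auto-deref happens,
    // and Pin<&mut _> is always Unpin, so the `Self: Unpin` bound is met.
    pinned.next2()
}

fn main() {
    assert_eq!(call_on_pinned(), 7);
}
```

Because the pointer type itself implements the trait, the lookup succeeds at the first step and never needs the wrapper to be Unpin.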