I have a function that generates a futures::Stream based on an argument. I want to call this function multiple times and flatten the streams together. Complicating matters is the fact that I want to feed the values returned by the stream back as the argument to the original function.
Concretely, I have a function that returns a stream of numbers down to zero:
fn numbers_down_to_zero(v: i32) -> impl Stream<Item = i32> {
    stream::iter((0..v).rev())
}
I want to call this function starting at 5. The function should also be called for every odd value that is returned. The total set of calls to numbers_down_to_zero would be:
numbers_down_to_zero(5);
numbers_down_to_zero(3);
numbers_down_to_zero(1);
numbers_down_to_zero(1);
Producing the total stream of
4
3
2
1
0
2
1
0
0
0
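To double-check the expected order, the feedback rule can be modelled without any async machinery. The following is only a sketch: it swaps the stream for a plain iterator, and the names numbers_down_to_zero_sync and expected_values are made up for this illustration.

```rust
use std::collections::VecDeque;

/// Synchronous stand-in for numbers_down_to_zero: yields v - 1 down to 0.
fn numbers_down_to_zero_sync(v: i32) -> impl Iterator<Item = i32> {
    (0..v).rev()
}

/// Collects every value in the order described above.
/// Odd values are queued and later used as new arguments (first in, first out).
fn expected_values(start: i32) -> Vec<i32> {
    let mut to_visit = VecDeque::from(vec![start]);
    let mut out = Vec::new();
    while let Some(seed) = to_visit.pop_back() {
        for x in numbers_down_to_zero_sync(seed) {
            if x % 2 != 0 {
                to_visit.push_front(x);
            }
            out.push(x);
        }
    }
    out
}

fn main() {
    println!("{:?}", expected_values(5));
}
```

The question is how to get the same behaviour when the inner producer is a Stream rather than an Iterator.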
What techniques exist to allow this?
You can solve this with unfold. You'd have a "state" struct that keeps both the "base stream" (in this case counting down towards zero) and the list of items that will produce a new stream, and use that as the argument for unfold to keep the state while unfolding.
This way the compiler doesn't have to reason about lifetimes or shared ownership, as the state can be moved into the async block for each invocation of the closure.
use futures::{stream, Stream, StreamExt};
use std::collections::VecDeque;

/// Base stream (counting down to zero).
fn f(n: i32) -> impl Stream<Item = i32> {
    stream::iter((0..n).rev())
}

/// "Recursive" stream
fn g(n: i32) -> impl Stream<Item = i32> {
    /// Helper struct to keep state while unfolding
    struct StreamState<S> {
        inner_stream: S,
        item_queue: VecDeque<i32>,
    }

    // Build helper struct
    let state = StreamState {
        inner_stream: f(n),
        item_queue: VecDeque::new(),
    };

    // Unfold with state; the state is moved into the async block on each call
    stream::unfold(state, |mut state| async move {
        loop {
            if let Some(item) = state.inner_stream.next().await {
                // Iterate inner stream, and potentially push item to queue
                if item % 2 == 1 {
                    state.item_queue.push_front(item);
                }
                break Some((item, state));
            } else if let Some(item) = state.item_queue.pop_back() {
                // If inner stream is exhausted, produce new stream from queue
                // and repeat loop
                state.inner_stream = f(item);
            } else {
                // If queue is empty, we are done
                break None;
            }
        }
    })
}
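The ownership hand-off that unfold relies on can be seen in isolation with a synchronous analogue. This is a sketch only: unfold_sync, State and g_sync are hypothetical names that are not part of the futures crate, and an iterator stands in for the stream.

```rust
use std::collections::VecDeque;
use std::iter::Rev;
use std::ops::Range;

/// Hypothetical synchronous analogue of stream::unfold: the closure takes the
/// state by value and hands it back together with the next item.
fn unfold_sync<St, T, F>(init: St, mut step: F) -> impl Iterator<Item = T>
where
    F: FnMut(St) -> Option<(T, St)>,
{
    let mut slot = Some(init);
    std::iter::from_fn(move || {
        // Move the state out, run one step, then store the returned state.
        let state = slot.take()?;
        let (item, next_state) = step(state)?;
        slot = Some(next_state);
        Some(item)
    })
}

/// Same shape as the StreamState struct above, with an iterator inside.
struct State {
    inner: Rev<Range<i32>>,
    queue: VecDeque<i32>,
}

fn g_sync(n: i32) -> impl Iterator<Item = i32> {
    let state = State {
        inner: (0..n).rev(),
        queue: VecDeque::new(),
    };
    unfold_sync(state, |mut state| loop {
        if let Some(item) = state.inner.next() {
            if item % 2 == 1 {
                state.queue.push_front(item);
            }
            break Some((item, state));
        } else if let Some(seed) = state.queue.pop_back() {
            // Inner iterator exhausted: start a new one from the queue.
            state.inner = (0..seed).rev();
        } else {
            break None;
        }
    })
}

fn main() {
    println!("{:?}", g_sync(5).collect::<Vec<_>>());
}
```

Because the closure owns the state for the duration of each step, no borrow outlives a single call, which is why no Rc or RefCell is needed.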
StreamExt::next requires that the inner stream implements Unpin, and thus it's not usable with arbitrary streams. You can always use Box::pin(stream) instead, since Pin<Box<T>> is Unpin and implements Stream if T: Stream.
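The Unpin half of that claim can be checked with the standard library alone. The sketch below uses a made-up NotUnpin type as a stand-in for a stream that is not Unpin; it does not involve the Stream trait itself.

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

/// Hypothetical stand-in for a stream type that does not implement Unpin.
struct NotUnpin {
    value: i32,
    _pin: PhantomPinned,
}

/// Only compiles for arguments whose type implements Unpin.
fn is_unpin<T: Unpin>(_: &T) -> bool {
    true
}

fn boxed(value: i32) -> Pin<Box<NotUnpin>> {
    Box::pin(NotUnpin {
        value,
        _pin: PhantomPinned,
    })
}

fn main() {
    let pinned = boxed(7);
    // is_unpin(&NotUnpin { .. }) would be rejected by the compiler,
    // but the pinned box is accepted.
    println!("unpin: {}, value: {}", is_unpin(&pinned), pinned.value);
}
```

The price of Box::pin is one heap allocation per inner stream.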
By (ab)using async / await, the genawaiter crate manages to mimic generator syntax in stable Rust today. Combined with futures::pin_mut to pin the value on the stack, here is a solution that is both allocation-free and compatible with arbitrary streams:
//# futures = "0.3"
//# genawaiter = { version = "0.2", features = ["futures03"] }
//# tokio = { version = "0.2", features = ["full"] }
use futures::{
    pin_mut,
    stream::{self, Stream, StreamExt},
};
use genawaiter::{generator_mut, stack::Co};
use std::collections::VecDeque;

async fn g(n: i32, co: Co<'_, i32>) {
    let mut seeds = VecDeque::from(vec![n]);
    while let Some(seed) = seeds.pop_back() {
        let stream = f(seed);
        pin_mut!(stream);
        while let Some(x) = stream.next().await {
            if x % 2 != 0 {
                seeds.push_front(x);
            }
            co.yield_(x).await;
        }
    }
}

fn f(n: i32) -> impl Stream<Item = i32> {
    stream::iter((0..n).rev())
}

#[tokio::main]
async fn main() {
    generator_mut!(stream, |co| g(5, co));
    stream
        .for_each(|v| async move {
            println!("v: {}", v);
        })
        .await;
}
Some drawbacks:
- generator_mut macro
With one heap allocation, genawaiter::rc::Gen can get rid of all these. But again, with allocation on the table there are other options.
use futures::{
    pin_mut,
    stream::{Stream, StreamExt},
};
use genawaiter::rc::Gen;
use std::collections::VecDeque;

fn g(n: i32) -> impl Stream<Item = i32> {
    Gen::new(|co| async move {
        let mut seeds = VecDeque::from(vec![n]);
        while let Some(seed) = seeds.pop_back() {
            let stream = f(seed);
            pin_mut!(stream);
            while let Some(x) = stream.next().await {
                if x % 2 != 0 {
                    seeds.push_front(x);
                }
                co.yield_(x).await;
            }
        }
    })
}
These are partial solutions that I have found, but they are lacking for various reasons.
I don't like this solution because I think that interior mutability should not be required for this general problem, but it is required here because the borrow checker doesn't know how the calls to the closures will interleave.
use futures::{stream, Stream, StreamExt};
use std::collections::VecDeque;

fn x(v: i32) -> impl Stream<Item = i32> {
    stream::iter((0..v).rev())
}

use std::{cell::RefCell, rc::Rc};

fn y0() -> impl Stream<Item = i32> {
    let to_visit = Rc::new(RefCell::new(VecDeque::from(vec![5])));
    let to_visit_b = to_visit.clone();

    stream::unfold(to_visit, |to_visit| async {
        let i = to_visit.borrow_mut().pop_back()?;
        Some((x(i), to_visit))
    })
    .flatten()
    .inspect(move |&x| {
        if x % 2 != 0 {
            to_visit_b.borrow_mut().push_front(x);
        }
    })
}

#[tokio::main]
async fn main() {
    y0().for_each(|v| async move {
        println!("v: {}", v);
    })
    .await;
}
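The same constraint shows up with plain iterators, which makes it easier to see where the sharing comes from. In this sketch (countdown and chained are illustrative names, and Iterator replaces Stream) two separate closures both need mutable access to the queue, so the queue has to sit behind Rc<RefCell<..>>:

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;

/// Iterator stand-in for the stream-producing function x.
fn countdown(v: i32) -> impl Iterator<Item = i32> {
    (0..v).rev()
}

fn chained(start: i32) -> impl Iterator<Item = i32> {
    let to_visit = Rc::new(RefCell::new(VecDeque::from(vec![start])));
    let to_visit_b = Rc::clone(&to_visit);

    // Closure 1: pops the next seed and creates an inner iterator from it.
    std::iter::from_fn(move || {
        let seed = to_visit.borrow_mut().pop_back()?;
        Some(countdown(seed))
    })
    .flatten()
    // Closure 2: pushes odd values back onto the shared queue.
    .inspect(move |&x| {
        if x % 2 != 0 {
            to_visit_b.borrow_mut().push_front(x);
        }
    })
}

fn main() {
    println!("{:?}", chained(5).collect::<Vec<_>>());
}
```

The two borrow_mut calls never overlap at run time, but the compiler cannot prove that from the types of the adapters alone.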
Stream::poll_next
I don't like this solution because it's verbose and requires tricky unsafe code that is hard to reason about (I'm not even sure what I have is correct!)
use futures::{stream, Stream, StreamExt};
use std::collections::VecDeque;

fn x(v: i32) -> impl Stream<Item = i32> {
    stream::iter((0..v).rev())
}

use std::{
    pin::Pin,
    task::{Context, Poll},
};

struct X<St, C, R, S>
where
    C: Fn(&mut St) -> Option<S>,
    R: Fn(&mut St, &mut S::Item),
    S: Stream,
{
    state: St,
    create: C,
    review: R,
    current: Option<S>,
}

impl<St, C, R, S> Stream for X<St, C, R, S>
where
    C: Fn(&mut St) -> Option<S>,
    R: Fn(&mut St, &mut S::Item),
    S: Stream,
{
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let (state, create, review, current) = unsafe {
            let Self {
                state,
                create,
                review,
                current,
            } = self.get_unchecked_mut();
            (state, create, review, current)
        };

        loop {
            if let Some(current) = current {
                let v = unsafe { futures::ready!(Pin::new_unchecked(current).poll_next(ctx)) };
                if let Some(mut v) = v {
                    review(state, &mut v);
                    return Poll::Ready(Some(v));
                }
            }

            *current = create(state);
            if current.is_none() {
                return Poll::Ready(None);
            }
        }
    }
}

fn y1() -> impl Stream<Item = i32> {
    X {
        state: VecDeque::from(vec![5]),
        create: |to_visit| {
            let i = to_visit.pop_back()?;
            Some(x(i))
        },
        review: |to_visit, &mut x| {
            if x % 2 != 0 {
                to_visit.push_front(x);
            }
        },
        current: None,
    }
}

#[tokio::main]
async fn main() {
    y1().for_each(|v| async move {
        println!("v: {}", v);
    })
    .await;
}
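For comparison, the unsafe blocks disappear if every field is Unpin, at the cost of no longer supporting arbitrary inner streams. The sketch below is a different, narrower technique than the code above, not a fix for it. To stay std-only it defines a hypothetical MiniStream trait that mirrors the shape of Stream::poll_next; Countdown, Recursive, NoopWake and run are likewise made up for this illustration.

```rust
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for futures::Stream, so the sketch needs only std.
trait MiniStream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Yields v - 1 down to 0 and is always ready.
struct Countdown(i32);

impl MiniStream for Countdown {
    type Item = i32;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
        if self.0 > 0 {
            self.0 -= 1;
            Poll::Ready(Some(self.0))
        } else {
            Poll::Ready(None)
        }
    }
}

/// Feeds odd items back as seeds. Every field is Unpin, so Recursive is Unpin.
struct Recursive {
    to_visit: VecDeque<i32>,
    current: Option<Countdown>,
}

impl MiniStream for Recursive {
    type Item = i32;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<i32>> {
        // Pin::get_mut is safe and only available because Self: Unpin.
        let this = self.get_mut();
        loop {
            if let Some(current) = this.current.as_mut() {
                // Pin::new is safe and only available because Countdown: Unpin.
                match Pin::new(current).poll_next(cx) {
                    Poll::Pending => return Poll::Pending,
                    Poll::Ready(Some(v)) => {
                        if v % 2 != 0 {
                            this.to_visit.push_front(v);
                        }
                        return Poll::Ready(Some(v));
                    }
                    Poll::Ready(None) => {}
                }
            }
            match this.to_visit.pop_back() {
                Some(seed) => this.current = Some(Countdown(seed)),
                None => return Poll::Ready(None),
            }
        }
    }
}

/// Waker that does nothing; enough here because nothing ever returns Pending.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls the stream to completion and collects the items.
fn run(start: i32) -> Vec<i32> {
    let mut s = Recursive {
        to_visit: VecDeque::from(vec![start]),
        current: None,
    };
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    while let Poll::Ready(Some(v)) = Pin::new(&mut s).poll_next(&mut cx) {
        out.push(v);
    }
    out
}

fn main() {
    println!("{:?}", run(5));
}
```

With a generic inner stream S that may not be Unpin, a safe Pin::new is not available, which is where the unsafe projection in the original code comes from.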
This doesn't work because the sender is never dropped because the receiver is never dropped because the sender is never dropped ...
Besides not working, this has a number of downsides:
- The function has to be async itself to push the initial value to visit.
- The Sender has to be cloned inside of the then closure.
use futures::{stream, Stream, StreamExt};
fn x(v: i32) -> impl Stream<Item = i32> {
    stream::iter((0..v).rev())
}

use futures::channel::mpsc;
use futures::sink::SinkExt;

async fn y2() -> impl Stream<Item = i32> {
    let (mut tx, rx) = mpsc::unbounded();

    tx.send(5).await.unwrap();

    rx.map(x).flatten().then(move |x| {
        let mut tx = tx.clone();
        async move {
            if x % 2 != 0 {
                tx.send(x).await.unwrap();
            }
            x
        }
    })
}

#[tokio::main]
async fn main() {
    y2().await
        .for_each(|v| async move {
            println!("v: {}", v);
        })
        .await;
}
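The reason the channel never closes can be observed without blocking. This sketch assumes std::sync::mpsc as a stand-in for futures::channel::mpsc; the crates differ in API, but in both a receiver only reports the end of the channel once every sender is gone. The function name drain_result is made up for the illustration.

```rust
use std::sync::mpsc::{self, TryRecvError};

/// Sends one value, drains the channel, and reports why draining stopped.
fn drain_result(keep_sender_alive: bool) -> TryRecvError {
    let (tx, rx) = mpsc::channel::<i32>();
    tx.send(5).unwrap();

    // Optionally keep a clone around, like the Sender captured by the closure.
    let held = if keep_sender_alive {
        Some(tx.clone())
    } else {
        None
    };
    drop(tx);

    let reason = loop {
        match rx.try_recv() {
            Ok(_) => continue,
            Err(e) => break e,
        }
    };
    drop(held);
    reason
}

fn main() {
    // Empty: more values could still arrive, so the consumer must keep waiting.
    println!("{:?}", drain_result(true));
    // Disconnected: all senders are gone, so the consumer can finish.
    println!("{:?}", drain_result(false));
}
```

In y2 the stream that owns the receiver also owns a sender, so the channel can never reach the disconnected state on its own.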