I have a function that generates a futures::Stream based on an argument. I want to call this function multiple times and flatten the streams together. Complicating matters is the fact that I want to feed the values returned by the stream back as the argument to the original function.
Concretely, I have a function that returns a stream of numbers down to zero:
fn numbers_down_to_zero(v: i32) -> impl Stream<Item = i32> {
    stream::iter((0..v).rev())
}
I want to call this function starting at 5. The function should also be called for every odd value that is returned. The total set of calls to numbers_down_to_zero would be:
numbers_down_to_zero(5);
numbers_down_to_zero(3);
numbers_down_to_zero(1);
numbers_down_to_zero(1);
Producing the total stream of
4
3
2
1
0
2
1
0
0
0
What techniques exist to allow this?
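As a sanity check on the expected output listed above, here is a minimal synchronous sketch that uses plain iterators instead of streams. The names numbers_down_to_zero_sync and expected_output are made up for this illustration and are not part of the futures crate; the sketch only reproduces the order of values, not the asynchronous behaviour.

```rust
use std::collections::VecDeque;

/// Synchronous stand-in for `numbers_down_to_zero`: yields v-1, v-2, ..., 0.
/// (Hypothetical helper, used only for this illustration.)
fn numbers_down_to_zero_sync(v: i32) -> impl Iterator<Item = i32> {
    (0..v).rev()
}

/// Collects every value, re-seeding the generator with each odd value seen.
fn expected_output(start: i32) -> Vec<i32> {
    let mut seeds = VecDeque::from(vec![start]);
    let mut out = Vec::new();
    // Seeds are consumed in the order they were discovered (FIFO).
    while let Some(seed) = seeds.pop_front() {
        for x in numbers_down_to_zero_sync(seed) {
            if x % 2 != 0 {
                // Every odd value triggers one more call later on.
                seeds.push_back(x);
            }
            out.push(x);
        }
    }
    out
}
```

Calling expected_output(5) walks the seeds 5, 3, 1, 1, matching the list of calls given in the question.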
You can solve this with unfold. Use a "state" struct that holds both the "base stream" (in this case, the one counting down towards zero) and a queue of items that will each produce a new stream, and pass that struct as the seed argument to unfold so the state is carried from one step to the next.
This way the compiler doesn't have to reason about lifetimes or shared ownership, because the state is moved into the async block on each invocation of the closure and handed back together with the yielded item.
use futures::stream::{self, Stream, StreamExt};
use std::collections::VecDeque;
/// Base stream (counting down to zero).
fn f(n: i32) -> impl Stream<Item = i32> {
    stream::iter((0..n).rev())
}
/// "Recursive" stream
fn g(n: i32) -> impl Stream<Item = i32> {
    /// Helper struct to keep state while unfolding
    struct StreamState<S> {
        inner_stream: S,
        item_queue: VecDeque<i32>,
    }
    // Build helper struct
    let state = StreamState {
        inner_stream: f(n),
        item_queue: VecDeque::new(),
    };
    // Unfold with state
    stream::unfold(state, |mut state| async move {
        loop {
            if let Some(item) = state.inner_stream.next().await {
                // Iterate inner stream, and potentially push item to queue
                if item % 2 == 1 {
                    state.item_queue.push_front(item);
                }
                break Some((item, state));
            } else if let Some(item) = state.item_queue.pop_back() {
                // If inner stream is exhausted, produce new stream from queue
                // and repeat loop
                state.inner_stream = f(item);
            } else {
                // If queue is empty, we are done
                break None;
            }
        }
    })
}
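The state machine inside the unfold closure may be easier to follow without the async machinery. The following is only a synchronous sketch of the same loop, built on std::iter::from_fn rather than stream::unfold; f_sync and g_sync are hypothetical names for this illustration.

```rust
use std::collections::VecDeque;
use std::iter;

/// Synchronous stand-in for `f` (hypothetical helper).
fn f_sync(n: i32) -> impl Iterator<Item = i32> {
    (0..n).rev()
}

/// Synchronous counterpart of `g`: the closure owns all of its state,
/// so no borrows have to outlive a single call.
fn g_sync(n: i32) -> impl Iterator<Item = i32> {
    let mut inner = f_sync(n);
    let mut queue: VecDeque<i32> = VecDeque::new();
    iter::from_fn(move || loop {
        if let Some(item) = inner.next() {
            // Remember odd items so they can seed a new inner iterator later.
            if item % 2 == 1 {
                queue.push_front(item);
            }
            break Some(item);
        } else if let Some(seed) = queue.pop_back() {
            // Inner iterator is exhausted: start the next one and loop again.
            inner = f_sync(seed);
        } else {
            // Nothing left to iterate and nothing queued: finished.
            break None;
        }
    })
}
```

The three branches correspond one to one with the branches in the unfold closure; the async version additionally has to return the state alongside each item because unfold takes it by value.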
StreamExt::next requires that the inner stream implements Unpin, and thus it’s not usable with arbitrary streams. You can always use Box::pin(stream) instead, since Pin<Box<T>> is Unpin and implements Stream if T: Stream.
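The Unpin half of that claim can be checked with the standard library alone. This is a small sketch under the assumption that a PhantomPinned struct is a fair stand-in for an arbitrary !Unpin stream; NotUnpin, require_unpin and boxed_is_unpin are names invented for the illustration, and the Stream impl for Pin<Box<T>> is not exercised here.

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

/// A type that opts out of `Unpin`, standing in for an arbitrary stream.
struct NotUnpin {
    value: i32,
    _pinned: PhantomPinned,
}

/// Only accepts `Unpin` values, similar in spirit to the `Self: Unpin`
/// bound on `StreamExt::next`.
fn require_unpin<T: Unpin>(value: T) -> T {
    value
}

fn boxed_is_unpin() -> i32 {
    // Passing a bare `NotUnpin` to `require_unpin` would not compile.
    let pinned: Pin<Box<NotUnpin>> = Box::pin(NotUnpin {
        value: 7,
        _pinned: PhantomPinned,
    });
    // The pinned box itself is `Unpin`, so this is accepted.
    let pinned = require_unpin(pinned);
    pinned.value
}
```

The price is the heap allocation made by Box::pin, which is what the stack-pinning approach tries to avoid.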
By (ab)using async / await, the genawaiter crate manages to mimic generator syntax in stable Rust today. Combined with futures::pin_mut to pin a value on the stack, here is a solution that is both allocation-free and compatible with arbitrary streams:
//# futures = "0.3"
//# genawaiter = { version = "0.2", features = ["futures03"] }
//# tokio = { version = "0.2", features = ["full"] }
use futures::{
    pin_mut,
    stream::{self, Stream, StreamExt},
};
use genawaiter::{generator_mut, stack::Co};
use std::collections::VecDeque;
async fn g(n: i32, co: Co<'_, i32>) {
    let mut seeds = VecDeque::from(vec![n]);
    while let Some(seed) = seeds.pop_back() {
        let stream = f(seed);
        pin_mut!(stream);
        while let Some(x) = stream.next().await {
            if x % 2 != 0 {
                seeds.push_front(x);
            }
            co.yield_(x).await;
        }
    }
}
fn f(n: i32) -> impl Stream<Item = i32> {
    stream::iter((0..n).rev())
}
#[tokio::main]
async fn main() {
    generator_mut!(stream, |co| g(5, co));
    stream
        .for_each(|v| async move {
            println!("v: {}", v);
        })
        .await;
}
Some drawbacks involve the generator_mut macro, which the caller has to invoke itself (see main above).
With one heap allocation, genawaiter::rc::Gen can get rid of all these. But again, with allocation on the table there are other options.
use futures::{
    pin_mut,
    stream::{Stream, StreamExt},
};
use genawaiter::rc::Gen;
use std::collections::VecDeque;
fn g(n: i32) -> impl Stream<Item = i32> {
    Gen::new(|co| async move {
        let mut seeds = VecDeque::from(vec![n]);
        while let Some(seed) = seeds.pop_back() {
            let stream = f(seed);
            pin_mut!(stream);
            while let Some(x) = stream.next().await {
                if x % 2 != 0 {
                    seeds.push_front(x);
                }
                co.yield_(x).await;
            }
        }
    })
}
These are partial solutions that I have found, but each is lacking for various reasons.
I don't like this solution because I think that interior mutability should not be required for this general problem, but it is required here because the borrow checker doesn't know how the calls to the closures will interleave.
use futures::{stream, Stream, StreamExt};
use std::collections::VecDeque;
fn x(v: i32) -> impl Stream<Item = i32> {
    stream::iter((0..v).rev())
}
use std::{cell::RefCell, rc::Rc};
fn y0() -> impl Stream<Item = i32> {
    let to_visit = Rc::new(RefCell::new(VecDeque::from(vec![5])));
    let to_visit_b = to_visit.clone();
    stream::unfold(to_visit, |to_visit| async {
        let i = to_visit.borrow_mut().pop_back()?;
        Some((x(i), to_visit))
    })
    .flatten()
    .inspect(move |&x| {
        if x % 2 != 0 {
            to_visit_b.borrow_mut().push_front(x);
        }
    })
}
#[tokio::main]
async fn main() {
    y0().for_each(|v| async move {
        println!("v: {}", v);
    })
    .await;
}
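To see why the shared queue ends up behind Rc<RefCell<...>>, it can help to look at the same shape with plain iterators. This is only a synchronous sketch; y0_sync is a hypothetical name, and std::iter::from_fn stands in for stream::unfold.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::iter;
use std::rc::Rc;

/// Synchronous analog of `y0`: two closures need mutable access to one queue.
fn y0_sync() -> impl Iterator<Item = i32> {
    let to_visit = Rc::new(RefCell::new(VecDeque::from(vec![5])));
    let to_visit_b = Rc::clone(&to_visit);
    iter::from_fn(move || {
        // The mutable borrow ends with this statement, before any item is inspected.
        let next_seed = to_visit.borrow_mut().pop_back();
        next_seed
    })
    .flat_map(|i| (0..i).rev())
    .inspect(move |&x| {
        if x % 2 != 0 {
            // Second closure mutating the same queue through its own handle.
            to_visit_b.borrow_mut().push_front(x);
        }
    })
}
```

Both closures are stored in the adapter chain at the same time, so neither can hold a plain &mut to the queue; the two borrow_mut calls never overlap at run time, but that is checked dynamically rather than by the borrow checker.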
Custom implementation of Stream::poll_next
I don't like this solution because it's verbose and requires tricky unsafe code that is hard to reason about (I'm not even sure what I have is correct!)
use futures::{stream, Stream, StreamExt};
use std::collections::VecDeque;
fn x(v: i32) -> impl Stream<Item = i32> {
    stream::iter((0..v).rev())
}
use std::{
    pin::Pin,
    task::{Context, Poll},
};
struct X<St, C, R, S>
where
    C: Fn(&mut St) -> Option<S>,
    R: Fn(&mut St, &mut S::Item),
    S: Stream,
{
    state: St,
    create: C,
    review: R,
    current: Option<S>,
}
impl<St, C, R, S> Stream for X<St, C, R, S>
where
    C: Fn(&mut St) -> Option<S>,
    R: Fn(&mut St, &mut S::Item),
    S: Stream,
{
    type Item = S::Item;
    fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let (state, create, review, current) = unsafe {
            let Self {
                state,
                create,
                review,
                current,
            } = self.get_unchecked_mut();
            (state, create, review, current)
        };
        loop {
            if let Some(current) = current {
                let v = unsafe { futures::ready!(Pin::new_unchecked(current).poll_next(ctx)) };
                if let Some(mut v) = v {
                    review(state, &mut v);
                    return Poll::Ready(Some(v));
                }
            }
            *current = create(state);
            if current.is_none() {
                return Poll::Ready(None);
            }
        }
    }
}
fn y1() -> impl Stream<Item = i32> {
    X {
        state: VecDeque::from(vec![5]),
        create: |to_visit| {
            let i = to_visit.pop_back()?;
            Some(x(i))
        },
        review: |to_visit, &mut x| {
            if x % 2 != 0 {
                to_visit.push_front(x);
            }
        },
        current: None,
    }
}
#[tokio::main]
async fn main() {
    y1().for_each(|v| async move {
        println!("v: {}", v);
    })
    .await;
}
This doesn't work because the sender is never dropped because the receiver is never dropped because the sender is never dropped ...
Besides not working, this has a number of downsides:
The function has to be async itself to push the initial value to visit.
The Sender has to be cloned inside of the then closure.
use futures::{stream, Stream, StreamExt};
fn x(v: i32) -> impl Stream<Item = i32> {
    stream::iter((0..v).rev())
}
use futures::channel::mpsc;
use futures::sink::SinkExt;
async fn y2() -> impl Stream<Item = i32> {
    let (mut tx, rx) = mpsc::unbounded();
    tx.send(5).await.unwrap();
    rx.map(x).flatten().then(move |x| {
        let mut tx = tx.clone();
        async move {
            if x % 2 != 0 {
                tx.send(x).await.unwrap();
            }
            x
        }
    })
}
#[tokio::main]
async fn main() {
    y2().await
        .for_each(|v| async move {
            println!("v: {}", v);
        })
        .await;
}
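The sender/receiver cycle described above can be observed with a small sketch. It deliberately swaps in std::sync::mpsc for futures::channel::mpsc so that it runs without an executor; the assumption is only that both channels signal the end of input once every sender has been dropped. receiver_states is a hypothetical helper name.

```rust
use std::sync::mpsc::{self, TryRecvError};

/// Returns what the receiver observes while a sender clone is still alive,
/// and what it observes after every sender has been dropped.
fn receiver_states() -> (TryRecvError, TryRecvError) {
    let (tx, rx) = mpsc::channel::<i32>();
    // Plays the role of the `tx` captured by the `then` closure.
    let tx_kept_by_consumer = tx.clone();
    drop(tx);

    // A sender is still alive: the channel is merely empty, not finished.
    let while_sender_alive = rx.try_recv().unwrap_err();

    drop(tx_kept_by_consumer);
    // Only now does the receiver learn that no more values can arrive.
    let after_all_dropped = rx.try_recv().unwrap_err();

    (while_sender_alive, after_all_dropped)
}
```

In y2 the surviving sender lives inside the very stream that wraps the receiver, so the "all senders dropped" state is never reached and the stream never terminates.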