 

How can I create a stream where the items are based on items that the stream previously returned?

I have a function that generates a futures::Stream based on an argument. I want to call this function multiple times and flatten the streams together. Complicating matters is the fact that I want to feed the values returned by the stream back as the argument to the original function.

Concretely, I have a function that returns a stream of numbers down to zero:

use futures::stream::{self, Stream};

fn numbers_down_to_zero(v: i32) -> impl Stream<Item = i32> {
    stream::iter((0..v).rev())
}

I want to call this function starting at 5. The function should also be called for every odd value that is returned. The total set of calls to numbers_down_to_zero would be:

numbers_down_to_zero(5);
numbers_down_to_zero(3);
numbers_down_to_zero(1);
numbers_down_to_zero(1);

This produces the total stream:

4
3
2
1
0
2
1
0
0
0
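To pin down the expected order, the same expansion can be sketched synchronously with a plain Iterator and a first-in, first-out work queue. This only illustrates the desired semantics; it is not one of the asynchronous techniques being asked about, and `expand` is a hypothetical name.

```rust
use std::collections::VecDeque;

/// Synchronous analogue of `numbers_down_to_zero`.
fn numbers_down_to_zero(v: i32) -> impl Iterator<Item = i32> {
    (0..v).rev()
}

/// Visits seeds in first-in, first-out order and feeds every odd
/// output back in as a new seed.
fn expand(start: i32) -> Vec<i32> {
    let mut seeds = VecDeque::from(vec![start]);
    let mut out = Vec::new();
    while let Some(seed) = seeds.pop_front() {
        for x in numbers_down_to_zero(seed) {
            if x % 2 != 0 {
                seeds.push_back(x);
            }
            out.push(x);
        }
    }
    out
}

fn main() {
    // Seeds are visited in the order 5, 3, 1, 1.
    println!("{:?}", expand(5));
}
```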

What techniques exist to allow this?

Asked by Shepmaster on Jan 11 '20 at 19:01


3 Answers

You can solve this with unfold. You'd have a "state" struct that keeps both the "base stream" (in this case counting down towards zero) and the list of items that will produce new streams, and use that struct as the argument to unfold to keep the state while unfolding.

This way the compiler doesn't have to reason about lifetimes or ownership across closure calls, as the state is moved into the async block on each invocation of the closure and handed back alongside each item.

use futures::stream::{self, Stream, StreamExt};
use std::collections::VecDeque;

/// Base stream (counting down to zero).
fn f(n: i32) -> impl Stream<Item = i32> {
    stream::iter((0..n).rev())
}

/// "Recursive" stream
fn g(n: i32) -> impl Stream<Item = i32> {
    /// Helper struct to keep state while unfolding
    struct StreamState<S> {
        inner_stream: S,
        item_queue: VecDeque<i32>,
    }

    // Build helper struct
    let state = StreamState {
        inner_stream: f(n),
        item_queue: VecDeque::new(),
    };

    // Unfold with state
    stream::unfold(state, |mut state| async move {
        loop {
            if let Some(item) = state.inner_stream.next().await {
                // Iterate inner stream, and potentially push item to queue
                if item % 2 == 1 {
                    state.item_queue.push_front(item);
                }
                break Some((item, state));
            } else if let Some(item) = state.item_queue.pop_back() {
                // If inner stream is exhausted, produce new stream from queue
                // and repeat loop
                state.inner_stream = f(item);
            } else {
                // If queue is empty, we are done
                break None;
            }
        }
    })
}

Full playground example

StreamExt::next requires that the inner stream implement Unpin, so this approach as written is not usable with arbitrary streams. You can always store Box::pin(stream) instead, since Pin<Box<T>> is Unpin and implements Stream if T: Stream.

Answered by Frxstrem on Nov 15 '22 at 09:11


By (ab)using async / await, the genawaiter crate manages to mimic generator syntax in stable Rust today. Combined with futures::pin_mut to pin values on the stack, here is a solution that is both allocation-free and compatible with arbitrary streams:

//# futures = "0.3"
//# genawaiter = { version = "0.2", features = ["futures03"] }
//# tokio = { version = "0.2", features = ["full"] }
use futures::{
    pin_mut,
    stream::{self, Stream, StreamExt},
};
use genawaiter::{generator_mut, stack::Co};
use std::collections::VecDeque;

async fn g(n: i32, co: Co<'_, i32>) {
    let mut seeds = VecDeque::from(vec![n]);
    while let Some(seed) = seeds.pop_back() {
        let stream = f(seed);
        pin_mut!(stream);
        while let Some(x) = stream.next().await {
            if x % 2 != 0 {
                seeds.push_front(x);
            }
            co.yield_(x).await;
        }
    }
}

fn f(n: i32) -> impl Stream<Item = i32> {
    stream::iter((0..n).rev())
}

#[tokio::main]
async fn main() {
    generator_mut!(stream, |co| g(5, co));
    stream
        .for_each(|v| async move {
            println!("v: {}", v);
        })
        .await;
}

Some drawbacks:

  • there is one unsafe call inside the generator_mut macro
  • the interface is a bit leaky: callers get to see some implementation details, such as the Co parameter of g.

With one heap allocation, genawaiter::rc::Gen gets rid of both drawbacks. But once allocation is on the table, there are other options as well.

use futures::{
    pin_mut,
    stream::{self, Stream, StreamExt},
};
use genawaiter::rc::Gen;
use std::collections::VecDeque;

fn g(n: i32) -> impl Stream<Item = i32> {
    // The generator is heap-allocated; callers only see a plain Stream.
    Gen::new(|co| async move {
        let mut seeds = VecDeque::from(vec![n]);
        while let Some(seed) = seeds.pop_back() {
            let stream = f(seed);
            pin_mut!(stream);
            while let Some(x) = stream.next().await {
                if x % 2 != 0 {
                    seeds.push_front(x);
                }
                co.yield_(x).await;
            }
        }
    })
}

fn f(n: i32) -> impl Stream<Item = i32> {
    stream::iter((0..n).rev())
}
Answered by edwardw on Nov 15 '22 at 08:11


These are partial solutions that I have found, but each is lacking for various reasons.

Using combinators with interior mutability

I don't like this solution because I think that interior mutability should not be required for this general problem. It is required here because the unfold closure and the inspect closure both need mutable access to the same queue, and the borrow checker doesn't know how the calls to those closures will interleave.

use futures::{stream, Stream, StreamExt};
use std::collections::VecDeque;

fn x(v: i32) -> impl Stream<Item = i32> {
    stream::iter((0..v).rev())
}

use std::{cell::RefCell, rc::Rc};

fn y0() -> impl Stream<Item = i32> {
    let to_visit = Rc::new(RefCell::new(VecDeque::from(vec![5])));
    let to_visit_b = to_visit.clone();

    stream::unfold(to_visit, |to_visit| async {
        let i = to_visit.borrow_mut().pop_back()?;

        Some((x(i), to_visit))
    })
    .flatten()
    .inspect(move |&x| {
        if x % 2 != 0 {
            to_visit_b.borrow_mut().push_front(x);
        }
    })
}

#[tokio::main]
async fn main() {
    y0().for_each(|v| async move {
        println!("v: {}", v);
    })
    .await;
}

playground

Custom implementation of Stream::poll_next

I don't like this solution because it's verbose and requires tricky unsafe code that is hard to reason about (I'm not even sure what I have is correct!)

use futures::{stream, Stream, StreamExt};
use std::collections::VecDeque;

fn x(v: i32) -> impl Stream<Item = i32> {
    stream::iter((0..v).rev())
}

use std::{
    pin::Pin,
    task::{Context, Poll},
};

struct X<St, C, R, S>
where
    C: Fn(&mut St) -> Option<S>,
    R: Fn(&mut St, &mut S::Item),
    S: Stream,
{
    state: St,
    create: C,
    review: R,
    current: Option<S>,
}

impl<St, C, R, S> Stream for X<St, C, R, S>
where
    C: Fn(&mut St) -> Option<S>,
    R: Fn(&mut St, &mut S::Item),
    S: Stream,
{
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let (state, create, review, current) = unsafe {
            let Self {
                state,
                create,
                review,
                current,
            } = self.get_unchecked_mut();
            (state, create, review, current)
        };

        loop {
            if let Some(current) = current {
                let v = unsafe { futures::ready!(Pin::new_unchecked(current).poll_next(ctx)) };
                if let Some(mut v) = v {
                    review(state, &mut v);
                    return Poll::Ready(Some(v));
                }
            }

            *current = create(state);
            if current.is_none() {
                return Poll::Ready(None);
            }
        }
    }
}

fn y1() -> impl Stream<Item = i32> {
    X {
        state: VecDeque::from(vec![5]),
        create: |to_visit| {
            let i = to_visit.pop_back()?;

            Some(x(i))
        },
        review: |to_visit, &mut x| {
            if x % 2 != 0 {
                to_visit.push_front(x);
            }
        },
        current: None,
    }
}

#[tokio::main]
async fn main() {
    y1().for_each(|v| async move {
        println!("v: {}", v);
    })
    .await;
}

playground


Using channels (non-working)

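This doesn't work because the stream never ends: the then closure owns a Sender, and that closure is owned by the stream built on the Receiver. The Receiver only finishes once every Sender is dropped, but the Sender is never dropped while the Receiver is alive, because the sender is never dropped because the receiver is never dropped ...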

Besides not working, this has a number of downsides:

  • The state has to implicitly be a queue (which matches what I want to do, but isn't very general).
  • It requires that my function become async itself to push the initial value to visit.
  • I have to handle error conditions that seem irrelevant.
  • I have to clone the Sender inside of the then closure.

use futures::{stream, Stream, StreamExt};

fn x(v: i32) -> impl Stream<Item = i32> {
    stream::iter((0..v).rev())
}

use futures::channel::mpsc;
use futures::sink::SinkExt;

async fn y2() -> impl Stream<Item = i32> {
    let (mut tx, rx) = mpsc::unbounded();

    tx.send(5).await.unwrap();

    rx.map(x).flatten().then(move |x| {
        let mut tx = tx.clone();
        async move {
            if x % 2 != 0 {
                tx.send(x).await.unwrap();
            }
            x
        }
    })
}

#[tokio::main]
async fn main() {
    y2().await
        .for_each(|v| async move {
            println!("v: {}", v);
        })
        .await;
}

playground

Answered by Shepmaster on Nov 15 '22 at 07:11