
How to process a vector as an asynchronous stream?

In my RSS reader project, I want to read my RSS feeds asynchronously. Currently, they're read synchronously by this code block:

self.feeds = self
    .feeds
    .iter()
    .map(|f| f.read(&self.settings))
    .collect::<Vec<Feed>>();

I want to make that code asynchronous, because it will allow me to better handle poor web server responses.

I understand I can create a Stream from my Vec using stream::from_iter(...), which transforms the code into something like:

self.feeds = stream::from_iter(self.feeds.iter())
    .map(|f| f.read(&self.settings))
    // ???
    .collect::<Vec<Feed>>();

But then, I have two questions:

  1. How can the results be joined into a Vec (which is a synchronous struct)?
  2. How do I execute that stream? I was thinking about using task::spawn, but it doesn't seem to work ...
Riduidel asked Dec 08 '19 17:12



1 Answer

How to execute that stream? I was thinking about using task::spawn but it doesn't seem to work

In the async/await world, asynchronous code has to be driven by an executor, which is not part of the standard library but is provided by third-party crates such as tokio. task::spawn only hands one future (one call of an async fn) to the executor for scheduling; it does not run the future itself, so nothing happens unless an executor is actually running.
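To make the role of the executor concrete, here is a minimal sketch that uses only the standard library. The `block_on` function, the `NoopWaker` type and the two-argument `read_feed` are hypothetical names invented for this illustration; they are not tokio's API, and a real executor does far more (wake-ups, timers, I/O, many tasks). The sketch only shows that calling an async fn does nothing until something polls the returned future.

```rust
use std::cell::Cell;
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for an async feed read: it records that its body ran.
async fn read_feed(id: u32, ran: &Cell<bool>) -> u32 {
    ran.set(true);
    id * 2
}

/// Waker that does nothing; sufficient for this toy executor.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor (an assumption for illustration, not a real runtime):
/// polls a single future in a loop until it reports `Ready`.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        // A real executor would sleep until woken instead of busy-polling.
        std::thread::yield_now();
    }
}

/// Returns (body ran before polling, body ran after polling, output).
fn demo() -> (bool, bool, u32) {
    let ran = Cell::new(false);
    let fut = read_feed(21, &ran); // creates the future; the body has not run
    let before = ran.get();
    let out = block_on(fut); // the executor polls it to completion
    let after = ran.get();
    (before, after, out)
}

fn main() {
    let (before, after, out) = demo();
    println!("ran before poll: {}, after poll: {}, output: {}", before, after, out);
}
```

In a real project, `#[tokio::main]` or a similar runtime entry point plays the part of `block_on` here.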

How to have results joined into a vec (which is a sync struct)

The bread and butter of your RSS reader seems to be f.read. It should be turned into an asynchronous function. The vector of feeds is then mapped into a collection of futures, each of which needs to be polled to completion.

The futures crate has futures::stream::futures_unordered::FuturesUnordered to help you do that. FuturesUnordered itself implements the Stream trait, so it can be collected into the result vector and awaited to completion like so:

//# tokio = { version = "0.2.4", features = ["full"] }
//# futures = "0.3.1"
use tokio::time::delay_for;
use futures::stream::StreamExt;
use futures::stream::futures_unordered::FuturesUnordered;
use std::error::Error;
use std::time::{Duration, Instant};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let start = Instant::now();
    let feeds = (0..10).collect::<Vec<_>>();
    let res = read_feeds(feeds).await;
    dbg!(res);
    dbg!(start.elapsed());

    Ok(())
}

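async fn read_feeds(feeds: Vec<u32>) -> Vec<u32> {
    feeds.iter()
        // One future per feed; nothing runs until the stream is polled.
        .map(read_feed)
        // Gather the futures into a stream that yields results as they complete.
        .collect::<FuturesUnordered<_>>()
        // Drain the stream into a Vec (completion order, not input order).
        .collect::<Vec<_>>()
        .await
}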

async fn read_feed(feed: &u32) -> u32 {
    delay_for(Duration::from_millis(500)).await;

    feed * 2
}

delay_for simulates the potentially expensive operation. It also helps demonstrate that the reads happen concurrently, without any explicit thread-related logic: the elapsed time printed by main should be close to 500 ms rather than ten times that.

One nuance: unlike the synchronous version, the results are no longer in the same order as the feeds. Whichever read finishes first comes first in the vector, so you need to account for that if the order matters.
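One possible way to deal with the ordering, sketched under assumptions: if each future is made to yield its input index alongside its result, the original order can be rebuilt after collection. The `restore_order` helper and the sample data below are hypothetical and use only the standard library; the tagged pairs stand in for what the stream would produce in completion order. If input order is all you need, `futures::future::join_all` is an alternative worth checking, since it returns results in the order the futures were given.

```rust
/// Hypothetical helper: rebuilds input order from `(index, value)` pairs
/// that arrived in completion order.
fn restore_order<T>(mut tagged: Vec<(usize, T)>) -> Vec<T> {
    // Sort by the index each result was tagged with when its future was created.
    tagged.sort_by_key(|(index, _)| *index);
    // Drop the indices and keep the values, now in input order.
    tagged.into_iter().map(|(_, value)| value).collect()
}

fn main() {
    // Simulated completion order: feed 2 finished first, then 0, then 1.
    let completed = vec![(2, "feed-c"), (0, "feed-a"), (1, "feed-b")];
    let ordered = restore_order(completed);
    println!("{:?}", ordered);
}
```

The cost is one sort after all reads complete, which is usually negligible next to the network time of the reads themselves.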

edwardw answered Nov 03 '22 09:11
