In my RSS reader project, I want to read my RSS feeds asynchronously. Currently, they're read synchronously by this code block:
self.feeds = self
    .feeds
    .iter()
    .map(|f| f.read(&self.settings))
    .collect::<Vec<Feed>>();
I want to make that code asynchronous, because it will allow me to better handle poor web server responses.
I understand I can use a Stream that I can create from my Vec using stream::from_iter(...), which transforms the code into something like:
self.feeds = stream::from_iter(self.feeds.iter())
    .map(|f| f.read(&self.settings))
    // ???
    .collect::<Vec<Feed>>()
But then, I have two questions:
1. How to have the results joined into a Vec (which is a synchronous struct)?
2. How to execute that stream? I was thinking about using task::spawn, but it doesn't seem to work ...
How to execute that stream? I was thinking about using task::spawn but it doesn't seem to work
In the async/await world, asynchronous code is meant to be executed by an executor, which is not part of the standard library but is provided by third-party crates such as tokio. task::spawn only schedules one instance of an async fn to run on that executor; it does not run it by itself.
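To make the role of the executor concrete, here is a minimal sketch that uses only the standard library. The toy `block_on`, `NoopWaker`, `demo_lazy_future`, and this two-argument `read_feed` are hypothetical names made up for illustration; they are not tokio's API, and a real executor parks the thread and reacts to wakeups instead of polling in a loop.

```rust
use std::cell::Cell;
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing: enough for a loop that simply polls again.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor (illustration only): polls a single future until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => std::thread::yield_now(),
        }
    }
}

/// Stand-in for an async feed read; sets `started` once its body really runs.
async fn read_feed(id: u32, started: &Cell<bool>) -> u32 {
    started.set(true);
    id * 2
}

/// Returns (body started before polling?, result, body started after polling?).
fn demo_lazy_future() -> (bool, u32, bool) {
    let started = Cell::new(false);
    let fut = read_feed(21, &started); // only creates the future, runs nothing
    let before = started.get();
    let out = block_on(fut); // the executor polls it to completion
    (before, out, started.get())
}
```

Calling `demo_lazy_future()` gives `(false, 42, true)`: the body of `read_feed` does not start when the function is called, only once something polls the returned future.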
How to have results joined into a vec (which is a sync struct)
The bread and butter of your RSS reader seems to be f.read. It should be turned into an asynchronous function. Then the vector of feeds will be mapped into a vector of futures, which need to be polled to completion.
The futures crate has futures::stream::futures_unordered::FuturesUnordered to help you do that. FuturesUnordered itself implements the Stream trait. This stream is then collected into the result vector and awaited to completion like so:
//# tokio = { version = "0.2.4", features = ["full"] }
//# futures = "0.3.1"
use tokio::time::delay_for;
use futures::stream::StreamExt;
use futures::stream::futures_unordered::FuturesUnordered;
use std::error::Error;
use std::time::{Duration, Instant};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let start = Instant::now();
    let feeds = (0..10).collect::<Vec<_>>();
    let res = read_feeds(feeds).await;
    dbg!(res);
    dbg!(start.elapsed());
    Ok(())
}

async fn read_feeds(feeds: Vec<u32>) -> Vec<u32> {
    feeds.iter()
        .map(read_feed)
        .collect::<FuturesUnordered<_>>()
        .collect::<Vec<_>>()
        .await
}

async fn read_feed(feed: &u32) -> u32 {
    delay_for(Duration::from_millis(500)).await;
    feed * 2
}
delay_for is there to simulate the potentially expensive operation. It also helps to demonstrate that these reads indeed happen concurrently, without any explicit thread-related logic.
One nuance here: unlike in the synchronous version, the results of reading the RSS feeds are no longer in the same order as the feeds themselves; whichever read finishes first ends up at the front. You need to deal with that somehow.
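One way to deal with the ordering, sketched under assumptions: tag each result with the index of its feed, then sort by that index once everything has completed. `restore_order` and `demo_restore_order` are hypothetical helpers, not part of the futures crate, and the completion order below is made up. If ordered output is all you need, `futures::future::join_all` returns results in input order and may be simpler.

```rust
/// Puts results back into input order.
/// Each element is (index of the feed in the input, result of reading it).
fn restore_order<T>(mut tagged: Vec<(usize, T)>) -> Vec<T> {
    tagged.sort_by_key(|pair| pair.0);
    tagged.into_iter().map(|pair| pair.1).collect()
}

/// Simulates results arriving in completion order rather than input order.
fn demo_restore_order() -> Vec<u32> {
    // In the async code, the tag could be attached roughly like this (sketch):
    //   feeds.iter().enumerate()
    //       .map(|(i, f)| async move { (i, read_feed(f).await) })
    //       .collect::<FuturesUnordered<_>>()
    // Made-up completion order: feed 2 finished first, then feed 0, then feed 1.
    let completed = vec![(2, 4), (0, 0), (1, 2)];
    restore_order(completed)
}
```

Sorting costs O(n log n), which is negligible next to network reads, and it keeps the reads concurrent while giving a deterministic result order.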