The async example is useful, but being new to Rust and Tokio, I am struggling to work out how to make N requests at once, using URLs from a vector, and create an iterator of the response HTML for each URL as a string.
How could this be done?
```rust
use http::{Request, Response};

let mut request = Request::builder()
    .uri("https://www.rust-lang.org/")
    .header("User-Agent", "my-awesome-agent/1.0");

if needs_awesome_header() {
    request = request.header("Awesome", "yes");
}

let response = send(request.body(()).unwrap());
```
As of reqwest 0.10:
```rust
use futures::{stream, StreamExt}; // 0.3.5
use reqwest::Client; // 0.10.6
use tokio; // 0.2.21, features = ["macros"]

const CONCURRENT_REQUESTS: usize = 2;

#[tokio::main]
async fn main() {
    let client = Client::new();

    let urls = vec!["https://api.ipify.org"; 2];

    let bodies = stream::iter(urls)
        .map(|url| {
            let client = &client;
            async move {
                let resp = client.get(url).send().await?;
                resp.bytes().await
            }
        })
        .buffer_unordered(CONCURRENT_REQUESTS);

    bodies
        .for_each(|b| async {
            match b {
                Ok(b) => println!("Got {} bytes", b.len()),
                Err(e) => eprintln!("Got an error: {}", e),
            }
        })
        .await;
}
```
`stream::iter(urls)`: `stream::iter` takes a collection of strings and converts it into a `Stream`.
`.map(|url| { ... })`: `StreamExt::map` runs an asynchronous function on every element in the stream, transforming each element to a new type.
`let client = &client; async move { ... }`: takes an explicit reference to the `Client` and moves the reference (not the original `Client`) into an anonymous asynchronous block.
`let resp = client.get(url).send().await?;`: starts an asynchronous GET request using the `Client`'s connection pool and waits for the request.
`resp.bytes().await`: requests and waits for the bytes of the response.
`.buffer_unordered(N);`: `StreamExt::buffer_unordered` converts a stream of futures into a stream of those futures' values, executing the futures concurrently.
```rust
bodies
    .for_each(|b| async {
        match b {
            Ok(b) => println!("Got {} bytes", b.len()),
            Err(e) => eprintln!("Got an error: {}", e),
        }
    })
    .await;
```

`StreamExt::for_each` converts the stream back into a single future, printing the amount of data received along the way, then waits for the future to complete.
If you wanted to, you could also convert an iterator into an iterator of futures and use `future::join_all`:
```rust
use futures::future; // 0.3.4
use reqwest::Client; // 0.10.1
use tokio; // 0.2.11

#[tokio::main]
async fn main() {
    let client = Client::new();

    let urls = vec!["https://api.ipify.org"; 2];

    let bodies = future::join_all(urls.into_iter().map(|url| {
        let client = &client;
        async move {
            let resp = client.get(url).send().await?;
            resp.bytes().await
        }
    }))
    .await;

    for b in bodies {
        match b {
            Ok(b) => println!("Got {} bytes", b.len()),
            Err(e) => eprintln!("Got an error: {}", e),
        }
    }
}
```
I'd encourage using the first example, as you usually want to limit the concurrency, which `buffer` and `buffer_unordered` help with.
Concurrent requests are generally good enough, but there are times when you need parallel requests. In that case, you need to spawn a task.
```rust
use futures::{stream, StreamExt}; // 0.3.8
use reqwest::Client; // 0.10.9
use tokio; // 0.2.24, features = ["macros"]

const PARALLEL_REQUESTS: usize = 2;

#[tokio::main]
async fn main() {
    let urls = vec!["https://api.ipify.org"; 2];

    let client = Client::new();

    let bodies = stream::iter(urls)
        .map(|url| {
            let client = client.clone();
            tokio::spawn(async move {
                let resp = client.get(url).send().await?;
                resp.bytes().await
            })
        })
        .buffer_unordered(PARALLEL_REQUESTS);

    bodies
        .for_each(|b| async {
            match b {
                Ok(Ok(b)) => println!("Got {} bytes", b.len()),
                Ok(Err(e)) => eprintln!("Got a reqwest::Error: {}", e),
                Err(e) => eprintln!("Got a tokio::JoinError: {}", e),
            }
        })
        .await;
}
```
The primary differences are:

Using `tokio::spawn` to perform work in separate tasks.

Needing to give each task its own `reqwest::Client`. As recommended, we clone a shared client to make use of the connection pool.