 

How can I perform parallel asynchronous HTTP GET requests with reqwest?

The async example is useful, but being new to Rust and Tokio, I am struggling to work out how to do N requests at once, using URLs from a vector, and creating an iterator of the response HTML for each URL as a string.

How could this be done?

asked Jun 26 '18 by user964375


1 Answer

Concurrent requests

As of reqwest 0.10:

```rust
use futures::{stream, StreamExt}; // 0.3.5
use reqwest::Client; // 0.10.6
use tokio; // 0.2.21, features = ["macros"]

const CONCURRENT_REQUESTS: usize = 2;

#[tokio::main]
async fn main() {
    let client = Client::new();

    let urls = vec!["https://api.ipify.org"; 2];

    let bodies = stream::iter(urls)
        .map(|url| {
            let client = &client;
            async move {
                let resp = client.get(url).send().await?;
                resp.bytes().await
            }
        })
        .buffer_unordered(CONCURRENT_REQUESTS);

    bodies
        .for_each(|b| async {
            match b {
                Ok(b) => println!("Got {} bytes", b.len()),
                Err(e) => eprintln!("Got an error: {}", e),
            }
        })
        .await;
}
```

stream::iter(urls) 

stream::iter

Take a collection of strings and convert it into a Stream.

.map(|url| { 

StreamExt::map

Run a function on every element in the stream, transforming each element into a new type — here, each URL becomes a future that will fetch it.

let client = &client; async move { 

Take an explicit reference to the Client and move the reference (not the original Client) into an anonymous asynchronous block.

let resp = client.get(url).send().await?; 

Start an asynchronous GET request using the Client's connection pool and wait for the request.

resp.bytes().await 

Request and wait for the bytes of the response.

.buffer_unordered(N); 

StreamExt::buffer_unordered

Convert a stream of futures into a stream of those futures' values, executing the futures concurrently.

```rust
bodies
    .for_each(|b| {
        async {
            match b {
                Ok(b) => println!("Got {} bytes", b.len()),
                Err(e) => eprintln!("Got an error: {}", e),
            }
        }
    })
    .await;
```

StreamExt::for_each

Convert the stream back into a single future, printing out the amount of data received along the way, then wait for the future to complete.

See also:

  • Join futures with limited concurrency
  • How to merge iterator of streams?
  • How do I synchronously return a value calculated in an asynchronous Future in stable Rust?
  • What is the difference between `then`, `and_then` and `or_else` in Rust futures?

Without bounded execution

If you wanted to, you could also convert an iterator into an iterator of futures and use future::join_all:

```rust
use futures::future; // 0.3.4
use reqwest::Client; // 0.10.1
use tokio; // 0.2.11

#[tokio::main]
async fn main() {
    let client = Client::new();

    let urls = vec!["https://api.ipify.org"; 2];

    let bodies = future::join_all(urls.into_iter().map(|url| {
        let client = &client;
        async move {
            let resp = client.get(url).send().await?;
            resp.bytes().await
        }
    }))
    .await;

    for b in bodies {
        match b {
            Ok(b) => println!("Got {} bytes", b.len()),
            Err(e) => eprintln!("Got an error: {}", e),
        }
    }
}
```

I'd encourage using the first example as you usually want to limit the concurrency, which buffer and buffer_unordered help with.

Parallel requests

Concurrent requests are generally good enough, but there are times when you need parallel requests. In that case, you need to spawn a task.

```rust
use futures::{stream, StreamExt}; // 0.3.8
use reqwest::Client; // 0.10.9
use tokio; // 0.2.24, features = ["macros"]

const PARALLEL_REQUESTS: usize = 2;

#[tokio::main]
async fn main() {
    let urls = vec!["https://api.ipify.org"; 2];

    let client = Client::new();

    let bodies = stream::iter(urls)
        .map(|url| {
            let client = client.clone();
            tokio::spawn(async move {
                let resp = client.get(url).send().await?;
                resp.bytes().await
            })
        })
        .buffer_unordered(PARALLEL_REQUESTS);

    bodies
        .for_each(|b| async {
            match b {
                Ok(Ok(b)) => println!("Got {} bytes", b.len()),
                Ok(Err(e)) => eprintln!("Got a reqwest::Error: {}", e),
                Err(e) => eprintln!("Got a tokio::JoinError: {}", e),
            }
        })
        .await;
}
```

The primary differences are:

  • We use tokio::spawn to perform work in separate tasks.
  • We have to give each task its own reqwest::Client. As recommended, we clone a shared client to make use of the connection pool.
  • There's an additional error case when the task cannot be joined.
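Cloning the `Client` is cheap because `reqwest::Client` holds its connection pool behind an `Arc` internally, so a clone is just a new handle to the same pool. A sketch of the same pattern with plain `Arc` and OS threads (`sum_across_threads` and the data are my illustration):

```rust
use std::sync::Arc;
use std::thread;

// Like a reqwest::Client clone, an Arc clone is a cheap reference-count
// bump: every handle shares the same underlying allocation (pool).
fn sum_across_threads(shared: Arc<Vec<u32>>) -> u32 {
    let handles: Vec<_> = (0..2)
        .map(|_| {
            let shared = Arc::clone(&shared); // cheap handle, not a deep copy
            thread::spawn(move || shared.iter().sum::<u32>())
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

fn main() {
    let pool = Arc::new(vec![1, 2, 3]);
    println!("{}", sum_across_threads(pool));
}
```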

See also:

  • What is the difference between concurrent programming and parallel programming?
  • What is the difference between concurrency and parallelism?
  • What is the difference between concurrency, parallelism and asynchronous methods?
answered Oct 12 '22 by Shepmaster