
Join futures with limited concurrency

Tags:

rust

I have a large vector of Hyper HTTP request futures and want to resolve them into a vector of results. Since there is a limit on the maximum number of open files, I want to limit concurrency to at most N simultaneous futures.

I've experimented with Stream::buffer_unordered, but it seems to execute the futures one by one.

Pietro asked Apr 06 '17
1 Answer

We've used code like this in a project to avoid opening too many TCP sockets. These futures contain Hyper futures, so it seems like exactly the same case.

// Convert the iterator into a `Stream`. We will process
// `PARALLELISM` futures at the same time, but with no specified
// order.
let all_done =
    futures::stream::iter(iterator_of_futures.map(Ok))
    .buffer_unordered(PARALLELISM);

// Everything after here is just using the stream in
// some manner, not directly related

let mut successes = Vec::with_capacity(LIMIT);
let mut failures = Vec::with_capacity(LIMIT);

// Pull values off the stream, dividing them into success and
// failure buckets.
let mut all_done = all_done.into_future();
loop {
    match core.run(all_done) {
        Ok((None, _)) => break,
        Ok((Some(v), next_all_done)) => {
            successes.push(v);
            all_done = next_all_done.into_future();
        }
        Err((v, next_all_done)) => {
            failures.push(v);
            all_done = next_all_done.into_future();
        }
    }
}

This comes from a piece of example code, so the event loop (core) is driven explicitly; it targets the futures 0.1 API with tokio-core. Watching the number of file handles used by the program showed that it was capped. Before this bottleneck was added, we quickly ran out of allowable file handles; afterward we did not.

Shepmaster answered Nov 12 '22