 

How can I use hyper::client from another thread?

I have multiple threads performing some heavy operations, and I need to use a client in the middle of that work. I'm using Hyper v0.11 as an HTTP client and I would like to reuse connections, so I need to share the same hyper::Client in order to keep the connections open (in keep-alive mode).

The client is not shareable among threads (it implements neither Sync nor Send). Here is a small snippet of what I've tried:

let mut core = Core::new().expect("Create Client Event Loop");
let handle = core.handle();

let remote = core.remote();

let client = Client::new(&handle.clone());

thread::spawn(move || {

    // intensive operations...

    let response = &client.get("http://google.com".parse().unwrap()).and_then(|res| {
        println!("Response: {}", res.status());
        Ok(())
    });

    remote.clone().spawn(|_| {
        response.map(|_| { () }).map_err(|_| { () })
    });

    // more intensive operations...
});
core.run(futures::future::empty::<(), ()>()).unwrap();

This code doesn't compile:

thread::spawn(move || {
^^^^^^^^^^^^^ within `[closure@src/load-balancer.rs:46:19: 56:6 client:hyper::Client<hyper::client::HttpConnector>, remote:std::sync::Arc<tokio_core::reactor::Remote>]`, the trait `std::marker::Send` is not implemented for `std::rc::Weak<std::cell::RefCell<tokio_core::reactor::Inner>>`

thread::spawn(move || {
^^^^^^^^^^^^^ within `[closure@src/load-balancer.rs:46:19: 56:6 client:hyper::Client<hyper::client::HttpConnector>, remote:std::sync::Arc<tokio_core::reactor::Remote>]`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::cell::RefCell<hyper::client::pool::PoolInner<tokio_proto::util::client_proxy::ClientProxy<tokio_proto::streaming::message::Message<hyper::http::MessageHead<hyper::http::RequestLine>, hyper::Body>, tokio_proto::streaming::message::Message<hyper::http::MessageHead<hyper::http::RawStatus>, tokio_proto::streaming::body::Body<hyper::Chunk, hyper::Error>>, hyper::Error>>>>`
...
remote.clone().spawn(|_| {
               ^^^^^ the trait `std::marker::Sync` is not implemented for `futures::Future<Error=hyper::Error, Item=hyper::Response> + 'static`

Is there any way to reuse the same client from different threads or some other approach?

asked Jul 01 '17 23:07 by cspinetta



1 Answer

The short answer is no, but it's better that way.

Each Client object holds a pool of connections. Here's how Hyper's Pool is defined in version 0.11.0:

pub struct Pool<T> {
    inner: Rc<RefCell<PoolInner<T>>>,
}

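Since inner is reference-counted with an Rc and borrow-checked at run time with a RefCell, the pool is not thread-safe: Rc updates its reference count non-atomically, and RefCell's borrow flag is not synchronized either. If the Client could be moved to a new thread, it would hold a pool that may still be referenced from the original thread, which would be a source of data races. That is why the compiler rejects the closure passed to thread::spawn.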
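To see the thread-safety distinction in isolation, here is a minimal std-only sketch (not hyper code). `SharedPool`, `assert_send` and `return_connections_from_threads` are hypothetical names, and the `Vec<u32>` merely stands in for a list of idle connections. Wrapping the state in `Arc<Mutex<...>>` instead of `Rc<RefCell<...>>` makes it `Send`, so clones of it can move into spawned threads; the commented-out line shows the `Rc<RefCell<...>>` layout that the compiler would reject, much like the errors in the question.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for a connection pool: a list of idle "connection" ids.
// Arc (atomic reference count) + Mutex (synchronized access) is the thread-safe
// analogue of the Rc<RefCell<...>> layout used by hyper 0.11's Pool.
type SharedPool = Arc<Mutex<Vec<u32>>>;

// Compiles only if `T` may be moved to another thread.
fn assert_send<T: Send>(_: &T) {}

fn return_connections_from_threads(n: u32) -> usize {
    let pool: SharedPool = Arc::new(Mutex::new(Vec::new()));
    assert_send(&pool);

    // The Rc<RefCell<...>> version would fail to compile, because
    // `Rc<RefCell<Vec<u32>>>` is not `Send`:
    // assert_send(&std::rc::Rc::new(std::cell::RefCell::new(Vec::<u32>::new())));

    let handles: Vec<_> = (0..n)
        .map(|id| {
            // Each worker gets its own handle to the same shared pool.
            let pool = Arc::clone(&pool);
            thread::spawn(move || {
                // "Return" a connection to the pool while holding the lock.
                pool.lock().unwrap().push(id);
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // Bind the length first so the lock guard is dropped before `pool`.
    let idle = pool.lock().unwrap().len();
    idle
}

fn main() {
    println!("idle connections: {}", return_connections_from_threads(4));
}
```

Even with a thread-safe pool like this, every worker would contend for the same lock around an I/O-bound resource, which is one reason to prefer keeping the HTTP work on a single thread.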

This design is understandable. Reusing an HTTP connection across multiple threads is unusual, because it requires synchronized access to a resource whose work is mostly I/O-bound, and that kind of resource fits Tokio's asynchronous model well. It is more reasonable to perform multiple requests from the same thread and let the Tokio core send and receive messages asynchronously, without waiting for each response in sequence. Computationally intensive tasks, on the other hand, can be executed on a CPU pool from futures_cpupool. With that in mind, the code below works fine:

extern crate tokio_core;
extern crate hyper;
extern crate futures;
extern crate futures_cpupool;

use tokio_core::reactor::Core;
use hyper::client::Client;
use futures::Future;
use futures_cpupool::CpuPool;

fn main() {

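    let mut core = Core::new().unwrap();
    let handle = core.handle();
    // The client (and its connection pool) lives on the core's thread.
    let client = Client::new(&handle);
    // A single worker thread for CPU-bound work.
    let pool = CpuPool::new(1);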

    println!("Begin!");
    let req = client.get("http://google.com".parse().unwrap())
        .and_then(|res| {
            println!("Response: {}", res.status());
            Ok(())
        });
    let intensive = pool.spawn_fn(|| {
        println!("I'm working hard!!!");
        std::thread::sleep(std::time::Duration::from_secs(1));
        println!("Phew!");
        Ok(())
    });

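    // Run the request and the CPU-bound task concurrently; `join` resolves
    // once both have completed.
    let task = req.join(intensive)
        .map(|_| {
            println!("End!");
        });
    core.run(task).unwrap();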
}

If the response arrives before the one-second intensive task finishes, the output will be:

Begin!
I'm working hard!!!
Response: 302 Found
Phew!
End!

If you have multiple tasks running in separate threads, the problem becomes open-ended, since several architectures are feasible. One is to delegate all communication to a single actor, so that every other worker thread sends its requests to it. Alternatively, you can give each worker its own client object, in which case each worker also has its own connection pool.
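The single-actor approach can be sketched with std::sync::mpsc channels alone. This is an illustration under assumptions, not hyper code: `FakeClient` is a hypothetical stand-in for a non-`Send` client such as hyper 0.11's Client, and `Job` and `run_with_actor` are likewise hypothetical. The key point is that the actor thread constructs the client itself, so the non-`Send` value never has to cross a thread boundary; workers only send owned requests and receive owned responses.

```rust
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::mpsc;
use std::thread;

// Hypothetical request type: the URL plus a channel for the reply.
struct Job {
    url: String,
    reply: mpsc::Sender<String>,
}

// Hypothetical stand-in for a non-`Send` client: its state lives in an
// Rc<RefCell<...>>, so it must stay on the thread that created it.
struct FakeClient {
    requests_served: Rc<RefCell<u32>>,
}

impl FakeClient {
    fn get(&self, url: &str) -> String {
        let n = {
            let mut count = self.requests_served.borrow_mut();
            *count += 1;
            *count
        };
        format!("200 OK from {} (request #{})", url, n)
    }
}

fn run_with_actor(workers: usize) -> Vec<String> {
    let (job_tx, job_rx) = mpsc::channel::<Job>();

    // The actor creates the client inside its own thread, so only the
    // (Send) receiver is moved across threads, never the client.
    let actor = thread::spawn(move || {
        let client = FakeClient { requests_served: Rc::new(RefCell::new(0)) };
        // The loop ends once every sender clone has been dropped.
        for job in job_rx {
            let response = client.get(&job.url);
            // Ignore the error if the worker is no longer waiting.
            let _ = job.reply.send(response);
        }
    });

    let handles: Vec<_> = (0..workers)
        .map(|i| {
            let job_tx = job_tx.clone();
            thread::spawn(move || {
                // ... intensive operations ...
                let (reply_tx, reply_rx) = mpsc::channel();
                let url = format!("http://example.com/{} ", i);
                job_tx.send(Job { url, reply: reply_tx }).unwrap();
                // ... more work could happen here before waiting ...
                reply_rx.recv().unwrap()
            })
        })
        .collect();

    // Drop the original sender so the actor stops after the workers finish.
    drop(job_tx);
    let responses: Vec<String> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    actor.join().unwrap();
    responses
}

fn main() {
    for response in run_with_actor(3) {
        println!("{}", response);
    }
}
```

In a real program the actor would presumably own a tokio_core Core and a hyper Client and drive several requests concurrently; this sketch handles jobs one at a time to stay short. For the one-client-per-worker alternative, each worker thread would instead construct its own client inside its closure, the same way the actor does here.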

answered Nov 14 '22 14:11 by E_net4 stands with Ukraine
