I have multiple threads performing heavy operations, and I need to use an HTTP client in the middle of that work. I'm using Hyper v0.11 as the HTTP client and would like to reuse connections, so I need to share the same hyper::Client to keep the connections open (in keep-alive mode).
The client is not shareable among threads (it implements neither Sync nor Send). Here is a small snippet of what I tried:
let mut core = Core::new().expect("Create Client Event Loop");
let handle = core.handle();
let remote = core.remote();
let client = Client::new(&handle.clone());
thread::spawn(move || {
    // intensive operations...
    let response = &client.get("http://google.com".parse().unwrap()).and_then(|res| {
        println!("Response: {}", res.status());
        Ok(())
    });
    remote.clone().spawn(|_| {
        response.map(|_| { () }).map_err(|_| { () })
    });
    // more intensive operations...
});
core.run(futures::future::empty::<(), ()>()).unwrap();
This code doesn't compile:
thread::spawn(move || {
^^^^^^^^^^^^^ within `[closure@src/load-balancer.rs:46:19: 56:6 client:hyper::Client<hyper::client::HttpConnector>, remote:std::sync::Arc<tokio_core::reactor::Remote>]`, the trait `std::marker::Send` is not implemented for `std::rc::Weak<std::cell::RefCell<tokio_core::reactor::Inner>>`
thread::spawn(move || {
^^^^^^^^^^^^^ within `[closure@src/load-balancer.rs:46:19: 56:6 client:hyper::Client<hyper::client::HttpConnector>, remote:std::sync::Arc<tokio_core::reactor::Remote>]`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::cell::RefCell<hyper::client::pool::PoolInner<tokio_proto::util::client_proxy::ClientProxy<tokio_proto::streaming::message::Message<hyper::http::MessageHead<hyper::http::RequestLine>, hyper::Body>, tokio_proto::streaming::message::Message<hyper::http::MessageHead<hyper::http::RawStatus>, tokio_proto::streaming::body::Body<hyper::Chunk, hyper::Error>>, hyper::Error>>>>`
...
remote.clone().spawn(|_| {
^^^^^ the trait `std::marker::Sync` is not implemented for `futures::Future<Error=hyper::Error, Item=hyper::Response> + 'static`
Is there any way to reuse the same client from different threads or some other approach?
The short answer is no, but it's better that way.
Each Client object holds a pool of connections. Here's how Hyper's Pool is defined in version 0.11.0:
pub struct Pool<T> {
    inner: Rc<RefCell<PoolInner<T>>>,
}
Since inner is reference-counted with an Rc and borrow-checked at run time with a RefCell, the pool is certainly not thread-safe. If you were allowed to move that Client to a new thread, the object would be holding a pool that still lives in another thread, which would be a source of data races.
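To make the Rc/RefCell point concrete, here is a minimal standard-library sketch. LocalPool and SharedPool are hypothetical stand-ins, not Hyper's actual types, and assert_send is a helper invented for this illustration. It shows that a pool built on Arc and Mutex may cross threads, at the price of locking on every access, while the Rc<RefCell<..>> variant is rejected at compile time.

```rust
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::thread;

// Compiles only for types that may be moved to another thread.
fn assert_send<T: Send>() {}

// Hypothetical pool shaped like the definition above: Rc + RefCell, so not Send.
#[allow(dead_code)]
struct LocalPool {
    inner: Rc<RefCell<Vec<String>>>,
}

// Hypothetical thread-safe variant: every access has to take the lock.
struct SharedPool {
    inner: Arc<Mutex<Vec<String>>>,
}

// Pushes one fake connection name per thread and returns the final pool size.
fn shared_pool_len_after_threads(n: usize) -> usize {
    let pool = SharedPool { inner: Arc::new(Mutex::new(Vec::new())) };
    let handles: Vec<_> = (0..n)
        .map(|i| {
            let inner = Arc::clone(&pool.inner);
            thread::spawn(move || {
                inner.lock().unwrap().push(format!("conn-{}", i));
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    let len = pool.inner.lock().unwrap().len();
    len
}

fn main() {
    assert_send::<SharedPool>();
    // assert_send::<LocalPool>(); // does not compile: Rc<RefCell<..>> is not Send
    println!("{}", shared_pool_len_after_threads(4));
}
```

Uncommenting the LocalPool line should produce an error of the same kind as the one in the question.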
This implementation is understandable. Reusing an HTTP connection across multiple threads is unusual, as it requires synchronized access to a resource that is mostly I/O-bound. The single-threaded design fits well with Tokio's asynchronous nature: it is more reasonable to perform multiple requests in the same thread and let Tokio's core send and receive messages asynchronously, without waiting for each response in sequence. Moreover, computationally intensive tasks can be executed by a CPU pool from futures_cpupool. With that in mind, the code below works fine:
extern crate tokio_core;
extern crate hyper;
extern crate futures;
extern crate futures_cpupool;

use tokio_core::reactor::Core;
use hyper::client::Client;
use futures::Future;
use futures_cpupool::CpuPool;

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let client = Client::new(&handle.clone());
    let pool = CpuPool::new(1);

    println!("Begin!");
    let req = client.get("http://google.com".parse().unwrap())
        .and_then(|res| {
            println!("Response: {}", res.status());
            Ok(())
        });
    let intensive = pool.spawn_fn(|| {
        println!("I'm working hard!!!");
        std::thread::sleep(std::time::Duration::from_secs(1));
        println!("Phew!");
        Ok(())
    });

    let task = req.join(intensive)
        .map(|_| {
            println!("End!");
        });
    core.run(task).unwrap();
}
If the response is not received too late, the output will be:
Begin!
I'm working hard!!!
Response: 302 Found
Phew!
End!
If you have multiple tasks running in separate threads, the problem becomes open-ended, since several architectures are feasible. One of them is to delegate all communication to a single actor, which requires every other worker thread to send its data to it. Alternatively, you can give each worker its own client object, and therefore its own connection pool.
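The single-actor option could be sketched roughly as follows, using only the standard library. FakeClient is a hypothetical stand-in for a non-Send client such as hyper::Client, and Request, spawn_client_actor and run_workers are names invented for this illustration. The point it shows is that the client is created inside the actor thread and never leaves it, while workers talk to it over channels.

```rust
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for a non-Send client (it holds an Rc<RefCell<..>>).
struct FakeClient {
    requests_served: Rc<RefCell<u32>>,
}

impl FakeClient {
    fn new() -> FakeClient {
        FakeClient { requests_served: Rc::new(RefCell::new(0)) }
    }

    // Pretends to perform a request and returns a made-up response string.
    fn get(&self, url: &str) -> String {
        *self.requests_served.borrow_mut() += 1;
        format!("fetched {}", url)
    }
}

// A message from a worker: the URL plus a channel on which to send the reply.
struct Request {
    url: String,
    reply_to: mpsc::Sender<String>,
}

// Spawns the actor thread and returns the sending half of its mailbox.
// The join handle yields the number of requests the actor served.
fn spawn_client_actor() -> (mpsc::Sender<Request>, thread::JoinHandle<u32>) {
    let (tx, rx) = mpsc::channel::<Request>();
    let handle = thread::spawn(move || {
        // The client is created here and never crosses a thread boundary.
        let client = FakeClient::new();
        // The loop ends once every sender has been dropped.
        for req in rx {
            let response = client.get(&req.url);
            // Ignore workers that stopped waiting for their reply.
            let _ = req.reply_to.send(response);
        }
        let served = *client.requests_served.borrow();
        served
    });
    (tx, handle)
}

// Runs `workers` threads that each issue one request through the actor.
fn run_workers(workers: u32) -> (Vec<String>, u32) {
    let (tx, actor) = spawn_client_actor();
    let handles: Vec<_> = (0..workers)
        .map(|i| {
            let tx = tx.clone();
            thread::spawn(move || {
                // intensive operations would go here
                let (reply_tx, reply_rx) = mpsc::channel();
                let url = format!("http://example.com/{}", i);
                tx.send(Request { url: url, reply_to: reply_tx }).unwrap();
                reply_rx.recv().unwrap()
            })
        })
        .collect();
    // Drop the original sender so the actor can stop when the workers finish.
    drop(tx);
    let mut responses: Vec<String> =
        handles.into_iter().map(|h| h.join().unwrap()).collect();
    responses.sort();
    let served = actor.join().unwrap();
    (responses, served)
}

fn main() {
    let (responses, served) = run_workers(3);
    println!("{:?} ({} requests served)", responses, served);
}
```

In this sketch the actor handles requests one at a time and the workers block on their reply. With Hyper, the actor thread would presumably own both the Core and the Client and use a futures-aware channel instead of std::sync::mpsc, so that several requests can be in flight at once.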