I'm not able to create a client that tries to connect to a server and:
Here's the code to connect to a server; currently when the connection is lost the program exits. I'm not sure what the best way to implement it is; maybe I have to create a Future
with an infinite loop?
extern crate tokio_line; use tokio_line::LineCodec; fn get_connection(handle: &Handle) -> Box<Future<Item = (), Error = io::Error>> { let remote_addr = "127.0.0.1:9876".parse().unwrap(); let tcp = TcpStream::connect(&remote_addr, handle); let client = tcp.and_then(|stream| { let (sink, from_server) = stream.framed(LineCodec).split(); let reader = from_server.for_each(|message| { println!("{}", message); Ok(()) }); reader.map(|_| { println!("CLIENT DISCONNECTED"); () }).map_err(|err| err) }); let client = client.map_err(|_| { panic!()}); Box::new(client) } fn main() { let mut core = Core::new().unwrap(); let handle = core.handle(); let client = get_connection(&handle); let client = client.and_then(|c| { println!("Try to reconnect"); get_connection(&handle); Ok(()) }); core.run(client).unwrap(); }
Add the tokio-line crate with:
tokio-line = { git = "https://github.com/tokio-rs/tokio-line" }
The key question seems to be: how do I implement an infinite loop using Tokio? By answering this question, we can tackle the problem of reconnecting infinitely upon disconnection. From my experience writing asynchronous code, recursion seems to be a straightforward solution to this problem.
UPDATE: as pointed out by Shepmaster (and the folks of the Tokio Gitter), my original answer leaks memory since we build a chain of futures that grows on each iteration. Here follows a new one:
loop_fn
There is a function in the futures
crate that does exactly what you need. It is called loop_fn
. You can use it by changing your main function to the following:
fn main() { let mut core = Core::new().unwrap(); let handle = core.handle(); let client = future::loop_fn((), |_| { // Run the get_connection function and loop again regardless of its result get_connection(&handle).map(|_| -> Loop<(), ()> { Loop::Continue(()) }) }); core.run(client).unwrap(); }
The function resembles a for loop, which can continue or break depending on the result of get_connection
(see the documentation for the Loop
enum). In this case, we choose to always continue, so it will infinitely keep reconnecting.
Note that your version of get_connection
will panic if there is an error (e.g. if the client cannot connect to the server). If you also want to retry after an error, you should remove the call to panic!
.
Here follows my old answer, in case anyone finds it interesting.
WARNING: using the code below results in unbounded memory growth.
get_connection
loop infinitelyWe want to call the get_connection
function each time the client is disconnected, so that is exactly what we are going to do (look at the comment after reader.and_then
):
fn get_connection(handle: &Handle) -> Box<Future<Item = (), Error = io::Error>> { let remote_addr = "127.0.0.1:9876".parse().unwrap(); let tcp = TcpStream::connect(&remote_addr, handle); let handle_clone = handle.clone(); let client = tcp.and_then(|stream| { let (sink, from_server) = stream.framed(LineCodec).split(); let reader = from_server.for_each(|message| { println!("{}", message); Ok(()) }); reader.and_then(move |_| { println!("CLIENT DISCONNECTED"); // Attempt to reconnect in the future get_connection(&handle_clone) }) }); let client = client.map_err(|_| { panic!()}); Box::new(client) }
Remember that get_connection
is non-blocking. It just constructs a Box<Future>
. This means that when calling it recursively, we still don't block. Instead, we get a new future, which we can link to the previous one by using and_then
. As you can see, this is different to normal recursion since the stack doesn't grow on each iteration.
Note that we need to clone the handle
(see handle_clone
), and move it into the closure passed to reader.and_then
. This is necessary because the closure is going to live longer than the function (it will be contained in the future we are returning).
The code you provided doesn't handle the case in which the client is unable to connect to the server (nor any other errors). Following the same principle shown above, we can handle errors by changing the end of get_connection
to the following:
let handle_clone = handle.clone(); let client = client.or_else(move |err| { // Note: this code will infinitely retry, but you could pattern match on the error // to retry only on certain kinds of error println!("Error connecting to server: {}", err); get_connection(&handle_clone) }); Box::new(client)
Note that or_else
is like and_then
, but it operates on the error produced by the future.
main
Finally, it is not necessary to use and_then
in the main
function. You can replace your main
by the following code:
fn main() { let mut core = Core::new().unwrap(); let handle = core.handle(); let client = get_connection(&handle); core.run(client).unwrap(); }
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With