Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Asynchronously reconnecting a client to a server in an infinite loop

Tags:

I'm not able to create a client that tries to connect to a server and:

  • if the server is down it has to try again in an infinite loop
  • if the server is up and connection is successful, when the connection is lost (i.e. server disconnects the client) the client has to restart the infinite loop to try to connect to the server

Here's the code to connect to a server; currently when the connection is lost the program exits. I'm not sure what the best way to implement it is; maybe I have to create a Future with an infinite loop?

extern crate tokio_line; use tokio_line::LineCodec;  fn get_connection(handle: &Handle) -> Box<Future<Item = (), Error = io::Error>> {                                                                                                                                        let remote_addr = "127.0.0.1:9876".parse().unwrap();                                                                                                                                                                 let tcp = TcpStream::connect(&remote_addr, handle);                                                                                                                                                                   let client = tcp.and_then(|stream| {                                                                                                                                                                                     let (sink, from_server) = stream.framed(LineCodec).split();                                                                                                                                                          let reader = from_server.for_each(|message| {                                                                                                                                                                            println!("{}", message);                                                                                                                                                                                             Ok(())                                                                                                                                                                                                           });                                                                                                                                                                                                                   reader.map(|_| {                                                                                                                                                                                                         println!("CLIENT DISCONNECTED");                                                                                                                                                                                     ()                                                                                                                                                                                                               }).map_err(|err| err)                                                                                                                                                                                            });                                                                                                                                                                                                                   let client = client.map_err(|_| { panic!()});                                                                                                                                                                        Box::new(client)                                                                                                                                                                                                 }                                                                                                                                                                                                                     fn main() {                                                                                                                                                                                                              let mut core = Core::new().unwrap();                                                                                                                                                                                 let handle = core.handle();                                                                                                                                                                                          let client = get_connection(&handle);                                                                                                                                                                                 let client = client.and_then(|c| {                                                                                                                                                                                       println!("Try to reconnect");                                                                                                                                                                                        get_connection(&handle);                                                                                                                                                                                             Ok(())                                                                                                                                                                                                           });                                                                                                                                                                                                                   core.run(client).unwrap();                                                                                                                                                                                       } 

Add the tokio-line crate with:

tokio-line = { git = "https://github.com/tokio-rs/tokio-line" } 
like image 556
Danilo Silva Avatar asked Feb 08 '17 08:02

Danilo Silva


1 Answers

The key question seems to be: how do I implement an infinite loop using Tokio? By answering this question, we can tackle the problem of reconnecting infinitely upon disconnection. From my experience writing asynchronous code, recursion seems to be a straightforward solution to this problem.

UPDATE: as pointed out by Shepmaster (and the folks of the Tokio Gitter), my original answer leaks memory since we build a chain of futures that grows on each iteration. Here follows a new one:

Updated answer: use loop_fn

There is a function in the futures crate that does exactly what you need. It is called loop_fn. You can use it by changing your main function to the following:

fn main() {     let mut core = Core::new().unwrap();     let handle = core.handle();     let client = future::loop_fn((), |_| {         // Run the get_connection function and loop again regardless of its result         get_connection(&handle).map(|_| -> Loop<(), ()> {             Loop::Continue(())         })     });      core.run(client).unwrap(); } 

The function resembles a for loop, which can continue or break depending on the result of get_connection (see the documentation for the Loop enum). In this case, we choose to always continue, so it will infinitely keep reconnecting.

Note that your version of get_connection will panic if there is an error (e.g. if the client cannot connect to the server). If you also want to retry after an error, you should remove the call to panic!.


Old answer: use recursion

Here follows my old answer, in case anyone finds it interesting.

WARNING: using the code below results in unbounded memory growth.

Making get_connection loop infinitely

We want to call the get_connection function each time the client is disconnected, so that is exactly what we are going to do (look at the comment after reader.and_then):

fn get_connection(handle: &Handle) -> Box<Future<Item = (), Error = io::Error>> {     let remote_addr = "127.0.0.1:9876".parse().unwrap();     let tcp = TcpStream::connect(&remote_addr, handle);     let handle_clone = handle.clone();      let client = tcp.and_then(|stream| {         let (sink, from_server) = stream.framed(LineCodec).split();         let reader = from_server.for_each(|message| {             println!("{}", message);             Ok(())         });          reader.and_then(move |_| {             println!("CLIENT DISCONNECTED");             // Attempt to reconnect in the future             get_connection(&handle_clone)         })     });      let client = client.map_err(|_| { panic!()});     Box::new(client) } 

Remember that get_connection is non-blocking. It just constructs a Box<Future>. This means that when calling it recursively, we still don't block. Instead, we get a new future, which we can link to the previous one by using and_then. As you can see, this is different to normal recursion since the stack doesn't grow on each iteration.

Note that we need to clone the handle (see handle_clone), and move it into the closure passed to reader.and_then. This is necessary because the closure is going to live longer than the function (it will be contained in the future we are returning).

Handling errors

The code you provided doesn't handle the case in which the client is unable to connect to the server (nor any other errors). Following the same principle shown above, we can handle errors by changing the end of get_connection to the following:

let handle_clone = handle.clone(); let client = client.or_else(move |err| {     // Note: this code will infinitely retry, but you could pattern match on the error     // to retry only on certain kinds of error     println!("Error connecting to server: {}", err);     get_connection(&handle_clone) }); Box::new(client) 

Note that or_else is like and_then, but it operates on the error produced by the future.

Removing unnecessary code from main

Finally, it is not necessary to use and_then in the main function. You can replace your main by the following code:

fn main() {     let mut core = Core::new().unwrap();     let handle = core.handle();     let client = get_connection(&handle);     core.run(client).unwrap(); } 
like image 50
aochagavia Avatar answered Oct 26 '22 21:10

aochagavia