Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to cleanly break tokio-core event loop and futures::Stream in Rust

Tags:

rust

I am dabbling in tokio-core and can figure out how to spawn an event loop. However there are two things i am not sure of - how to gracefully exit the event loop and how to exit a stream running inside an event loop. For e.g consider this simple piece of code which spawns two listeners into the event loop and waits for another thread to indicate an exit condition:

extern crate tokio_core;
extern crate futures;

use tokio_core::reactor::Core;
use futures::sync::mpsc::unbounded;
use tokio_core::net::TcpListener;
use std::net::SocketAddr;
use std::str::FromStr;
use futures::{Stream, Future};
use std::thread;
use std::time::Duration;
use std::sync::mpsc::channel;

fn main() {
    let (get_tx, get_rx) = channel();

    let j = thread::spawn(move || {
        let mut core = Core::new().unwrap();
        let (tx, rx) = unbounded();
        get_tx.send(tx).unwrap(); // <<<<<<<<<<<<<<< (1)

        // Listener-0
        {
            let l = TcpListener::bind(&SocketAddr::from_str("127.0.0.1:44444").unwrap(),
                                      &core.handle())
                .unwrap();

            let fe = l.incoming()
                .for_each(|(_sock, peer)| {
                    println!("Accepted from {}", peer);
                    Ok(())
                })
                .map_err(|e| println!("----- {:?}", e));

            core.handle().spawn(fe);
        }

        // Listener1
        {
            let l = TcpListener::bind(&SocketAddr::from_str("127.0.0.1:55555").unwrap(),
                                      &core.handle())
                .unwrap();

            let fe = l.incoming()
                .for_each(|(_sock, peer)| {
                    println!("Accepted from {}", peer);
                    Ok(())
                })
                .map_err(|e| println!("----- {:?}", e));

            core.handle().spawn(fe);
        }

        let work = rx.for_each(|v| {
            if v {
                // (3) I want to shut down listener-0 above the release the resources
                Ok(())
            } else {
                Err(()) // <<<<<<<<<<<<<<< (2)

            }
        });

        let _ = core.run(work);
        println!("Exiting event loop thread");
    });

    let tx = get_rx.recv().unwrap();

    thread::sleep(Duration::from_secs(2));
    println!("Want to terminate listener-0"); // <<<<<< (3)
    tx.send(true).unwrap();

    thread::sleep(Duration::from_secs(2));
    println!("Want to exit event loop");
    tx.send(false).unwrap();

    j.join().unwrap();
}

So say after the sleep in the main thread i want a clean exit of the event loop thread. Currently I send something to the event loop to make it exit and thus releasing the thread.

However both, (1) and (2) feel hacky - i am forcing an error as an exit condition. My questions are:

1) Am I doing it right ? If not then what is the correct way to gracefully exit the event loop thread.

2) I don't event know how to do (3) - i.e. indicate a condition externally to shutdown listener-0 and free all it's resources. How do i achieve this ?

like image 749
ustulation Avatar asked Feb 25 '17 22:02

ustulation


2 Answers

The event loop (core) is not being turned any more (e.g. by run()) or is forgotten (drop()ed). There is no synchronous exit. core.run() returns and stops turning the loop when the Future passed to it completes.

A Stream completes by yielding None (marked with (3) in the code below). When e.g. a TCP connection is closed the Stream representing it completes and the other way around.

extern crate tokio_core;
extern crate futures;

use tokio_core::reactor::Core;
use futures::sync::mpsc::unbounded;
use tokio_core::net::TcpListener;
use std::net::SocketAddr;
use std::str::FromStr;
use futures::{Async, Stream, Future, Poll};
use std::thread;
use std::time::Duration;

struct CompletionPact<S, C>
    where S: Stream,
          C: Stream, 
{
    stream: S,
    completer: C,
}

fn stream_completion_pact<S, C>(s: S, c: C) -> CompletionPact<S, C>
    where S: Stream,
          C: Stream,
{
    CompletionPact {
        stream: s,
        completer: c,
    }
}

impl<S, C> Stream for CompletionPact<S, C>
    where S: Stream,
          C: Stream,
{
    type Item = S::Item;
    type Error = S::Error;

    fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
        match self.completer.poll() {
            Ok(Async::Ready(None)) |
            Err(_) |
            Ok(Async::Ready(Some(_))) => {
                // We are done, forget us
                Ok(Async::Ready(None)) // <<<<<< (3)
            },
            Ok(Async::NotReady) => {
                self.stream.poll()
            },
        }
    }
}

fn main() {
    // unbounded() is the equivalent of a Stream made from a channel()
    // directly create it in this thread instead of receiving a Sender
    let (tx, rx) = unbounded::<()>();
    // A second one to cause forgetting the listener
    let (l0tx, l0rx) = unbounded::<()>();

    let j = thread::spawn(move || {
        let mut core = Core::new().unwrap();

        // Listener-0
        {
            let l = TcpListener::bind(
                    &SocketAddr::from_str("127.0.0.1:44444").unwrap(),
                    &core.handle())
                .unwrap();

            // wrap the Stream of incoming connections (which usually doesn't
            // complete) into a Stream that completes when the
            // other side is drop()ed or sent on
            let fe = stream_completion_pact(l.incoming(), l0rx)
                .for_each(|(_sock, peer)| {
                    println!("Accepted from {}", peer);
                    Ok(())
                })
                .map_err(|e| println!("----- {:?}", e));

            core.handle().spawn(fe);
        }

        // Listener1
        {
            let l = TcpListener::bind(
                    &SocketAddr::from_str("127.0.0.1:55555").unwrap(),
                    &core.handle())
                .unwrap();

            let fe = l.incoming()
                .for_each(|(_sock, peer)| {
                    println!("Accepted from {}", peer);
                    Ok(())
                })
                .map_err(|e| println!("----- {:?}", e));

            core.handle().spawn(fe);
        }

        let _ = core.run(rx.into_future());
        println!("Exiting event loop thread");
    });

    thread::sleep(Duration::from_secs(2));
    println!("Want to terminate listener-0");
    // A drop() will result in the rx side Stream being completed,
    // which is indicated by Ok(Async::Ready(None)).
    // Our wrapper behaves the same when something is received.
    // When the event loop encounters a
    // Stream that is complete it forgets about it. Which propagates to a
    // drop() that close()es the file descriptor, which closes the port if
    // nothing else uses it.
    l0tx.send(()).unwrap(); // alternatively: drop(l0tx);
    // Note that this is async and is only the signal
    // that starts the forgetting.

    thread::sleep(Duration::from_secs(2));
    println!("Want to exit event loop");
    // Same concept. The reception or drop() will cause Stream completion.
    // A completed Future will cause run() to return.
    tx.send(()).unwrap();

    j.join().unwrap();
}
like image 75
Jan Zerebecki Avatar answered Nov 15 '22 09:11

Jan Zerebecki


I implemented graceful shutdown via a oneshot channel.

The trick was to use both a oneshot channel to cancel the tcp listener, and use a select! on the two futures. Note I'm using tokio 0.2 and futures 0.3 in the example below.


use futures::channel::oneshot;
use futures::{FutureExt, StreamExt};
use std::thread;
use tokio::net::TcpListener;

pub struct ServerHandle {
    // This is the thread in which the server will block
    thread: thread::JoinHandle<()>,

    // This switch can be used to trigger shutdown of the server.
    kill_switch: oneshot::Sender<()>,
}

impl ServerHandle {
    pub fn stop(self) {
        self.kill_switch.send(()).unwrap();
        self.thread.join().unwrap();
    }
}

pub fn run_server() -> ServerHandle {
    let (kill_switch, kill_switch_receiver) = oneshot::channel::<()>();

    let thread = thread::spawn(move || {
        info!("Server thread begun!!!");
        let mut runtime = tokio::runtime::Builder::new()
            .basic_scheduler()
            .enable_all()
            .thread_name("Tokio-server-thread")
            .build()
            .unwrap();

        runtime.block_on(async {
            server_prog(kill_switch_receiver).await.unwrap();
        });

        info!("Server finished!!!");
    });

    ServerHandle {
        thread,
        kill_switch,
    }
}

async fn server_prog(kill_switch_receiver: oneshot::Receiver<()>) -> std::io::Result<()> {
    let addr = "127.0.0.1:12345";
    let addr: std::net::SocketAddr = addr.parse().unwrap();
    let mut listener = TcpListener::bind(&addr).await?;
    let mut kill_switch_receiver = kill_switch_receiver.fuse();
    let mut incoming = listener.incoming().fuse();

    loop {
        futures::select! {
            x = kill_switch_receiver => {
                break;
            },
            optional_new_client = incoming.next() => {
                if let Some(new_client) = optional_new_client {
                    let peer_socket = new_client?;
                    info!("Client connected!");
                    let peer = process_client(peer_socket, db.clone());
                    peers.lock().unwrap().push(peer);
                } else {
                    info!("No more incoming connections.");
                    break;
                }
            },
        };
    }
    Ok(())
}

Hopes this helps others (or future me ;)).

My code lives here:

https://github.com/windelbouwman/lognplot/blob/master/lognplot/src/server/server.rs

like image 28
Windel Avatar answered Nov 15 '22 10:11

Windel