I'm using an example provided by the Tokio library and attempting to have a vector of all the currently active TCP connections. Ultimately, I would like to be able to broadcast a message to each of the active connections, by looping through them and writing a message to the socket.
To start with, I am trying to print out the current number of connections in one thread whilst accepting connections in another.
To do this, I'm trying to use a shared vector. I've not yet implemented the removal of connections from the vector as and when they disconnect.
// A tiny async echo server with tokio-core
extern crate futures;
extern crate tokio_core;
extern crate tokio_io;
use futures::{Future, Stream};
use tokio_io::{io, AsyncRead};
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
use std::thread;
use std::sync::{Arc, Mutex};
use std::io::stdout;
use std::io::Write;
fn main() {
    // Create the event loop that will drive this server
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    // Bind the server's socket
    let addr = "127.0.0.1:12345".parse().unwrap();
    let tcp = TcpListener::bind(&addr, &handle).unwrap();
    let mut connections = Arc::new((Mutex::new(Vec::new())));
    thread::spawn(move || {
        // Every 10 seconds print out the current number of connections
        let mut i;
        loop {
            i = connections.lock().unwrap().len();
            println!("There are {} connections", i);
            stdout().flush();
            thread::sleep_ms(10000);
        }
    });
    // Iterate incoming connections
    let server = tcp.incoming().for_each(|(tcp, _)| {
        connections.lock().unwrap().push(tcp);
        // Split up the read and write halves
        let (reader, writer) = tcp.split();
        // Future of the copy
        let bytes_copied = io::copy(reader, writer);
        // ... after which we'll print what happened
        let handle_conn = bytes_copied.map(|(n, _, _)| {
            println!("wrote {} bytes", n)
        }).map_err(|err| {
            println!("IO error {:?}", err)
        });
        // Spawn the future as a concurrent task
        handle.spawn(handle_conn);
        Ok(())
    });
    // Spin up the server on the event loop
    core.run(server).unwrap();
}
At the moment this is failing to build with the following errors:
error[E0382]: capture of moved value: `connections`
  --> src/main.rs:36:42
   |
26 |     thread::spawn(move || {
   |                   ------- value moved (into closure) here
...
36 |     let server = tcp.incoming().for_each(|(tcp, _)| {
   |                                          ^^^^^^^^^^ value captured here after move
   |
   = note: move occurs because `connections` has type `std::sync::Arc<std::sync::Mutex<std::vec::Vec<tokio_core::net::TcpStream>>>`, which does not implement the `Copy` trait
error[E0382]: use of moved value: `tcp`
  --> src/main.rs:40:32
   |
38 |         connections.lock().unwrap().push(tcp);
   |                                          --- value moved here
39 |         // Split up the read and write halves
40 |         let (reader, writer) = tcp.split();
   |                                ^^^ value used here after move
   |
   = note: move occurs because `tcp` has type `tokio_core::net::TcpStream`, which does not implement the `Copy` trait
Is it possible to achieve this without writing any unsafe code?
You get the first error because of the move closure:
let mut connections = Arc::new((Mutex::new(Vec::new())));
thread::spawn(move || {
    let mut i = connections.lock().unwrap().len();
    ....
});
This actually moves the whole Arc, while you only want to move "a part" of it (that is, move it in such a way that the reference count is incremented, and that both threads can use it).
To do this, we can use Arc::clone:
let mut connections = Arc::new((Mutex::new(Vec::new())));
// Cloning the Arc only increments the reference count
let conn = connections.clone();
thread::spawn(move || {
    let mut i = conn.lock().unwrap().len();
    ....
});
This way, the cloned Arc, conn, is moved into the closure, and the original Arc, connections, is not, and hence still usable.
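As a minimal, standalone sketch of the clone-then-move pattern described above, using only the standard library (no Tokio): count_in_thread is a hypothetical helper introduced here for illustration, and the vector holds plain u32 values instead of streams.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical helper: spawns a thread that reads the length of the
/// shared vector and returns what that thread saw.
fn count_in_thread(connections: &Arc<Mutex<Vec<u32>>>) -> usize {
    // Clone the handle; this only increments the reference count.
    let conn = Arc::clone(connections);
    // `conn` is moved into the closure, `connections` is not.
    let worker = thread::spawn(move || conn.lock().unwrap().len());
    worker.join().unwrap()
}

fn main() {
    let connections = Arc::new(Mutex::new(Vec::new()));
    connections.lock().unwrap().push(1u32);

    let seen = count_in_thread(&connections);

    // The original handle is still usable after the spawn.
    connections.lock().unwrap().push(2);
    let now = connections.lock().unwrap().len();
    println!("thread saw {} item(s), vector now holds {}", seen, now);
}
```

The same shape should carry over to the server: clone the Arc once for the reporting thread and keep the original for the for_each closure.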
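The second error occurs because push(tcp) moves the stream into the vector, so the later call to tcp.split() uses a value that has already been moved. I'm not sure exactly what you intend to do with the stored streams, but for the sake of simply counting the connections you do not need to push the entire thing.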
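One possible way to count connections without giving up ownership of the stream is to store something small and copyable, such as the peer address, in the shared vector. The sketch below uses only the standard library; register is a hypothetical helper, and it assumes the second element of the tuple yielded by incoming() (ignored as _ in the question) is the peer's SocketAddr.

```rust
use std::net::SocketAddr;
use std::sync::Mutex;

/// Hypothetical helper: records a peer address and returns the number of
/// connections tracked so far.
fn register(connections: &Mutex<Vec<SocketAddr>>, peer: SocketAddr) -> usize {
    let mut guard = connections.lock().unwrap();
    // SocketAddr is Copy, so the caller keeps its own copy of `peer`,
    // and the stream itself is never moved into the vector.
    guard.push(peer);
    guard.len()
}

fn main() {
    let connections = Mutex::new(Vec::new());
    let peer: SocketAddr = "127.0.0.1:40000".parse().unwrap();

    let n = register(&connections, peer);
    println!("{} connection(s), latest from {}", n, peer);
}
```

This only covers counting. Broadcasting to every client would still need some handle to each connection's write half, which this sketch does not attempt.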