
Concurrent access to vector from multiple threads using a mutex lock

Tags:

rust

I'm using an example provided by the Tokio library and attempting to have a vector of all the currently active TCP connections. Ultimately, I would like to be able to broadcast a message to each of the active connections, by looping through them and writing a message to the socket.

To start with, I am trying to print out the current number of connections in one thread whilst accepting connections in another.

To do this, I'm trying to use a shared vector. I've not yet implemented the removal of connections from the vector as and when they disconnect.

// A tiny async echo server with tokio-core
extern crate futures;
extern crate tokio_core;
extern crate tokio_io;

use futures::{Future, Stream};
use tokio_io::{io, AsyncRead};
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
use std::thread;
use std::sync::{Arc, Mutex};
use std::io::stdout;
use std::io::Write;

fn main() {
    // Create the event loop that will drive this server
    let mut core = Core::new().unwrap();
    let handle = core.handle();

    // Bind the server's socket
    let addr = "127.0.0.1:12345".parse().unwrap();
    let tcp = TcpListener::bind(&addr, &handle).unwrap();

    let mut connections = Arc::new((Mutex::new(Vec::new())));

    thread::spawn(move || {
        //Every 10 seconds print out the current number of connections
        let mut i;
        loop {              
          i = connections.lock().unwrap().len();
          println!("There are {} connections", i);
          stdout().flush();
          thread::sleep_ms(10000);
        }
    });



    // Iterate incoming connections
    let server = tcp.incoming().for_each(|(tcp, _)| {

        connections.lock().unwrap().push(tcp);
        // Split up the read and write halves
        let (reader, writer) = tcp.split();

        // Future of the copy
        let bytes_copied = io::copy(reader, writer);

        // ... after which we'll print what happened
        let handle_conn = bytes_copied.map(|(n, _, _)| {
            println!("wrote {} bytes", n)
        }).map_err(|err| {
            println!("IO error {:?}", err)
        });

        // Spawn the future as a concurrent task
        handle.spawn(handle_conn);

        Ok(())
    });

    // Spin up the server on the event loop
    core.run(server).unwrap();

}

At the moment this is failing to build with the following errors:

error[E0382]: capture of moved value: `connections`
  --> src/main.rs:36:42
   |
26 |     thread::spawn(move || {
   |                   ------- value moved (into closure) here
...
36 |     let server = tcp.incoming().for_each(|(tcp, _)| {
   |                                          ^^^^^^^^^^ value captured here after move
   |
   = note: move occurs because `connections` has type `std::sync::Arc<std::sync::Mutex<std::vec::Vec<tokio_core::net::TcpStream>>>`, which does not implement the `Copy` trait

error[E0382]: use of moved value: `tcp`
  --> src/main.rs:40:32
   |
38 |         connections.lock().unwrap().push(tcp);
   |                                          --- value moved here
39 |         // Split up the read and write halves
40 |         let (reader, writer) = tcp.split();
   |                                ^^^ value used here after move
   |
   = note: move occurs because `tcp` has type `tokio_core::net::TcpStream`, which does not implement the `Copy` trait

Is it possible to achieve this without writing any unsafe code?

John asked Apr 15 '17 16:04


1 Answer

You get the first error because of the move closure:

let connections = Arc::new(Mutex::new(Vec::new()));
thread::spawn(move || {
    // `connections` itself is moved into the closure here
    let i = connections.lock().unwrap().len();
    ....
});

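The move closure takes ownership of the Arc itself, so connections can no longer be used after the call to thread::spawn. What you want instead is for each thread to hold its own handle to the same data. Cloning an Arc does exactly that: it increments the reference count and returns a second pointer to the same Mutex and Vec, so both threads can use it.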

To do this, we can use Arc::clone:

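let connections = Arc::new(Mutex::new(Vec::new()));
// Clone the Arc (not the Vec): this only bumps the reference count
let conn = connections.clone();
thread::spawn(move || {
    // Only the clone, `conn`, is moved into the closure
    let i = conn.lock().unwrap().len();
    ....
});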

This way, the cloned Arc, conn, is moved into the closure, and the original Arc, connections, is not, and hence still usable.
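To see the pattern in isolation, here is a minimal sketch that uses only the standard library and no Tokio. The function name share_between_threads and the u32 placeholder entries are made up for illustration; in the question the vector would hold connections instead.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical helper: returns (length seen by the spawned thread,
// length seen by the caller after pushing one more entry).
fn share_between_threads() -> (usize, usize) {
    // Placeholder entries stand in for the connections from the question.
    let connections = Arc::new(Mutex::new(vec![1u32, 2, 3]));

    // Clone the Arc: the reference count goes up, the Vec is not copied.
    let conn = Arc::clone(&connections);

    // Only the clone is moved into the closure.
    let reader = thread::spawn(move || {
        let n = conn.lock().unwrap().len();
        n
    });
    let seen_by_thread = reader.join().unwrap();

    // The original handle was never moved, so it is still usable here.
    connections.lock().unwrap().push(4);
    let final_len = connections.lock().unwrap().len();

    (seen_by_thread, final_len)
}

fn main() {
    let (seen, total) = share_between_threads();
    println!("thread saw {} entries, main now has {}", seen, total);
}
```

Arc::clone(&connections) and connections.clone() do the same thing here; the explicit form just makes it obvious that a pointer is being cloned rather than the vector.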

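The second error occurs because push(tcp) moves the TcpStream into the vector, so tcp can no longer be used when tcp.split() is called on the next line. I'm not sure exactly what you want to do with the stored streams, but for the sake of simply counting the connections you do not need to push the stream itself.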
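If a count is all that is needed, one possible approach is a shared atomic counter instead of a vector of streams. The sketch below is an assumption about how that could look, not part of the original Tokio example: count_connections is a hypothetical name, and the for loop merely stands in for the accept loop.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for the server: `accepted` simulates how many
// connections the accept loop has seen.
fn count_connections(accepted: usize) -> usize {
    let count = Arc::new(AtomicUsize::new(0));

    // Second handle to the same counter, for the reporting thread.
    let reporter_count = Arc::clone(&count);

    // In the real server this increment would sit in the for_each closure,
    // in place of pushing the stream into a vector.
    for _ in 0..accepted {
        count.fetch_add(1, Ordering::SeqCst);
    }

    // The reporting thread only reads the counter; no stream is moved.
    let reporter = thread::spawn(move || reporter_count.load(Ordering::SeqCst));
    reporter.join().unwrap()
}

fn main() {
    println!("There are {} connections", count_connections(5));
}
```

Because nothing is moved out of tcp, the later tcp.split() call would no longer conflict with the bookkeeping. Broadcasting to every connection is a different problem and would still need some way to store a writable handle per connection.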

MartinHaTh answered Oct 28 '22 09:10