Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Error on Future generator closure: Captured variable cannot escape `FnMut` closure body

I want to create a simple websocket server. I want to process the incoming messages and send a response, but I get an error:

error: captured variable cannot escape `FnMut` closure body
  --> src\main.rs:32:27
   |
32 |       incoming.for_each(|m| async {
   |  _________________________-_^
   | |                         |
   | |                         inferred to be a `FnMut` closure
33 | |         match m {
34 | |             // Error here...
35 | |             Ok(message) => do_something(message, db, &mut outgoing).await,
36 | |             Err(e) => panic!(e)
37 | |         }
38 | |     }).await;
   | |_____^ returns a reference to a captured variable which escapes the closure body
   |
   = note: `FnMut` closures only have access to their captured variables while they are executing...
   = note: ...therefore, they cannot allow references to captured variables to escape

This gives a few hits on Stack Overflow but I don't see anywhere in my code where a variable is escaping. The async block won't run concurrently, so I don't see any problem. Furthermore, I feel like I am doing something very simple: I get a type which allows me to send data back to the client, but when using a reference to it in the async block, it gives a compile error. The error only occurs when I use the outgoing or db variable in the async code.

This is my code (error is in the handle_connection function):

main.rs

use tokio::net::{TcpListener, TcpStream};
use std::net::SocketAddr;
use std::sync::Arc;
use futures::{StreamExt, SinkExt};
use tungstenite::Message;
use tokio_tungstenite::WebSocketStream;

struct DatabaseConnection;

#[tokio::main]
async fn main() -> Result<(), ()> {
    listen("127.0.0.1:3012", Arc::new(DatabaseConnection)).await
}

async fn listen(address: &str, db: Arc<DatabaseConnection>) -> Result<(), ()> {
    let try_socket = TcpListener::bind(address).await;
    let mut listener = try_socket.expect("Failed to bind on address");

    while let Ok((stream, addr)) = listener.accept().await {
        tokio::spawn(handle_connection(stream, addr, db.clone()));
    }

    Ok(())
}

async fn handle_connection(raw_stream: TcpStream, addr: SocketAddr, db: Arc<DatabaseConnection>) {
    let db = &*db;
    let ws_stream = tokio_tungstenite::accept_async(raw_stream).await.unwrap();

    let (mut outgoing, incoming) = ws_stream.split();

    // Adding 'move' does also not work
    incoming.for_each(|m| async {
        match m {
            // Error here...
            Ok(message) => do_something(message, db, &mut outgoing).await,
            Err(e) => panic!(e)
        }
    }).await;
}

async fn do_something(message: Message, db: &DatabaseConnection, outgoing: &mut futures_util::stream::SplitSink<WebSocketStream<TcpStream>, Message>) {
    // Do something...

    // Send some message
    let _ = outgoing.send(Message::Text("yay".to_string())).await;
}

Cargo.toml

[dependencies]
futures = "0.3.*"
futures-channel = "0.3.*"
futures-util = "0.3.*"
tokio = { version = "0.2.*", features = [ "full" ] }
tokio-tungstenite = "0.10.*"
tungstenite = "0.10.*"

When using async move, I get the following error:

code

incoming.for_each(|m| async move {
    let x = &mut outgoing;
    let b = db;
}).await;

error

error[E0507]: cannot move out of `outgoing`, a captured variable in an `FnMut` closure
  --> src\main.rs:33:38
   |
31 |       let (mut outgoing, incoming) = ws_stream.split();
   |            ------------ captured outer variable
32 | 
33 |       incoming.for_each(|m| async move {
   |  ______________________________________^
34 | |         let x = &mut outgoing;
   | |                      --------
   | |                      |
   | |                      move occurs because `outgoing` has type `futures_util::stream::stream::split::SplitSink<tokio_tungstenite::WebSocketStream<tokio::net::tcp::stream::TcpStream>, tungstenite::protocol::message::Message>`, which does not implement the `Copy` trait
   | |                      move occurs due to use in generator
35 | |         let b = db;
36 | |     }).await;
   | |_____^ move out of `outgoing` occurs here
like image 579
NoKey Avatar asked Jun 24 '20 14:06

NoKey


1 Answers

FnMut is an anonymous struct, since FnMutcaptured the &mut outgoing, it becomes a field inside of this anonymous struct and this field will be used on each call of FnMut , it can be called multiple times. If you lose it somehow (by returning or moving into another scope etc...) your program will not able to use that field for further calls, due to safety Rust Compiler doesn't let you do this(for your both case).

In your case instead of capturing the &mut outgoing we can use it as argument for each call, with this we'll keep the ownership of outgoing. You can do this by using fold from futures-rs:

incoming
    .fold(outgoing, |mut outgoing, m| async move {
        match m {
            // Error here...
            Ok(message) => do_something(message, db, &mut outgoing).await,
            Err(e) => panic!(e),
        }

        outgoing
    })
    .await;

This may seem a bit tricky but it does the job, we are using constant accumulator(outgoing) which will be used as an argument for our FnMut.

Playground (Thanks @Solomon Ucko for creating reproducible example)

See also :

  • How to return the captured variable from `FnMut` closure, which is a captor at the same time
  • How can I move a captured variable into a closure within a closure?
like image 140
Ömer Erden Avatar answered Sep 18 '22 02:09

Ömer Erden