Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Cannot borrow data in an `Arc` as mutable

I don't know what to do next. It looks like I misunderstand something, or maybe I have not learned some critical topic.

use std::sync::Arc;

use reqwest::{Error, Response}; // 0.11.4
use tokio::sync::mpsc::{self, Receiver, Sender}; // 1.9.0

pub struct Task {
    pub id: u32,
    pub url: String,
}
pub enum Message {
    Failure(Task, Error),
    Success(Task, Response),
}

struct State {
    client: reqwest::Client,
    res_tx: Sender<Message>,
    res_rx: Receiver<Message>,
}

pub struct Proxy {
    state: Arc<State>,
    max_rps: u16,
    max_pending: u16,
    id: u32,
    parent_tx: Sender<String>,
}

async fn send_msg<T>(tx: &Sender<T>, msg: T) {
    match tx.send(msg).await {
        Err(error) => {
            eprintln!("{}", error)
        }
        _ => (),
    };
}

impl Proxy {
    // Starts loop for input channel
    async fn start_chin(&mut self) -> Sender<Task> {
        let (chin_tx, mut chin_rx) = mpsc::channel::<Task>(self.max_pending as usize + 1 as usize);
        let state_outer = self.state.clone();

        tokio::spawn(async move {
            loop {
                match chin_rx.recv().await {
                    Some(task) => {
                        let res_tx = state_outer.res_tx.clone();
                        let state = state_outer.clone();
                        tokio::spawn(async move {
                            match state.client.get(&task.url).send().await {
                                Ok(res) => send_msg(&res_tx, Message::Success(task, res)).await,
                                Err(err) => send_msg(&res_tx, Message::Failure(task, err)).await,
                            }
                        });
                    }
                    None => (),
                }
            }
        });
        chin_tx
    }

    async fn start_chres(&self) {
        let state = self.state.clone();

        tokio::spawn(async move {
            loop {
                match state.res_rx.recv().await { // LINE PRODUCES ERROR
                    Some(task) => {}
                    None => (),
                }
            }
        });
    }
}

impl Proxy {
    pub fn new(
        id: u32,
        parent_tx: Sender<String>,
        proxy_addr: &str,
        max_rps: u16,
        max_pending: u16,
    ) -> Result<Self, Error> {
        let client = reqwest::Client::builder();
        if proxy_addr != "none" {
            client = client.proxy(reqwest::Proxy::all(proxy_addr)?)
        }
        let (res_tx, res_rx) = mpsc::channel::<Message>(max_pending as usize + 1 as usize); // TODO: check size

        Ok(Proxy {
            id,
            state: Arc::new(State {
                client: client.build()?,
                res_tx,
                res_rx,
            }),
            max_rps,
            max_pending,
            parent_tx,
        })
    }
}
error[E0596]: cannot borrow data in an `Arc` as mutable
  --> src/lib.rs:69:23
   |
69 |                 match state.res_rx.recv().await {
   |                       ^^^^^^^^^^^^ cannot borrow as mutable
   |
   = help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `Arc<State>`
like image 308
Lex Avatar asked Sep 12 '25 13:09

Lex


2 Answers

use std::sync::Arc;

struct Something {
    size: usize
}

impl Something {
    fn increase(&mut self) {
        self.size = self.size + 1;
    }
}

fn main() {
    let something = Something{size: 1};
    let arc = Arc::new(something);
    arc.increase();
}

gives

error[E0596]: cannot borrow data in an `Arc` as mutable
  --> src/main.rs:16:5
   |
16 |     arc.increase();
   |     ^^^ cannot borrow as mutable
   |
   = help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `Arc<Something>`

error: aborting due to previous error; 1 warning emitted

because it tries to borrow arc as mutable. For it to happen, DerefMut would have to be implemented for Arc but it's not because Arc is not meant to be mutable.

Wraping your object in a Mutex works:

use std::sync::{Arc, Mutex};

struct Something {
    size: usize
}

impl Something {
    fn increase(&mut self) {
        self.size = self.size + 1;
    }
}

fn main() {
    let something = Something{size: 1};
    let arc = Arc::new(Mutex::new(something));
    arc.lock().unwrap().increase();
}

Now it can be shared and can be increased.

like image 107
PPP Avatar answered Sep 15 '25 03:09

PPP


Lucas Zanella's answer and Shepmaster's comments helped alot to refactor and simplify code. I've desided to pass ownership inside Proxy::new() function instead of using shared reference. The code became more readable, and I've avoided shared reference for mutable tokio::sync::mpsc::Receiver. Perhaps the question turned out to be too unstructured, but I came to a new approach thanks to the community. Refactored code is listed below.

use reqwest::{Client, Error, Response};
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Sender, Receiver};


pub struct Task {
    pub id: u32,
    pub url:  String,
}
pub enum Message{
    Failure(Task, Error),
    Success(Task, Response),
}
pub struct Proxy{
    id: u32,
    max_rps: u16,
    max_pending: u16,
    in_tx: Sender<Task>,
}


async fn send_msg<T>(tx: &Sender<T>, msg: T){
    match tx.send(msg).await {
        Err(error) => { eprintln!("{}", error) },
        _ => (),
    };
}


async fn start_loop_in(client: Client, mut in_rx: Receiver<Task>, res_tx: Sender<Message>){
    loop {
        if let Some(task) = in_rx.recv().await {
            let client_clone = client.clone();
            let res_tx_clone = res_tx.clone();
            tokio::spawn(async move {
                println!("SENDING: {}", &task.url); // TODO: DELETE DEBUG
                match client_clone.get(&task.url).send().await {
                    Ok(res) => send_msg(&res_tx_clone, Message::Success(task, res)).await,
                    Err(err) => send_msg(&res_tx_clone, Message::Failure(task, err)).await,
                }
            });
        }
    }
}


async fn start_loop_res(mut res_rx: Receiver<Message>, out_tx: Sender<String>){
    loop {
        if let Some(message) = res_rx.recv().await {
            match message {
                Message::Success(task, res) => { 
                    send_msg(
                        &out_tx, 
                        format!("{:#?}", res.text().await.unwrap()) // TODO: change in release!
                    ).await;
                },
                Message::Failure(task, err) => {
                    send_msg(&out_tx, err.to_string()).await;
                },
            }
        }
    }
}


impl Proxy{

    pub fn new(id: u32, parent_tx: Sender<String>, proxy_addr: &str, max_rps: u16, max_pending: u16) -> Result<Self, Error> {
        
        let mut client = Client::builder();
        if proxy_addr != "none" { client = client.proxy(reqwest::Proxy::all(proxy_addr)?) }
        let (res_tx, res_rx) = mpsc::channel::<Message>(max_pending as usize + 1 as usize); // TODO: check size

        let client = client.build()?;
        let (in_tx, in_rx) = mpsc::channel::<Task>(max_pending as usize + 1 as usize);
        let res_tx_clone = res_tx.clone();
        tokio::spawn(async move { start_loop_in(client, in_rx, res_tx_clone).await });

        tokio::spawn(async move { start_loop_res(res_rx, parent_tx).await });
        
        Ok(Proxy{
            id,
            max_rps,
            max_pending,
            in_tx,
        })
    }

    pub fn get_in_tx(&self) -> Sender<Task> {
        self.in_tx.clone()
    }
}
like image 27
Lex Avatar answered Sep 15 '25 03:09

Lex