I don't know what to do next. It looks like I misunderstand something, or maybe I have not learned some critical topic.
use std::sync::Arc;
use reqwest::{Error, Response}; // 0.11.4
use tokio::sync::mpsc::{self, Receiver, Sender}; // 1.9.0
pub struct Task {
pub id: u32,
pub url: String,
}
pub enum Message {
Failure(Task, Error),
Success(Task, Response),
}
struct State {
client: reqwest::Client,
res_tx: Sender<Message>,
res_rx: Receiver<Message>,
}
pub struct Proxy {
state: Arc<State>,
max_rps: u16,
max_pending: u16,
id: u32,
parent_tx: Sender<String>,
}
async fn send_msg<T>(tx: &Sender<T>, msg: T) {
match tx.send(msg).await {
Err(error) => {
eprintln!("{}", error)
}
_ => (),
};
}
impl Proxy {
// Starts loop for input channel
async fn start_chin(&mut self) -> Sender<Task> {
let (chin_tx, mut chin_rx) = mpsc::channel::<Task>(self.max_pending as usize + 1 as usize);
let state_outer = self.state.clone();
tokio::spawn(async move {
loop {
match chin_rx.recv().await {
Some(task) => {
let res_tx = state_outer.res_tx.clone();
let state = state_outer.clone();
tokio::spawn(async move {
match state.client.get(&task.url).send().await {
Ok(res) => send_msg(&res_tx, Message::Success(task, res)).await,
Err(err) => send_msg(&res_tx, Message::Failure(task, err)).await,
}
});
}
None => (),
}
}
});
chin_tx
}
async fn start_chres(&self) {
let state = self.state.clone();
tokio::spawn(async move {
loop {
match state.res_rx.recv().await { // LINE PRODUCES ERROR
Some(task) => {}
None => (),
}
}
});
}
}
impl Proxy {
pub fn new(
id: u32,
parent_tx: Sender<String>,
proxy_addr: &str,
max_rps: u16,
max_pending: u16,
) -> Result<Self, Error> {
let client = reqwest::Client::builder();
if proxy_addr != "none" {
client = client.proxy(reqwest::Proxy::all(proxy_addr)?)
}
let (res_tx, res_rx) = mpsc::channel::<Message>(max_pending as usize + 1 as usize); // TODO: check size
Ok(Proxy {
id,
state: Arc::new(State {
client: client.build()?,
res_tx,
res_rx,
}),
max_rps,
max_pending,
parent_tx,
})
}
}
error[E0596]: cannot borrow data in an `Arc` as mutable
--> src/lib.rs:69:23
|
69 | match state.res_rx.recv().await {
| ^^^^^^^^^^^^ cannot borrow as mutable
|
= help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `Arc<State>`
use std::sync::Arc;
struct Something {
size: usize
}
impl Something {
fn increase(&mut self) {
self.size = self.size + 1;
}
}
fn main() {
let something = Something{size: 1};
let arc = Arc::new(something);
arc.increase();
}
gives
error[E0596]: cannot borrow data in an `Arc` as mutable
--> src/main.rs:16:5
|
16 | arc.increase();
| ^^^ cannot borrow as mutable
|
= help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `Arc<Something>`
error: aborting due to previous error; 1 warning emitted
because it tries to borrow arc
as mutable. For it to happen, DerefMut
would have to be implemented for Arc
but it's not because Arc
is not meant to be mutable.
Wraping your object in a Mutex
works:
use std::sync::{Arc, Mutex};
struct Something {
size: usize
}
impl Something {
fn increase(&mut self) {
self.size = self.size + 1;
}
}
fn main() {
let something = Something{size: 1};
let arc = Arc::new(Mutex::new(something));
arc.lock().unwrap().increase();
}
Now it can be shared and can be increased.
Lucas Zanella's answer and Shepmaster's comments helped alot to refactor and simplify code. I've desided to pass ownership inside Proxy::new()
function instead of using shared reference. The code became more readable, and I've avoided shared reference for mutable tokio::sync::mpsc::Receiver
. Perhaps the question turned out to be too unstructured, but I came to a new approach thanks to the community. Refactored code is listed below.
use reqwest::{Client, Error, Response};
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Sender, Receiver};
pub struct Task {
pub id: u32,
pub url: String,
}
pub enum Message{
Failure(Task, Error),
Success(Task, Response),
}
pub struct Proxy{
id: u32,
max_rps: u16,
max_pending: u16,
in_tx: Sender<Task>,
}
async fn send_msg<T>(tx: &Sender<T>, msg: T){
match tx.send(msg).await {
Err(error) => { eprintln!("{}", error) },
_ => (),
};
}
async fn start_loop_in(client: Client, mut in_rx: Receiver<Task>, res_tx: Sender<Message>){
loop {
if let Some(task) = in_rx.recv().await {
let client_clone = client.clone();
let res_tx_clone = res_tx.clone();
tokio::spawn(async move {
println!("SENDING: {}", &task.url); // TODO: DELETE DEBUG
match client_clone.get(&task.url).send().await {
Ok(res) => send_msg(&res_tx_clone, Message::Success(task, res)).await,
Err(err) => send_msg(&res_tx_clone, Message::Failure(task, err)).await,
}
});
}
}
}
async fn start_loop_res(mut res_rx: Receiver<Message>, out_tx: Sender<String>){
loop {
if let Some(message) = res_rx.recv().await {
match message {
Message::Success(task, res) => {
send_msg(
&out_tx,
format!("{:#?}", res.text().await.unwrap()) // TODO: change in release!
).await;
},
Message::Failure(task, err) => {
send_msg(&out_tx, err.to_string()).await;
},
}
}
}
}
impl Proxy{
pub fn new(id: u32, parent_tx: Sender<String>, proxy_addr: &str, max_rps: u16, max_pending: u16) -> Result<Self, Error> {
let mut client = Client::builder();
if proxy_addr != "none" { client = client.proxy(reqwest::Proxy::all(proxy_addr)?) }
let (res_tx, res_rx) = mpsc::channel::<Message>(max_pending as usize + 1 as usize); // TODO: check size
let client = client.build()?;
let (in_tx, in_rx) = mpsc::channel::<Task>(max_pending as usize + 1 as usize);
let res_tx_clone = res_tx.clone();
tokio::spawn(async move { start_loop_in(client, in_rx, res_tx_clone).await });
tokio::spawn(async move { start_loop_res(res_rx, parent_tx).await });
Ok(Proxy{
id,
max_rps,
max_pending,
in_tx,
})
}
pub fn get_in_tx(&self) -> Sender<Task> {
self.in_tx.clone()
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With