
Using a custom transporter for Rust's hyper http crate

Tags: rust, hyper

P.S.: The answer below helped, but it isn't the answer I need. I have a new problem and have edited the question.

I'm trying to make a custom transporter for the hyper HTTP crate, so I can transport HTTP packets in my own way.

Hyper's HTTP client can be passed a custom Connect (https://docs.rs/hyper/0.14.2/hyper/client/connect/trait.Connect.html) through the builder's build method:

pub fn build<C, B>(&self, connector: C) -> Client<C, B>
where
    C: Connect + Clone,
    B: HttpBody + Send,
    B::Data: Send,

If we look at the blanket implementation of Connect,

impl<S, T> Connect for S
where
    S: Service<Uri, Response = T> + Send + 'static,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    S::Future: Unpin + Send,
    T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,

the type T, which is the type of the Response, must implement AsyncRead + AsyncWrite, so I've chosen type Response = Cursor<Vec<u8>>.

Here's my custom transporter, with a Response of type std::io::Cursor wrapped in CustomResponse so that I can implement AsyncWrite and AsyncRead for it:

use hyper::service::Service;
use core::task::{Context, Poll};
use core::future::Future;
use std::pin::Pin;
use std::io::Cursor;
use hyper::client::connect::{Connection, Connected};
use tokio::io::{AsyncRead, AsyncWrite};

#[derive(Clone)]
pub struct CustomTransporter;

unsafe impl Send for CustomTransporter {}

impl CustomTransporter {
    pub fn new() -> CustomTransporter {
        CustomTransporter{}
    }
}

impl Connection for CustomTransporter {
    fn connected(&self) -> Connected {
        Connected::new()
    }
}

pub struct CustomResponse {
    //w: Cursor<Vec<u8>>,
    v: Vec<u8>,
    i: i32
}

unsafe impl Send for CustomResponse {
    
}

impl Connection for CustomResponse {
    fn connected(&self) -> Connected {
        println!("connected");
        Connected::new()
    }
}

impl AsyncRead for CustomResponse {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>
    ) -> Poll<std::io::Result<()>> {
        self.i+=1;
        if self.i >=3 {
            println!("poll_read for buf size {}", buf.capacity());
            buf.put_slice(self.v.as_slice());
            println!("did poll_read");
            Poll::Ready(Ok(()))
        } else {
            println!("poll read pending, i={}", self.i);
            Poll::Pending
        }
    }
}

impl AsyncWrite for CustomResponse {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8]
    ) -> Poll<Result<usize, std::io::Error>>{
        //let v = vec!();
        println!("poll_write____");

        let s = match std::str::from_utf8(buf) {
            Ok(v) => v,
            Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
        };

        println!("result: {}, size: {}, i: {}", s, s.len(), self.i);
        if self.i>=0{
            //r
            Poll::Ready(Ok(s.len()))
        }else{
            println!("poll_write pending");
            Poll::Pending
        }
    }
    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Result<(), std::io::Error>> {
        println!("poll_flush");
        if self.i>=0{
            println!("DID poll_flush");
            Poll::Ready(Ok(()))
        }else{
            println!("poll_flush pending");
            Poll::Pending
        }
    }

    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Result<(), std::io::Error>>
    {
        println!("poll_shutdown");
        Poll::Ready(Ok(()))
    }
}


impl Service<hyper::Uri> for CustomTransporter {
    type Response = CustomResponse;
    type Error = hyper::http::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        println!("poll_ready");
        Poll::Ready(Ok(()))
        //Poll::Pending
    }

    fn call(&mut self, req: hyper::Uri) -> Self::Future {
        println!("call");
        // create the body
        let body: Vec<u8> = "HTTP/1.1 200 OK\nDate: Mon, 27 Jul 2009 12:28:53 GMT\nServer: Apache/2.2.14 (Win32)\nLast-Modified: Wed, 22 Jul 2009 19:15:56 GMT\nContent-Length: 88\nContent-Type: text/html\nConnection: Closed<html><body><h1>Hello, World!</h1></body></html>".as_bytes()
            .to_owned();
        // Create the HTTP response
        let resp = CustomResponse{
            //w: Cursor::new(body),
            v: body,
            i: 0
        };
         
        // create a response in a future.
        let fut = async move{
            Ok(resp)
        };
        println!("gonna return from call");
        // Return the response as an immediate future
        Box::pin(fut)
    }
}

Then I use it like this:

let connector = CustomTransporter::new();
let client: Client<CustomTransporter, hyper::Body> = Client::builder().build(connector);
let mut res = client.get(url).await.unwrap();

However, it gets stuck: hyper writes the GET request to my transport but never reads my response.

Here's a complete project for testing: https://github.com/lzunsec/rust_hyper_custom_transporter/blob/39cd036fc929057d975a71969ccbe97312543061/src/custom_req.rs

Run it like this:

cargo run http://google.com
Guerlando OCs asked Feb 02 '21 01:02



1 Answer

"I cannot simply implement Send to Future, and I cannot change Future by a wrapper. What should I do here?"

It looks like the problem is that your Service::Future is missing the Send bound. The future returned from call is already Send, so it will work with this simple change:

impl Service<hyper::Uri> for CustomTransporter {
    type Response = CustomResponse;
    type Error = hyper::http::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
                                                                                  // ^^^^
    ...

Your code has a few other errors: un-inferred vec!(), self: Pin<...> missing mut, CustomResponse should be pub...
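
Separately from the compile errors, the canned response built in call may be worth a second look. HTTP/1.1 terminates header lines with CRLF, ends the header block with an empty line, and expects Content-Length to equal the body length in bytes. The string in the question uses bare \n, has no blank line before <html>, and declares 88 bytes for a shorter body. Whether hyper's parser tolerates each of these has not been checked here. The following is only a minimal std-only sketch of a well-framed response; build_response is a hypothetical helper name, not part of hyper:

```rust
/// Hypothetical helper: frames `body` as a minimal HTTP/1.1 200 response.
fn build_response(body: &str) -> Vec<u8> {
    // Each header line ends in CRLF; the empty line closes the header block.
    let head = format!(
        "HTTP/1.1 200 OK\r\n\
         Content-Type: text/html\r\n\
         Content-Length: {}\r\n\
         Connection: close\r\n\
         \r\n",
        body.len() // length in bytes, not characters
    );
    let mut bytes = head.into_bytes();
    bytes.extend_from_slice(body.as_bytes());
    bytes
}

fn main() {
    let resp = build_response("<html><body><h1>Hello, World!</h1></body></html>");
    print!("{}", String::from_utf8_lossy(&resp));
}
```

The resulting Vec<u8> could be stored in CustomResponse::v in place of the hand-written string.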

You can specify the B of Client with a type annotation on the binding:

let client: Client<CustomTransporter, hyper::Body> = Client::builder().build(connector);

Or by using the turbofish operator on build:

let client = Client::builder().build::<CustomTransporter, hyper::Body>(connector);

I don't know enough about creating custom hyper transports to know if it's functional, but these fixes make it compile. Hopefully it helps you make progress.
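
On the hang itself, one thing that may be worth checking (a general point about the poll contract, not something verified against hyper): poll_read in the question returns Poll::Pending without ever touching cx. A poll function that returns Pending is expected to arrange for the task's waker to be called once progress is possible. Otherwise the executor has no reason to poll again. Here is a std-only sketch of that contract, where ReadyAfter, ThreadWaker, and block_on are hypothetical names for illustration:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical future: pending for the first `remaining` polls, then ready.
struct ReadyAfter {
    remaining: u32,
    polls: u32,
}

impl Future for ReadyAfter {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        self.polls += 1;
        if self.remaining == 0 {
            return Poll::Ready(self.polls);
        }
        self.remaining -= 1;
        // Request another poll before returning Pending. Without this line
        // the toy executor below would park forever.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy single-future executor: poll, then sleep until woken.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    let polls = block_on(ReadyAfter { remaining: 2, polls: 0 });
    println!("ready after {} polls", polls);
}
```

In the question's poll_read, the equivalent would be to call cx.waker().wake_by_ref() before returning Poll::Pending, or to store the waker and wake it when data actually arrives.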

kmdreko answered Sep 20 '22 01:09
