ps: the answer below helped but it's not the answer I need, I have a new problem and I edited the question
I'm trying to make a custom transporter for the hyper http crate, so I can transport http packets in my own way.
Hyper's http client can be passed a custom https://docs.rs/hyper/0.14.2/hyper/client/connect/trait.Connect.html here:
pub fn build<C, B>(&self, connector: C) -> Client<C, B> where C: Connect + Clone, B: HttpBody + Send, B::Data: Send,
If we look at
impl<S, T> Connect for S where
S: Service<Uri, Response = T> + Send + 'static,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
S::Future: Unpin + Send,
T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
the type T
, which is the type of the Response
, must implement AsyncRead + AsyncWrite
, so I've chosen type Response = Cursor<Vec<u8>>
.
Here's my custom transporter with a Response
of type std::io::Cursor
wrapped in CustomResponse
so I can implement AsyncWrite
and AsyncRead
to it:
use hyper::service::Service;
use core::task::{Context, Poll};
use core::future::Future;
use std::pin::Pin;
use std::io::Cursor;
use hyper::client::connect::{Connection, Connected};
use tokio::io::{AsyncRead, AsyncWrite};
#[derive(Clone)]
pub struct CustomTransporter;
unsafe impl Send for CustomTransporter {}
impl CustomTransporter {
pub fn new() -> CustomTransporter {
CustomTransporter{}
}
}
impl Connection for CustomTransporter {
fn connected(&self) -> Connected {
Connected::new()
}
}
pub struct CustomResponse {
//w: Cursor<Vec<u8>>,
v: Vec<u8>,
i: i32
}
unsafe impl Send for CustomResponse {
}
impl Connection for CustomResponse {
fn connected(&self) -> Connected {
println!("connected");
Connected::new()
}
}
impl AsyncRead for CustomResponse {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>
) -> Poll<std::io::Result<()>> {
self.i+=1;
if self.i >=3 {
println!("poll_read for buf size {}", buf.capacity());
buf.put_slice(self.v.as_slice());
println!("did poll_read");
Poll::Ready(Ok(()))
} else {
println!("poll read pending, i={}", self.i);
Poll::Pending
}
}
}
impl AsyncWrite for CustomResponse {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8]
) -> Poll<Result<usize, std::io::Error>>{
//let v = vec!();
println!("poll_write____");
let s = match std::str::from_utf8(buf) {
Ok(v) => v,
Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
};
println!("result: {}, size: {}, i: {}", s, s.len(), self.i);
if self.i>=0{
//r
Poll::Ready(Ok(s.len()))
}else{
println!("poll_write pending");
Poll::Pending
}
}
fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Result<(), std::io::Error>> {
println!("poll_flush");
if self.i>=0{
println!("DID poll_flush");
Poll::Ready(Ok(()))
}else{
println!("poll_flush pending");
Poll::Pending
}
}
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Result<(), std::io::Error>>
{
println!("poll_shutdown");
Poll::Ready(Ok(()))
}
}
impl Service<hyper::Uri> for CustomTransporter {
type Response = CustomResponse;
type Error = hyper::http::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
println!("poll_ready");
Poll::Ready(Ok(()))
//Poll::Pending
}
fn call(&mut self, req: hyper::Uri) -> Self::Future {
println!("call");
// create the body
let body: Vec<u8> = "HTTP/1.1 200 OK\nDate: Mon, 27 Jul 2009 12:28:53 GMT\nServer: Apache/2.2.14 (Win32)\nLast-Modified: Wed, 22 Jul 2009 19:15:56 GMT\nContent-Length: 88\nContent-Type: text/html\nConnection: Closed<html><body><h1>Hello, World!</h1></body></html>".as_bytes()
.to_owned();
// Create the HTTP response
let resp = CustomResponse{
//w: Cursor::new(body),
v: body,
i: 0
};
// create a response in a future.
let fut = async move{
Ok(resp)
};
println!("gonna return from call");
// Return the response as an immediate future
Box::pin(fut)
}
}
Then I use it like this:
let connector = CustomTransporter::new();
let client: Client<CustomTransporter, hyper::Body> = Client::builder().build(connector);
let mut res = client.get(url).await.unwrap();
However, it gets stuck and hyper never reads my response, but it writes the GET to it.
Here's a complete project for testing: https://github.com/lzunsec/rust_hyper_custom_transporter/blob/39cd036fc929057d975a71969ccbe97312543061/src/custom_req.rs
RUn like this:
cargo run http://google.com
I cannot simply implement
Send
toFuture
, and I cannot changeFuture
by a wrapper. What should I do here?
It looks like the problem is your Service::Future
is missing the Send
constraint. The future being returned in call
is already Send
so it will work with the simple change:
impl Service<hyper::Uri> for CustomTransporter {
type Response = CustomResponse;
type Error = hyper::http::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
// ^^^^
...
Your code has a few other errors: un-inferred vec!()
, self: Pin<...>
missing mut
, CustomResponse
should be pub
...
You can specify the B
of client
by using inference:
let client: Client<CustomTransporter, hyper::Body> = Client::builder().build(connector);
Or by using the turbofish operator on build
:
let client = Client::builder().build::<CustomTransporter, hyper::Body>(connector);
I don't know enough about creating custom hyper transports to know if its functional, but these fixes make it compile. Hopefully it helps you make progress.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With