Deserialize from tokio socket

I am using tokio to implement a server that communicates using messages serialized with serde (bincode). Without asynchronous I/O and futures, I would do:

extern crate tokio_io;
extern crate bincode;
extern crate serde;
extern crate bytes;
extern crate futures;
#[macro_use] extern crate serde_derive;

use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::io::{read_exact, write_all};
use bincode::{serialize, deserialize, deserialize_from, Infinite, serialized_size};
use std::io::Read;
use std::io::Cursor;
use futures::future::Future;

type Item = String; // Dummy; the real type is a complex struct with derived Serialize/Deserialize
type Error = bincode::Error;

// This works
fn decode<R>(reader: &mut R) -> Result<Item, Error> where R: Read {
    let message: Item = deserialize_from(reader, Infinite)?;
    Ok(message)
}

fn main() {

    let ser = serialize("Test", Infinite).unwrap();
    let buf = Cursor::new(ser);

    let mut reader = std::io::BufReader::new(buf);

    println!("{:?}", decode(&mut reader))
}

But what I need is a decode function that can work with an asynchronous socket, like this:

// I need this because the reader comes from a (tokio) socket:
// let listener = TcpListener::bind(&addr, &handle).unwrap();
// then, for each accepted TcpStream `socket`:
// let (reader, writer) = socket.split();
fn decode_async<R>(reader: R) -> Result<Item, Error> where R: AsyncRead {
    // Does not work:   
    let message: Item = deserialize_from(reader, Infinite)?;
    Ok(message)
}

The only idea I have is to manually write the length into the buffer during encoding and then use read_exact:

// Encode with size
fn encode_async(item: &Item) -> Result<Vec<u8>, Error> {
    // Length prefix; decode_async relies on this u64 occupying exactly 8 bytes
    let size = serialized_size(item);
    let mut buf = serialize(&size, Infinite)?;
    let ser = serialize(item, Infinite)?;
    buf.extend(ser);
    Ok(buf)
}

// Decode with size
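fn decode_async<R>(reader: R) -> Box<dyn Future<Item = Item, Error = std::io::Error>>
    where R: AsyncRead + 'static {

    // Turn bincode errors into io::Error so they can flow through the future chain
    fn to_io(e: bincode::Error) -> std::io::Error {
        std::io::Error::new(std::io::ErrorKind::InvalidData, e)
    }

    // Read the 8-byte length prefix written by encode_async
    let read = read_exact(reader, vec![0u8; 8]).and_then(|(reader, buf)| {
        deserialize::<u64>(&buf)
            .map(|size| (reader, size as usize))
            .map_err(to_io)
    }).and_then(|(reader, size)| {
        // Read exactly `size` bytes of payload
        read_exact(reader, vec![0u8; size])
    }).and_then(|(_reader, buf)| {
        deserialize::<Item>(&buf).map_err(to_io)
    });

    Box::new(read)
}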

fn main() {

    let ser = encode_async(&String::from("Test")).unwrap();
    let buf = Cursor::new(ser);

    let reader = std::io::BufReader::new(buf);
    let dec = decode_async(reader).wait();

    println!("{:?}", dec)
}

Is there a better way to implement the decoding?

asked Nov 02 '25 01:11 by Manuel Schmidt


1 Answer

deserialize_from can't handle I/O errors, in particular errors of kind WouldBlock, which async (non-blocking) readers return while they are waiting for more data. This is a limitation of the interface: deserialize_from doesn't return a Future or any partial state; it returns the fully decoded Result, and it has no way to combine the reader with an event loop to handle WouldBlock without busy looping.
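To make the WouldBlock problem concrete, here is a small std-only sketch (no tokio or bincode, so it runs on its own). `PartialReader` is a hypothetical stand-in for a non-blocking socket that has received only part of a message, and `decode_blocking` is a hypothetical decoder written in the same blocking style as deserialize_from. The 8-byte little-endian length prefix is an assumption that mirrors the question's framing.

```rust
use std::io::{self, ErrorKind, Read};

/// Hypothetical stand-in for a non-blocking socket: it returns the bytes that
/// have "arrived" so far, then WouldBlock instead of waiting for more.
struct PartialReader {
    data: Vec<u8>,
    pos: usize,
}

impl Read for PartialReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.pos == self.data.len() {
            // A real non-blocking socket reports WouldBlock when no data is ready.
            return Err(io::Error::new(ErrorKind::WouldBlock, "waiting for more data"));
        }
        let n = buf.len().min(self.data.len() - self.pos);
        buf[..n].copy_from_slice(&self.data[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}

/// Blocking-style decoder, like deserialize_from: it assumes every read
/// eventually succeeds and keeps no state it could resume from.
/// Assumed format: 8-byte little-endian length, then the payload.
fn decode_blocking<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
    let mut len = [0u8; 8];
    r.read_exact(&mut len)?;
    let mut payload = vec![0u8; u64::from_le_bytes(len) as usize];
    r.read_exact(&mut payload)?;
    Ok(payload)
}

fn main() {
    // The full frame is len = 5 followed by b"hello", but only b"he" has arrived.
    let mut r = PartialReader { data: vec![5, 0, 0, 0, 0, 0, 0, 0, b'h', b'e'], pos: 0 };
    let err = decode_blocking(&mut r).unwrap_err();
    println!("{:?} after consuming {} bytes", err.kind(), r.pos);
}
```

The decoder fails with WouldBlock after it has already pulled the length and two payload bytes out of the reader, and it has nowhere to keep them. Calling it again once more data arrives would treat the rest of the payload as a new length prefix, which is why the error can't simply be retried.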

In theory it is possible to implement an async_deserialize_from, but not on top of the interfaces serde provides, unless you read all of the data to decode in advance, which would defeat the purpose.
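One way to picture "partial state" is a decoder that owns a buffer, accepts whatever bytes each read returned, and only yields a message once it is complete; at that point the bytes can be handed to an ordinary synchronous deserializer. A minimal sketch, assuming the same 8-byte little-endian length prefix; `FrameDecoder` and `frame` are hypothetical names, and bincode is left out so the example is self-contained.

```rust
/// Hypothetical incremental frame decoder. It keeps the bytes received so far
/// between calls and yields a payload only once the 8-byte little-endian
/// length prefix and the whole payload are buffered.
#[derive(Default)]
struct FrameDecoder {
    buf: Vec<u8>,
}

impl FrameDecoder {
    /// Append whatever bytes the socket returned on this readiness event.
    fn feed(&mut self, bytes: &[u8]) {
        self.buf.extend_from_slice(bytes);
    }

    /// Return the next complete payload, or None if more data is needed.
    /// The partial state stays in `self.buf`, so a later call resumes.
    fn next_frame(&mut self) -> Option<Vec<u8>> {
        if self.buf.len() < 8 {
            return None;
        }
        let mut len_bytes = [0u8; 8];
        len_bytes.copy_from_slice(&self.buf[..8]);
        let len = u64::from_le_bytes(len_bytes) as usize;
        // Compare against the remaining bytes to avoid overflow on huge lengths.
        if self.buf.len() - 8 < len {
            return None;
        }
        // Remove prefix + payload from the buffer, keeping only the payload.
        Some(self.buf.drain(..8 + len).skip(8).collect())
    }
}

/// Hypothetical helper that builds a frame in the same format.
fn frame(payload: &[u8]) -> Vec<u8> {
    let mut out = (payload.len() as u64).to_le_bytes().to_vec();
    out.extend_from_slice(payload);
    out
}

fn main() {
    let wire = frame(b"Test");
    let mut dec = FrameDecoder::default();
    // The 12 bytes arrive in three pieces; nothing is decoded until all are here.
    dec.feed(&wire[..3]);
    assert_eq!(dec.next_frame(), None);
    dec.feed(&wire[3..10]);
    assert_eq!(dec.next_frame(), None);
    dec.feed(&wire[10..]);
    // At this point the payload bytes could be passed to bincode's deserialize.
    let payload = dec.next_frame().unwrap();
    println!("{}", String::from_utf8(payload).unwrap());
}
```

This is the same shape as the `decode` method of the `Decoder` trait in `tokio_io::codec`, which returns `Ok(None)` when it needs more input, so in a real server this logic would more likely live in such a codec than in a hand-rolled loop.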

You need to read the full data first: with tokio_io::io::read_to_end if the message is the whole stream, or with tokio_io::io::read_exact (what you're currently using) if you know the size of the encoded data in an "endless" stream (or in a stream where the message is followed by other data).
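The read_exact approach boils down to "read the length, then read exactly that many bytes". Here is a synchronous std sketch of that framing on a stream carrying two messages back to back, a case where read_to_end would not help: it reads until EOF, so it would swallow both messages (and on a live connection would only finish when the peer closes it). `read_frame`, `write_frame` and the `MAX_FRAME` limit are hypothetical; the limit is an added safeguard, since allocating `vec![0u8; size]` straight from an untrusted length prefix lets a peer request an arbitrarily large buffer.

```rust
use std::io::{self, Cursor, ErrorKind, Read};

/// Hypothetical upper bound so a corrupt or malicious length prefix cannot
/// trigger an enormous allocation.
const MAX_FRAME: u64 = 16 * 1024 * 1024;

/// Read one frame: 8-byte little-endian length, then exactly that many bytes.
/// Synchronous analogue of chaining two tokio_io::io::read_exact futures.
fn read_frame<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
    let mut len_bytes = [0u8; 8];
    r.read_exact(&mut len_bytes)?;
    let len = u64::from_le_bytes(len_bytes);
    if len > MAX_FRAME {
        return Err(io::Error::new(ErrorKind::InvalidData, "frame too large"));
    }
    let mut payload = vec![0u8; len as usize];
    r.read_exact(&mut payload)?;
    Ok(payload)
}

/// Append one frame in the same format to `out`.
fn write_frame(out: &mut Vec<u8>, payload: &[u8]) {
    out.extend_from_slice(&(payload.len() as u64).to_le_bytes());
    out.extend_from_slice(payload);
}

fn main() {
    // Two messages back to back, as they could appear on one TCP connection.
    let mut wire = Vec::new();
    write_frame(&mut wire, b"first");
    write_frame(&mut wire, b"second");

    let mut stream = Cursor::new(wire);
    let a = read_frame(&mut stream).unwrap();
    let b = read_frame(&mut stream).unwrap();
    // Once the stream is exhausted, read_exact reports UnexpectedEof.
    let end = read_frame(&mut stream).unwrap_err();
    println!("{:?} {:?} {:?}", String::from_utf8(a), String::from_utf8(b), end.kind());
}
```

With tokio_io the structure is the same, except that the two Read::read_exact calls become chained tokio_io::io::read_exact futures, as in the question's decode_async.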

answered Nov 03 '25 21:11 by Stefan


