Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to convert a Bytes Iterator into a Stream in Rust

I'm trying to figure out build a feature which requires reading the contents of a file into a futures::stream::BoxStream but I'm having a tough time figuring out what I need to do.

I have figured out how to read a file byte by byte via Bytes which implements an iterator.

use std::fs::File;
use std::io::prelude::*;
use std::io::{BufReader, Bytes};

// TODO: Convert this to a async Stream
fn async_read() -> Box<dyn Iterator<Item = Result<u8, std::io::Error>>> {
    let f = File::open("/dev/random").expect("Could not open file");
    let reader = BufReader::new(f);

    let iter = reader.bytes().into_iter();
    Box::new(iter)
}

fn main() {
    ctrlc::set_handler(move || {
        println!("received Ctrl+C!");
        std::process::exit(0);
    })
    .expect("Error setting Ctrl-C handler");

    for b in async_read().into_iter() {
        println!("{:?}", b);
    }
}

However, I've been struggling a bunch trying to figure out how I can turn this Box<dyn Iterator<Item = Result<u8, std::io::Error>>> into an Stream.

I would have thought something like this would work:

use futures::stream;
use std::fs::File;
use std::io::prelude::*;
use std::io::{BufReader, Bytes};

// TODO: Convert this to a async Stream
fn async_read() -> stream::BoxStream<'static, dyn Iterator<Item = Result<u8, std::io::Error>>> {
    let f = File::open("/dev/random").expect("Could not open file");
    let reader = BufReader::new(f);

    let iter = reader.bytes().into_iter();
    std::pin::Pin::new(Box::new(stream::iter(iter)))
}

fn main() {
    ctrlc::set_handler(move || {
        println!("received Ctrl+C!");
        std::process::exit(0);
    })
    .expect("Error setting Ctrl-C handler");

    while let Some(b) = async_read().poll() {
        println!("{:?}", b);
    }
}

But I keep getting a ton of compiler errors, I've tried other permutations but generally getting no where.

One of the compiler errors:

std::pin::Pin::new
``` --> src/main.rs:14:24
   |
14 |     std::pin::Pin::new(Box::new(stream::iter(iter)))
   |                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected trait object `dyn std::iter::Iterator`, found enum `std::result::Result`

Anyone have any advice?

I'm pretty new to Rust, and specifically Streams/lower level stuff so I apologize if I got anything wrong, feel free to correct me.

For some additional background, I'm trying to do this so you can CTRL-C out of a command in nushell

like image 584
Arash Outadi Avatar asked Jun 19 '20 01:06

Arash Outadi


1 Answers

I think you are overcomplicating it a bit, you can just return impl Stream from async_read, there is no need to box or pin (same goes for the original Iterator-based version). Then you need to set up an async runtime in order to poll the stream (in this example I just use the runtime provided by futures::executor::block_on). Then you can call futures::stream::StreamExt::next() on the stream to get a future representing the next item.

Here is one way to do this:

use futures::prelude::*;
use std::{
    fs::File,
    io::{prelude::*, BufReader},
};

fn async_read() -> impl Stream<Item = Result<u8, std::io::Error>> {
    let f = File::open("/dev/random").expect("Could not open file");
    let reader = BufReader::new(f);
    stream::iter(reader.bytes())
}

async fn async_main() {
    while let Some(b) = async_read().next().await {
        println!("{:?}", b);
    }
}

fn main() {
    ctrlc::set_handler(move || {
        println!("received Ctrl+C!");
        std::process::exit(0);
    })
    .expect("Error setting Ctrl-C handler");

    futures::executor::block_on(async_main());
}
like image 112
Coder-256 Avatar answered Oct 04 '22 10:10

Coder-256