Is it possible to create a Stream from a File rather than loading the file contents into memory?

Tags:

rust

I'm currently using the rusoto_s3 lib to upload a file to S3. All the examples I have found do the same thing: open a file, read its full contents into memory (Vec<u8>), then convert the Vec into a ByteStream (which implements From<Vec<u8>>). Here is a code example:

fn upload_file(&self, file_path: &Path) -> FileResult<PutObjectOutput> {
    let mut file = File::open(file_path)?;
    let mut file_data: Vec<u8> = vec![];
    file.read_to_end(&mut file_data)?;

    let client = S3Client::new(Region::UsEast1);
    let mut request = PutObjectRequest::default();
    request.body = Some(file_data.into());

    Ok(client.put_object(request).sync()?)
}

This is probably acceptable for small files, but (I assume) this technique would break down as soon as you attempt to upload a file with a size greater than the available heap memory.

Another way to create a ByteStream is by using this initializer which accepts an object implementing the Stream trait. I would assume that File would implement this trait, but this does not appear to be the case.

My question(s):

  • Is there some type which can be constructed from a File which implements Stream?
  • Is the correct solution to make my own tuple struct which wraps File and implements Stream itself, and is this implementation trivial?
  • Is there another solution I'm not seeing, or am I simply misunderstanding how memory is allocated in the code above?

dalton_c asked Dec 20 '19 16:12


1 Answer

Is there some type which can be constructed from a File which implements Stream?

No, unfortunately. Nothing built-in in std, futures or tokio can do this directly at the moment.

Due to the "detached" nature of Stream's items, such an implementation would have to allocate a new owned buffer for every slice of incoming data and hand it over to the caller, which isn't very efficient. The problem can only be addressed satisfactorily once the Rust language has generic associated types (GATs), which will hopefully happen next year. Check out this futures-rs ticket and Niko's async interview #2 for more detail.
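The same constraint shows up in the synchronous Iterator trait, which makes for a std-only illustration. The following is a minimal sketch, not anything from rusoto or futures: OwnedChunks is a hypothetical name, and an in-memory byte slice stands in for a File (both implement Read). Because the item type cannot borrow from the iterator itself, every chunk has to be a freshly allocated, owned Vec<u8>.

```rust
use std::io::{self, Read};

/// Hypothetical helper (not part of any library): yields a reader's
/// contents as owned chunks of at most `chunk_size` bytes.
struct OwnedChunks<R> {
    reader: R,
    chunk_size: usize,
}

impl<R: Read> Iterator for OwnedChunks<R> {
    // `Item` cannot borrow from the iterator itself, so each chunk must be
    // an owned buffer rather than a `&[u8]` view into one reused buffer.
    type Item = io::Result<Vec<u8>>;

    fn next(&mut self) -> Option<Self::Item> {
        // A fresh allocation per item: this is the cost described above.
        let mut buf = vec![0u8; self.chunk_size];
        match self.reader.read(&mut buf) {
            Ok(0) => None, // end of input
            Ok(n) => {
                buf.truncate(n); // keep only the bytes actually read
                Some(Ok(buf))
            }
            Err(e) => Some(Err(e)),
        }
    }
}

fn main() -> io::Result<()> {
    // An in-memory reader stands in for a `File` here.
    let data: &[u8] = b"hello world";
    let chunks = OwnedChunks { reader: data, chunk_size: 4 };
    for chunk in chunks {
        println!("{:?}", chunk?);
    }
    Ok(())
}
```

Memory use stays bounded by the chunk size rather than the file size, but at the price of one allocation per chunk. A Stream over a File pays the same price, since its items are equally detached from the stream that produced them.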

That being said, there are use cases right now where a Stream facade on top of underlying IO is desirable and good enough.

Is the correct solution to make my own tuple struct which wraps File and implements Stream itself, and is this implementation trivial?

For futures-0.1, which rusoto depends on, there are several ways to implement this:

  • implement Stream trait for a struct that wraps a Read
  • make use of futures utility functions such as futures::stream::poll_fn
  • tokio-codec-0.1 has an excellent FramedRead that has already implemented Stream

The third is surely the easiest:

use futures::stream::Stream;  // futures = "0.1.29"
use rusoto_core::{ByteStream, Region};  // rusoto_core = "0.42.0"
use rusoto_s3::{PutObjectOutput, PutObjectRequest, S3Client, S3};  // rusoto_s3 = "0.42.0"
use std::{error::Error, fs::File, path::Path};
use tokio_codec::{BytesCodec, FramedRead};  // tokio-codec = "0.1.1"
use tokio_io::io::AllowStdIo;  // tokio-io = "0.1.12"

fn upload_file(file_path: &Path) -> Result<PutObjectOutput, Box<dyn Error>> {
    let file = File::open(file_path)?;
    let aio = AllowStdIo::new(file);
    let stream = FramedRead::new(aio, BytesCodec::new()).map(|bs| bs.freeze());

    let client = S3Client::new(Region::UsEast1);
    let mut request = PutObjectRequest::default();
    request.body = Some(ByteStream::new(stream));

    Ok(client.put_object(request).sync()?)
}
edwardw answered Nov 15 '22 08:11