Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Streamed upload to s3 with rusoto

Tags:

rust

rusoto

How can I upload file to s3 using rusoto, without reading file content to memory (streamed)?


With this code:

use std::fs::File;
use std::io::BufReader;

use rusoto_core::Region;
use rusoto_s3::{PutObjectRequest, S3, S3Client, StreamingBody};

fn main() {
    let file = File::open("input.txt").unwrap();
    let mut reader = BufReader::new(file);

    let s3_client = S3Client::new(Region::UsEast1);
    let result = s3_client.put_object(PutObjectRequest {
        bucket: String::from("example_bucket"),
        key: "example_filename".to_string(),
//        this works:
//      body: Some("example string".to_owned().into_bytes().into()),
//        this doesn't:
        body: Some(StreamingBody::new(reader)),
        ..Default::default()
    }).sync().expect("could not upload");
}

I receive the following error:

error[E0277]: the trait bound `std::io::BufReader<std::fs::File>: futures::stream::Stream` is not satisfied
  --> src/bin/example.rs:18:20
   |
18 |         body: Some(StreamingBody::new(reader)),
   |                    ^^^^^^^^^^^^^^^^^^ the trait `futures::stream::Stream` is not implemented for `std::io::BufReader<std::fs::File>`
   |
   = note: required by `rusoto_core::stream::ByteStream::new`
like image 612
diralik Avatar asked Sep 05 '19 17:09

diralik


2 Answers

Okay. Strap yourself in, this is a fun one.

StreamingBody is an alias for ByteStream, which itself takes a parameter type S: Stream<Item = Bytes, Error = Error> + Send + 'static. In short, it needs to be a stream of bytes.

BufReader, evidently, does not implement this trait, as it predates futures and streams by a long while. There is also no easy conversion to Stream<Item = Bytes> that you can use to implicitly convert into this.

The reason the first (commented) example works is because String::into_bytes().into() will follow the typecast chain: String -> Vec<u8> -> ByteStream thanks to the implementation of From<Vec<u8>> on ByteStream.

Now that we know why this doesn't work, we can fix it. There is a fast way, and then there is a right way. I'll show you both.

The fast way

The fast (but not optimal) way is simply to call File::read_to_end(). This will fill up a Vec<u8>, which you can then use like you did before:

 let mut buf:Vec<u8> = vec![];
 file.read_to_end(&mut buf)?;
 // buf now contains the entire file

This is inefficient and suboptimal for two reasons:

  • read_to_end() is a blocking call. Based on where you are reading the file from, this blocking time may prove unreasonable
  • You are required to have more free RAM than you have bytes in your file (+ either 64 or 128 bits for the Vec definition + some extra we don't really care about)

The good way

The good way turns your file into a structure implementing AsyncRead. From this, we can then form a Stream.

Since you already have a std::fs::File, we will first convert it into a tokio::fs::File. This implements AsyncRead, which is very important for later:

let tokio_file = tokio::fs::File::from_std(file);

From this, we sadly need to do some pipework to get it into a Stream. Multiple crates have implemented it; the way to do so from scratch is the following:

use tokio_util::codec;
let byte_stream = codec::FramedRead::new(tokio_file, codec::BytesCodec::new())
   .map(|r| r.as_ref().to_vec());

byte_stream is an instance of tokio_util::codec::FramedRead which implements Stream with a specific item based on our decoder. As our decoder is BytesCodec, your stream is therefore Stream<Item = BytesMut>.

As the playground doesn't know rusoto_core, I cannot show you the full flow. I can, however, show you that you can generate a Stream<Item = Vec<u8>, Error = io::Error>, which is the crux of this: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=38e4ae8be0d70abd134b5331d6bf4133

like image 101
Sébastien Renauld Avatar answered Oct 06 '22 20:10

Sébastien Renauld


Here's a version with the upcoming Rusoto async-await syntax (for getObject although should be straightforward to tweak for upload)... possibly out for public consumption in Rusoto 0.4.3:

https://github.com/brainstorm/rusoto-s3-async-await

Namely:

pub async fn bucket_obj_bytes(client: S3Client, bucket: String, _prefix: String, object: String) {
    let get_req = GetObjectRequest {
        bucket,
        key: object,
        ..Default::default()
    };

    let result = client
        .get_object(get_req)
        .await
        .expect("Couldn't GET object");
    println!("get object result: {:#?}", result);

    let stream = result.body.unwrap();
    let body = stream.map_ok(|b| BytesMut::from(&b[..])).try_concat().await.unwrap();

    assert!(body.len() > 0);
    dbg!(body);
}

Which is essentially borrowed from the integration testsuite itself, where you can find snippets of the upload version too.

like image 23
brainstorm Avatar answered Oct 06 '22 21:10

brainstorm