Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Rusoto streamed upload using sigv4

I'm having trouble streaming uploads to S3:

// rust version 1.42.0
// OS macos
// [dependencies]
// rusoto_core = "0.43.0"
// rusoto_s3 = "0.43.0"
// log = "0.4"
// pretty_env_logger = "0.4.0"
// tokio = "0.2.14"
// tokio-util = { version = "0.3.1", features = ["codec"] }
// futures = "0.3.4"
// bytes = "0.5.4"

#![allow(dead_code)]
#[allow(unused_imports)]

use log::{debug,info,warn,error};
use bytes::Bytes;
use tokio_util::codec;
use futures::stream::{Stream, TryStreamExt};
use rusoto_core::Region;
use rusoto_s3::{PutObjectRequest, S3, S3Client};
use tokio::io::{AsyncRead, Result};

#[tokio::main]
async fn main() {
    // Initialize logging (controlled by RUST_LOG) before doing any work.
    pretty_env_logger::init();
    // Upload a fixed test file; adjust the path as needed.
    let path = std::path::PathBuf::from("/tmp/test.bin");
    copy_to_s3(&path).await;
}

async fn copy_to_s3(pathbuf: &std::path::PathBuf) {
    debug!("copy_to_s3: {:?}", pathbuf);

    let tokio_file = tokio::fs::File::open(pathbuf.as_path()).await;

    let filename = pathbuf.file_name().unwrap().to_str().unwrap();
    debug!("filename: {:?}", filename);
    let s3_client = S3Client::new(Region::EuWest2);
    let result = s3_client.put_object(PutObjectRequest {
        bucket: String::from("..."),
        key: filename.to_owned(),
        server_side_encryption: Some("AES256".to_string()),

        body: Some(rusoto_core::ByteStream::new(into_bytes_stream(tokio_file.unwrap()))),
        ..Default::default()
    }).await;

    match result {
        Ok(success) => { 
            debug!("Success: {:?}", success);
        },
        Err(error) => {
            error!("Failure: {:?}", error);
        }
    }

    debug!("DONE: copy_to_s3: {:?}", pathbuf);
}

/// Adapts any `AsyncRead` into a stream of `Bytes` chunks, suitable for
/// wrapping in `rusoto_core::ByteStream`.
fn into_bytes_stream<R>(reader: R) -> impl Stream<Item = Result<Bytes>>
where
    R: AsyncRead,
{
    let framed = codec::FramedRead::new(reader, codec::BytesCodec::new());
    // BytesCodec yields BytesMut; freeze each chunk into immutable Bytes.
    framed.map_ok(|chunk| chunk.freeze())
}

I generate a binary file by using dd if=/dev/zero of=/tmp/test.bin bs=4k count=500.

Notwithstanding that I haven't quite wrapped my head around the futures stuff yet, I'm just trying to get something dumping a file into S3 with the minimum amount of memory usage possible.

On run, I get the following output with debug logging; potentially sensitive information ellipsed:

$ RUST_LOG=debug cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.36s
     Running `target/debug/uploader`
 DEBUG uploader > copy_to_s3: "/tmp/test.bin"
 DEBUG uploader > filename: "test.bin"
 DEBUG rusoto_core::request > Full request:
 method: PUT
 final_uri: https://s3.eu-west-2.amazonaws.com/.../test.bin
Headers:

 DEBUG rusoto_core::request > authorization:"AWS4-HMAC-SHA256 Credential=.../20200408/eu-west-2/s3/aws4_request, SignedHeaders=content-type;host;x-amz-content-sha256;x-amz-date;x-amz-security-token;x-amz-server-side-encryption, Signature=..."
 DEBUG rusoto_core::request > content-type:"application/octet-stream"
 DEBUG rusoto_core::request > host:"s3.eu-west-2.amazonaws.com"
 DEBUG rusoto_core::request > x-amz-content-sha256:"UNSIGNED-PAYLOAD"
 DEBUG rusoto_core::request > x-amz-date:"20200408T173930Z"
 DEBUG rusoto_core::request > x-amz-security-token:"..."
 DEBUG rusoto_core::request > x-amz-server-side-encryption:"AES256"
 DEBUG rusoto_core::request > user-agent:"rusoto/0.43.0 rust/1.42.0 macos"
 DEBUG hyper::client::connect::dns > resolving host="s3.eu-west-2.amazonaws.com"
 DEBUG hyper::client::connect::http > connecting to 52.95.148.48:443
 DEBUG hyper::client::connect::http > connected to 52.95.148.48:443
 DEBUG hyper::proto::h1::io         > flushed 1070 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 8200 bytes
 DEBUG hyper::proto::h1::io         > flushed 147600 bytes
 DEBUG hyper::proto::h1::io         > flushed 418200 bytes
 DEBUG hyper::proto::h1::io         > flushed 418200 bytes
 DEBUG hyper::proto::h1::io         > flushed 418200 bytes
 DEBUG hyper::proto::h1::io         > flushed 418200 bytes
 DEBUG hyper::proto::h1::io         > flushed 16405 bytes
 DEBUG hyper::proto::h1::io         > read 291 bytes
 DEBUG hyper::proto::h1::io         > parsed 7 headers
 DEBUG hyper::proto::h1::conn       > incoming body is chunked encoding
 DEBUG hyper::proto::h1::io         > read 345 bytes
 DEBUG hyper::proto::h1::decode     > incoming chunked header: 0x14D (333 bytes)
 DEBUG hyper::proto::h1::conn       > incoming body completed
 DEBUG rusoto_core::proto::xml::error > Ignoring unknown XML element "Header" in error response.
 DEBUG rusoto_core::proto::xml::error > Ignoring unknown XML element "RequestId" in error response.
 DEBUG rusoto_core::proto::xml::error > Ignoring unknown XML element "HostId" in error response.
 ERROR uploader                       > Failure: Unknown(BufferedHttpResponse {status: 501, body: "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<Error><Code>NotImplemented</Code><Message>A header you provided implies functionality that is not implemented</Message><Header>Transfer-Encoding</Header><RequestId>3F1A03D67D81CCAB</RequestId><HostId>...=</HostId></Error>", headers: {"x-amz-request-id": "3F1A03D67D81CCAB", "x-amz-id-2": "...", "content-type": "application/xml", "transfer-encoding": "chunked", "date": "Wed, 08 Apr 2020 17:39:30 GMT", "connection": "close", "server": "AmazonS3"} })
 DEBUG uploader                       > DONE: copy_to_s3: "/tmp/test.bin"

I think this is telling me that it's not a sigv4 signed upload, but I'm not sure.

For the most part, the debug output looks like it's successfully sending the file in chunks, but then it errors...

Given my assumption about it being sent sigv2 and not sigv4, how do I go about making it send the sigv4 headers instead? Failing that, what have I missed?

like image 348
pms1969 Avatar asked Apr 08 '20 15:04

pms1969


1 Answer

Content-Length should be specified.

Changed section

let result = s3_client.put_object(PutObjectRequest {
        bucket: String::from("..."),
        key: filename.to_owned(),
        server_side_encryption: Some("AES256".to_string()),
        // Based on dd if=/dev/zero of=/tmp/test.bin bs=4k count=500
        content_length: Some(2_048_000),
        body: Some(rusoto_core::ByteStream::new(into_bytes_stream(tokio_file.unwrap()))),
        ..Default::default()
    }).await;

Full text of the fixed example

// rust version 1.42.0
// OS macos
// [dependencies]
// rusoto_core = "0.43.0"
// rusoto_s3 = "0.43.0"
// log = "0.4"
// pretty_env_logger = "0.4.0"
// tokio = "0.2.14"
// tokio-util = { version = "0.3.1", features = ["codec"] }
// futures = "0.3.4"
// bytes = "0.5.4"

#![allow(dead_code)]
#[allow(unused_imports)]

use log::{debug,info,warn,error};
use bytes::Bytes;
use tokio_util::codec;
use futures::stream::{Stream, TryStreamExt};
use rusoto_core::Region;
use rusoto_s3::{PutObjectRequest, S3, S3Client};
use tokio::io::{AsyncRead, Result};

#[tokio::main]
async fn main() {
    // Initialize logging (controlled by RUST_LOG) before doing any work.
    pretty_env_logger::init();
    // Upload a fixed test file; adjust the path as needed.
    let path = std::path::PathBuf::from("/tmp/test.bin");
    copy_to_s3(&path).await;
}

async fn copy_to_s3(pathbuf: &std::path::PathBuf) {
    debug!("copy_to_s3: {:?}", pathbuf);

    let tokio_file = tokio::fs::File::open(pathbuf.as_path()).await;

    let filename = pathbuf.file_name().unwrap().to_str().unwrap();
    debug!("filename: {:?}", filename);
    let s3_client = S3Client::new(Region::EuWest2);
    let result = s3_client.put_object(PutObjectRequest {
        bucket: String::from("..."),
        key: filename.to_owned(),
        server_side_encryption: Some("AES256".to_string()),
        // Based on dd if=/dev/zero of=/tmp/test.bin bs=4k count=500
        content_length: Some(2_048_000),
        body: Some(rusoto_core::ByteStream::new(into_bytes_stream(tokio_file.unwrap()))),
        ..Default::default()
    }).await;

    match result {
        Ok(success) => { 
            debug!("Success: {:?}", success);
        },
        Err(error) => {
            error!("Failure: {:?}", error);
        }
    }

    debug!("DONE: copy_to_s3: {:?}", pathbuf);
}

/// Adapts any `AsyncRead` into a stream of `Bytes` chunks, suitable for
/// wrapping in `rusoto_core::ByteStream`.
fn into_bytes_stream<R>(reader: R) -> impl Stream<Item = Result<Bytes>>
where
    R: AsyncRead,
{
    let framed = codec::FramedRead::new(reader, codec::BytesCodec::new());
    // BytesCodec yields BytesMut; freeze each chunk into immutable Bytes.
    framed.map_ok(|chunk| chunk.freeze())
}

The code works as intended; however, you have to know the length of the file beforehand (for a file on disk you can obtain it by querying its metadata, e.g. with `tokio::fs::metadata`, before starting the upload).

like image 56
Paul Brit Avatar answered Oct 21 '22 07:10

Paul Brit