I'm having trouble streaming uploads to S3:
// rust version 1.42.0
// OS macos
// [dependencies]
// rusoto_core = "0.43.0"
// rusoto_s3 = "0.43.0"
// log = "0.4"
// pretty_env_logger = "0.4.0"
// tokio = "0.2.14"
// tokio-util = { version = "0.3.1", features = ["codec"] }
// futures = "0.3.4"
// bytes = "0.5.4"
#![allow(dead_code)]
#[allow(unused_imports)]
use log::{debug, info, warn, error};
use bytes::Bytes;
use tokio_util::codec;
use futures::stream::{Stream, TryStreamExt};
use rusoto_core::Region;
use rusoto_s3::{PutObjectRequest, S3, S3Client};
use tokio::io::{AsyncRead, Result};

#[tokio::main]
async fn main() {
    pretty_env_logger::init();
    let pathbuf = std::path::PathBuf::from(String::from("/tmp/test.bin"));
    copy_to_s3(&pathbuf).await;
}

async fn copy_to_s3(pathbuf: &std::path::PathBuf) {
    debug!("copy_to_s3: {:?}", pathbuf);
    let tokio_file = tokio::fs::File::open(pathbuf.as_path()).await;
    let filename = pathbuf.file_name().unwrap().to_str().unwrap();
    debug!("filename: {:?}", filename);

    let s3_client = S3Client::new(Region::EuWest2);
    let result = s3_client.put_object(PutObjectRequest {
        bucket: String::from("..."),
        key: filename.to_owned(),
        server_side_encryption: Some("AES256".to_string()),
        body: Some(rusoto_core::ByteStream::new(into_bytes_stream(tokio_file.unwrap()))),
        ..Default::default()
    }).await;

    match result {
        Ok(success) => {
            debug!("Success: {:?}", success);
        },
        Err(error) => {
            error!("Failure: {:?}", error);
        }
    }
    debug!("DONE: copy_to_s3: {:?}", pathbuf);
}

// Wrap an AsyncRead in a FramedRead so it can be consumed as a Stream of Bytes chunks.
fn into_bytes_stream<R>(r: R) -> impl Stream<Item = Result<Bytes>>
where
    R: AsyncRead,
{
    codec::FramedRead::new(r, codec::BytesCodec::new())
        .map_ok(|bytes| bytes.freeze())
}
I generate the test file with dd if=/dev/zero of=/tmp/test.bin bs=4k count=500, i.e. 2,048,000 bytes.
Notwithstanding that I haven't quite wrapped my head around the futures stuff yet, I'm just trying to get something that dumps a file into S3 with the minimum amount of memory usage possible.
When I run it, I get the following output with debug logging (potentially sensitive information ellipsed):
$ RUST_LOG=debug cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.36s
Running `target/debug/uploader`
DEBUG uploader > copy_to_s3: "/tmp/test.bin"
DEBUG uploader > filename: "test.bin"
DEBUG rusoto_core::request > Full request:
method: PUT
final_uri: https://s3.eu-west-2.amazonaws.com/.../test.bin
Headers:
DEBUG rusoto_core::request > authorization:"AWS4-HMAC-SHA256 Credential=.../20200408/eu-west-2/s3/aws4_request, SignedHeaders=content-type;host;x-amz-content-sha256;x-amz-date;x-amz-security-token;x-amz-server-side-encryption, Signature=..."
DEBUG rusoto_core::request > content-type:"application/octet-stream"
DEBUG rusoto_core::request > host:"s3.eu-west-2.amazonaws.com"
DEBUG rusoto_core::request > x-amz-content-sha256:"UNSIGNED-PAYLOAD"
DEBUG rusoto_core::request > x-amz-date:"20200408T173930Z"
DEBUG rusoto_core::request > x-amz-security-token:"..."
DEBUG rusoto_core::request > x-amz-server-side-encryption:"AES256"
DEBUG rusoto_core::request > user-agent:"rusoto/0.43.0 rust/1.42.0 macos"
DEBUG hyper::client::connect::dns > resolving host="s3.eu-west-2.amazonaws.com"
DEBUG hyper::client::connect::http > connecting to 52.95.148.48:443
DEBUG hyper::client::connect::http > connected to 52.95.148.48:443
DEBUG hyper::proto::h1::io > flushed 1070 bytes
DEBUG hyper::proto::h1::io > flushed 8200 bytes
DEBUG hyper::proto::h1::io > flushed 8200 bytes
[... 24 more identical "flushed 8200 bytes" lines elided ...]
DEBUG hyper::proto::h1::io > flushed 147600 bytes
DEBUG hyper::proto::h1::io > flushed 418200 bytes
DEBUG hyper::proto::h1::io > flushed 418200 bytes
DEBUG hyper::proto::h1::io > flushed 418200 bytes
DEBUG hyper::proto::h1::io > flushed 418200 bytes
DEBUG hyper::proto::h1::io > flushed 16405 bytes
DEBUG hyper::proto::h1::io > read 291 bytes
DEBUG hyper::proto::h1::io > parsed 7 headers
DEBUG hyper::proto::h1::conn > incoming body is chunked encoding
DEBUG hyper::proto::h1::io > read 345 bytes
DEBUG hyper::proto::h1::decode > incoming chunked header: 0x14D (333 bytes)
DEBUG hyper::proto::h1::conn > incoming body completed
DEBUG rusoto_core::proto::xml::error > Ignoring unknown XML element "Header" in error response.
DEBUG rusoto_core::proto::xml::error > Ignoring unknown XML element "RequestId" in error response.
DEBUG rusoto_core::proto::xml::error > Ignoring unknown XML element "HostId" in error response.
ERROR uploader > Failure: Unknown(BufferedHttpResponse {status: 501, body: "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<Error><Code>NotImplemented</Code><Message>A header you provided implies functionality that is not implemented</Message><Header>Transfer-Encoding</Header><RequestId>3F1A03D67D81CCAB</RequestId><HostId>...=</HostId></Error>", headers: {"x-amz-request-id": "3F1A03D67D81CCAB", "x-amz-id-2": "...", "content-type": "application/xml", "transfer-encoding": "chunked", "date": "Wed, 08 Apr 2020 17:39:30 GMT", "connection": "close", "server": "AmazonS3"} })
DEBUG uploader > DONE: copy_to_s3: "/tmp/test.bin"
I think this is telling me that it isn't a sigv4-signed upload, but I'm not sure.
For the most part, the debug output looks like it's successfully sending the file in chunks, but then it errors.
Given my assumption that it's being signed with sigv2 rather than sigv4, how do I go about making it send the sigv4 headers instead? Failing that, what have I missed?
The request is already sigv4-signed; the AWS4-HMAC-SHA256 authorization header in the debug output shows that. The actual problem is named in the error body: <Header>Transfer-Encoding</Header>. Because the body length is unknown, the request is sent with Transfer-Encoding: chunked, which S3's PutObject does not support. Supplying content_length on the request lets rusoto send a Content-Length header instead:

let result = s3_client.put_object(PutObjectRequest {
    bucket: String::from("..."),
    key: filename.to_owned(),
    server_side_encryption: Some("AES256".to_string()),
    // Based on dd if=/dev/zero of=/tmp/test.bin bs=4k count=500
    content_length: Some(2_048_000),
    body: Some(rusoto_core::ByteStream::new(into_bytes_stream(tokio_file.unwrap()))),
    ..Default::default()
}).await;
The full program, for reference:
// rust version 1.42.0
// OS macos
// [dependencies]
// rusoto_core = "0.43.0"
// rusoto_s3 = "0.43.0"
// log = "0.4"
// pretty_env_logger = "0.4.0"
// tokio = "0.2.14"
// tokio-util = { version = "0.3.1", features = ["codec"] }
// futures = "0.3.4"
// bytes = "0.5.4"
#![allow(dead_code)]
#[allow(unused_imports)]
use log::{debug, info, warn, error};
use bytes::Bytes;
use tokio_util::codec;
use futures::stream::{Stream, TryStreamExt};
use rusoto_core::Region;
use rusoto_s3::{PutObjectRequest, S3, S3Client};
use tokio::io::{AsyncRead, Result};

#[tokio::main]
async fn main() {
    pretty_env_logger::init();
    let pathbuf = std::path::PathBuf::from(String::from("/tmp/test.bin"));
    copy_to_s3(&pathbuf).await;
}

async fn copy_to_s3(pathbuf: &std::path::PathBuf) {
    debug!("copy_to_s3: {:?}", pathbuf);
    let tokio_file = tokio::fs::File::open(pathbuf.as_path()).await;
    let filename = pathbuf.file_name().unwrap().to_str().unwrap();
    debug!("filename: {:?}", filename);

    let s3_client = S3Client::new(Region::EuWest2);
    let result = s3_client.put_object(PutObjectRequest {
        bucket: String::from("..."),
        key: filename.to_owned(),
        server_side_encryption: Some("AES256".to_string()),
        // Based on dd if=/dev/zero of=/tmp/test.bin bs=4k count=500
        content_length: Some(2_048_000),
        body: Some(rusoto_core::ByteStream::new(into_bytes_stream(tokio_file.unwrap()))),
        ..Default::default()
    }).await;

    match result {
        Ok(success) => {
            debug!("Success: {:?}", success);
        },
        Err(error) => {
            error!("Failure: {:?}", error);
        }
    }
    debug!("DONE: copy_to_s3: {:?}", pathbuf);
}

// Wrap an AsyncRead in a FramedRead so it can be consumed as a Stream of Bytes chunks.
fn into_bytes_stream<R>(r: R) -> impl Stream<Item = Result<Bytes>>
where
    R: AsyncRead,
{
    codec::FramedRead::new(r, codec::BytesCodec::new())
        .map_ok(|bytes| bytes.freeze())
}
The code works as intended; however, you have to know the length of the file beforehand.
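If the length isn't known at compile time, one option is to read it from filesystem metadata before building the request. A minimal sketch, not part of the original answer, assuming tokio's "fs" feature is enabled so that tokio::fs::metadata is available:

    // Query the file size at runtime instead of hardcoding it.
    let len = tokio::fs::metadata(pathbuf.as_path()).await.unwrap().len();
    let result = s3_client.put_object(PutObjectRequest {
        bucket: String::from("..."),
        key: filename.to_owned(),
        server_side_encryption: Some("AES256".to_string()),
        // u64 -> i64 cast; fine for any realistic file size.
        content_length: Some(len as i64),
        body: Some(rusoto_core::ByteStream::new(into_bytes_stream(tokio_file.unwrap()))),
        ..Default::default()
    }).await;

This keeps the streaming body (and the low memory footprint) while still giving rusoto a Content-Length to send instead of chunked transfer encoding.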