I am using a futures-rs powered version of the Rusoto AWS Kinesis library. I need to spawn a deep pipeline of AWS Kinesis requests to achieve high throughput, because Kinesis has a limit of 500 records per HTTP request. Combined with the 50ms latency of sending a request, that means I need to generate many concurrent requests; I am looking to keep somewhere on the order of 100 requests in flight.
The Rusoto put_records function signature looks like this:
fn put_records(
    &self,
    input: &PutRecordsInput,
) -> RusotoFuture<PutRecordsOutput, PutRecordsError>
The RusotoFuture is a wrapper defined like this:
/// Future that is returned from all rusoto service APIs.
pub struct RusotoFuture<T, E> {
    inner: Box<Future<Item = T, Error = E> + 'static>,
}
The inner Future is wrapped, but the RusotoFuture still implements Future::poll(), so I believe it is compatible with the futures-rs ecosystem. The RusotoFuture also provides a synchronization call:
impl<T, E> RusotoFuture<T, E> {
    /// Blocks the current thread until the future has resolved.
    ///
    /// This is meant to provide a simple way for non-async consumers
    /// to work with rusoto.
    pub fn sync(self) -> Result<T, E> {
        self.wait()
    }
}
I can issue a request and sync() it, getting the result from AWS. I would like to create many requests, put them in some kind of queue/list, and gather the finished ones. If a request errored, I need to reissue it (this is somewhat normal in Kinesis, especially when hitting limits on your shard throughput). If a request completed successfully, I should issue a new request with new data. I could spawn a thread for each request and sync it, but that seems inefficient when I already have the async IO thread running.
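For reference, the blocking version I use today looks roughly like this (put_batch_blocking is just an illustrative name, and the retry logic is elided):

use rusoto_kinesis::{Kinesis, KinesisClient, PutRecordsInput, PutRecordsRequestEntry};

fn put_batch_blocking(
    client: &KinesisClient,
    stream_name: &str,
    entries: Vec<PutRecordsRequestEntry>, // at most 500 entries per request
) {
    let input = PutRecordsInput {
        stream_name: stream_name.to_owned(),
        records: entries,
    };
    // sync() blocks this thread until the HTTP request has resolved
    match client.put_records(&input).sync() {
        Ok(_output) => {
            // inspect failed_record_count / records on the output and
            // resend any entries that were rejected
        }
        Err(_e) => {
            // e.g. throughput exceeded: reissue the whole request
        }
    }
}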
I have tried using futures::sync::mpsc::channel from my application thread (not running from inside the Tokio reactor), but whenever I clone the tx it generates its own buffer, eliminating any kind of backpressure on send:
fn kinesis_pipeline(client: DefaultKinesisClient, stream_name: String, num_puts: usize, puts_size: usize) {
    use futures::sync::mpsc::{ channel, spawn };
    use futures::{ Sink, Future, Stream };
    use futures::stream::Sender;
    use rusoto_core::reactor::DEFAULT_REACTOR;

    let client = Arc::new(KinesisClient::simple(Region::UsWest2));
    let data = FauxData::new(); // a data generator for testing

    let (mut tx, mut rx) = channel(1);

    for rec in data {
        tx.clone().send(rec);
    }
}
Without the clone, I have the error:
error[E0382]: use of moved value: `tx`
--> src/main.rs:150:9
|
150 | tx.send(rec);
| ^^ value moved here in previous iteration of loop
|
= note: move occurs because `tx` has type `futures::sync::mpsc::Sender<rusoto_kinesis::PutRecordsRequestEntry>`, which does not implement the `Copy` trait
I have also looked at futures::sync::mpsc::spawn based on recommendations, but it takes ownership of the rx (as a Stream) and does not solve my problem with the cloning of tx causing unbounded behavior.
I'm hoping that if I can get the channel/spawn usage working, I will have a system which takes RusotoFutures, waits for them to complete, and then provides me an easy way to grab completion results from my application thread.
As far as I can tell, your problem with channel is not that a single clone of the Sender increases the capacity by one; it is that you clone the Sender for every item you're trying to send.
The error you're seeing without clone comes from your incorrect usage of the Sink::send interface. With clone you actually should see the warning:
warning: unused `futures::sink::Send` which must be used: futures do nothing unless polled
That is: your current code doesn't actually ever send anything!
In order to apply backpressure you need to chain those send calls; each one should wait until the previous one finished (and you need to wait for the last one too!); on success you'll get the Sender back. The best way to do this is to generate a Stream from your iterator by using iter_ok and to pass it to send_all.
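Spelled out by hand, the chaining idea looks roughly like this (chain_sends is only an illustrative helper; the iter_ok/send_all combination does the same thing without the boxing):

use futures::sync::mpsc::{SendError, Sender};
use futures::{future, Future, Sink};

// Illustrative only: chain one `send` after the other so that each waits
// for the previous one and yields the `Sender` back on success.
fn chain_sends<T, I>(
    tx: Sender<T>,
    items: I,
) -> Box<Future<Item = Sender<T>, Error = SendError<T>>>
where
    T: 'static,
    I: IntoIterator<Item = T>,
{
    let mut chain: Box<Future<Item = Sender<T>, Error = SendError<T>>> =
        Box::new(future::ok(tx));
    for item in items {
        // the next `send` only starts once the previous future has resolved
        chain = Box::new(chain.and_then(move |tx| tx.send(item)));
    }
    chain // the caller still has to drive this future
}

Either way you end up with a single future representing all the sends.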
Now you have one SendAll future that you need to "drive". If you ignore the result and panic on error (.then(|r| { r.unwrap(); Ok::<(), ()>(()) })) you could spawn it as a separate task, but maybe you want to integrate it into your main application (i.e. return it in a Box).
// this returns a `Box<Future<Item = (), Error = ()>>`. you may
// want to use a different error type
Box::new(tx.send_all(iter_ok(data)).map(|_| ()).map_err(|_| ()))
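For illustration, here is roughly how the pieces could fit together end to end. The use of tokio-core as the executor, the chunks(500) batching and the buffer_unordered(100) concurrency limit are assumptions of this sketch; real code would also inspect each PutRecordsOutput and reissue failed records instead of discarding errors:

use std::sync::Arc;

use futures::stream::iter_ok;
use futures::{Future, Sink, Stream};
use rusoto_core::Region;
use rusoto_kinesis::{Kinesis, KinesisClient, PutRecordsInput, PutRecordsRequestEntry};
use tokio_core::reactor::Core;

fn kinesis_pipeline(stream_name: String, data: Vec<PutRecordsRequestEntry>) {
    let client = Arc::new(KinesisClient::simple(Region::UsWest2));
    let (tx, rx) = futures::sync::mpsc::channel(1);

    // producer: the SendAll future from above, pushing entries with backpressure
    let producer = tx.send_all(iter_ok(data)).map(|_| ()).map_err(|_| ());

    // consumer: batch the entries and keep up to 100 PutRecords calls in flight
    let consumer = rx
        .chunks(500) // Kinesis accepts at most 500 records per request
        .map(move |batch| {
            client
                .put_records(&PutRecordsInput {
                    stream_name: stream_name.clone(),
                    records: batch,
                })
                .map_err(|_e| ()) // a real pipeline would reissue failures here
        })
        .buffer_unordered(100)
        .for_each(|_output| Ok(()));

    // drive both halves on a reactor instead of calling wait()/sync()
    let mut core = Core::new().expect("failed to create reactor");
    core.run(producer.join(consumer)).expect("pipeline failed");
}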
RusotoFuture::sync and Future::wait
Don't use Future::wait: it is already deprecated in a branch, and it usually won't do what you actually are looking for. I doubt RusotoFuture is aware of the problems, so I recommend avoiding RusotoFuture::sync.
Cloning Sender increases channel capacity
As you correctly stated, cloning Sender increases the capacity by one.
This seems to be done to improve performance: a Sender starts in the unblocked ("unparked") state; if a Sender isn't blocked it can send an item without blocking. But if the number of items in the queue hits the configured limit when a Sender sends an item, that Sender becomes blocked ("parked"). (Removing items from the queue will unblock the Sender at some point.)
This means that after the inner queue hits the limit each Sender can still send one item, which leads to the documented effect of increased capacity, but only if all the Senders are actually sending items; unused Senders don't increase the observed capacity.
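A small demonstration of that observed capacity, using start_send directly so nothing has to be polled:

use futures::sync::mpsc::channel;
use futures::Sink;

fn main() {
    // bounded channel with a configured capacity of 1
    let (tx, _rx) = channel::<u32>(1);
    for i in 0..5 {
        let mut clone = tx.clone();
        // every fresh clone starts unparked, so it gets to enqueue one item
        // even though the configured limit was reached long ago
        assert!(clone.start_send(i).unwrap().is_ready());
    }
    // all five items are buffered even though the channel was created
    // with a bound of 1; a parked Sender would have to wait for the receiver
}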
The performance boost comes from the fact that as long as you don't hit the limit it doesn't need to park and notify tasks (which is quite heavy).
The private documentation at the top of the mpsc module describes more of the details.