
Controlling the number of spawned futures to create backpressure

I am using a futures-rs powered version of the Rusoto AWS Kinesis library. I need to keep a deep pipeline of AWS Kinesis requests in flight to achieve high throughput, because Kinesis has a limit of 500 records per HTTP request. Combined with the roughly 50 ms latency of each request, a single serial connection tops out at about 20 requests (10,000 records) per second, so I need to generate many concurrent requests. I am looking to create somewhere on the order of 100 in-flight requests.

The Rusoto put_records function signature looks like this:

fn put_records(
    &self,
    input: &PutRecordsInput,
) -> RusotoFuture<PutRecordsOutput, PutRecordsError>

The RusotoFuture is a wrapper defined like this:

/// Future that is returned from all rusoto service APIs.
pub struct RusotoFuture<T, E> {
    inner: Box<Future<Item = T, Error = E> + 'static>,
}

The inner Future is wrapped, but RusotoFuture still implements Future::poll(), so I believe it is compatible with the futures-rs ecosystem. RusotoFuture also provides a synchronization call:

impl<T, E> RusotoFuture<T, E> {
    /// Blocks the current thread until the future has resolved.
    ///
    /// This is meant to provide a simple way for non-async consumers
    /// to work with rusoto.
    pub fn sync(self) -> Result<T, E> {
        self.wait()
    }
}

I can issue a request and sync() it, getting the result from AWS. I would like to create many requests, put them in some kind of queue/list, and gather the finished requests. If a request errored, I need to reissue it (this is somewhat normal in Kinesis, especially when hitting the throughput limits on your shards). If a request completed successfully, I should issue a new request with new data. I could spawn a thread for each request and sync() it, but that seems inefficient when I already have an async IO thread running.
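
For reference, my current blocking approach looks roughly like this (a sketch: batch, client, and stream_name come from surrounding code; the field names are from rusoto_kinesis):

let input = PutRecordsInput {
    records: batch, // Vec<PutRecordsRequestEntry>, up to 500 entries
    stream_name: stream_name.clone(),
};
match client.put_records(&input).sync() {
    // output.failed_record_count tells me which entries to reissue
    Ok(output) => println!("failed records: {:?}", output.failed_record_count),
    // e.g. a shard throughput limit: retry the whole request
    Err(err) => eprintln!("put_records error: {:?}", err),
}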

I have tried using futures::sync::mpsc::channel from my application thread (not running from inside the Tokio reactor), but whenever I clone the tx, each clone gets its own buffer slot, eliminating any backpressure on send:

fn kinesis_pipeline(client: DefaultKinesisClient, stream_name: String, num_puts: usize, puts_size: usize) {
    use futures::sync::mpsc::{ channel, spawn };
    use futures::{ Sink, Future, Stream };
    use futures::stream::Sender;
    use rusoto_core::reactor::DEFAULT_REACTOR;

    let client = Arc::new(KinesisClient::simple(Region::UsWest2));
    let data = FauxData::new(); // a data generator for testing

    let (mut tx, mut rx) = channel(1);

    for rec in data {
        tx.clone().send(rec);
    }
}

Without the clone, I have the error:

error[E0382]: use of moved value: `tx`
   --> src/main.rs:150:9
    |
150 |         tx.send(rec);
    |         ^^ value moved here in previous iteration of loop
    |
    = note: move occurs because `tx` has type `futures::sync::mpsc::Sender<rusoto_kinesis::PutRecordsRequestEntry>`, which does not implement the `Copy` trait

I have also looked at futures::sync::mpsc::spawn based on recommendations, but it takes ownership of the rx (as a Stream) and does not solve my problem with the clones of tx causing unbounded behavior.

I'm hoping that if I can get the channel/spawn usage working, I will have a system that takes RusotoFutures, waits for them to complete, and then provides an easy way to grab completion results from my application thread.

asked Jan 15 '18 by xrl

1 Answer

As far as I can tell, your problem with channel is not that a single clone of the Sender increases the capacity by one; it is that you clone the Sender for every item you try to send.

The error you're seeing without clone comes from incorrect usage of the Sink::send interface: send consumes the Sender and returns a future that yields it back once the item has been accepted. With clone you actually should see the warning:

warning: unused `futures::sink::Send` which must be used: futures do nothing unless polled

That is: your current code doesn't actually ever send anything!

In order to apply backpressure you need to chain those send calls: each one should wait until the previous one has finished (and you need to wait for the last one too!); on success you get the Sender back. The best way to do this is to generate a Stream from your iterator using iter_ok and to pass it to send_all.

Now you have a single SendAll future that you need to "drive". If you ignore the result and panic on error (.then(|r| { r.unwrap(); Ok::<(), ()>(()) })) you could spawn it as a separate task, but maybe you want to integrate it into your main application (i.e. return it in a Box).

// this returns a `Box<Future<Item = (), Error = ()>>`. you may
// want to use a different error type
Box::new(tx.send_all(iter_ok(data)).map(|_| ()).map_err(|_| ()))
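
For completeness, here is a sketch of how the pieces could fit together (futures 0.1; the channel size, the generic Rec item type, and the trivial consumer are placeholders; a real consumer would issue the put_records calls):

use futures::stream::iter_ok;
use futures::sync::mpsc::{channel, SendError};
use futures::{Future, Sink, Stream};

// `Rec` stands in for rusoto_kinesis::PutRecordsRequestEntry
fn pipeline<Rec: Send + 'static>(
    data: Vec<Rec>,
) -> Box<Future<Item = (), Error = ()> + Send> {
    let (tx, rx) = channel(100); // bounded queue: this is the backpressure

    // a single Sender driven by send_all: each item waits until the
    // channel has accepted the previous one
    let producer = tx
        .send_all(iter_ok::<_, SendError<Rec>>(data))
        .map(|_| ())
        .map_err(|_| ());

    // a real consumer would turn each record into a put_records call;
    // this sketch just drains the queue
    let consumer = rx.for_each(|_record| Ok(()));

    Box::new(producer.join(consumer).map(|_| ()))
}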

RusotoFuture::sync and Future::wait

Don't use Future::wait: it is already deprecated in a branch, and it usually won't do what you are actually looking for. I doubt RusotoFuture is aware of the problems, so I recommend avoiding RusotoFuture::sync.
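
If you do need to block at the edge of your application, drive the future on a reactor instead; a minimal sketch, assuming tokio-core (which rusoto was built on at the time):

// instead of future.wait() or rusoto_future.sync():
let mut core = tokio_core::reactor::Core::new().unwrap();
// run() drives the event loop until this one future resolves
let output = core.run(client.put_records(&input));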

Cloning Sender increases channel capacity

As you correctly stated, cloning a Sender increases the capacity by one.

This seems to be done to improve performance: a Sender starts in the unblocked ("unparked") state; if a Sender isn't blocked it can send an item without waiting. But if the number of items in the queue hits the configured limit when a Sender sends an item, that Sender becomes blocked ("parked"). (Removing items from the queue will unblock the Sender at some point.)

This means that after the inner queue hits the limit, each Sender can still send one item, which leads to the documented effect of increased capacity, but only if all the Senders actually send items; unused Senders don't increase the observed capacity.

The performance boost comes from the fact that, as long as you don't hit the limit, the channel doesn't need to park and notify tasks (which is quite heavy).
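
The effect is easy to observe with try_send, which fails instead of parking; a small sketch, assuming futures 0.1 semantics:

use futures::sync::mpsc::channel;

fn main() {
    let (mut tx1, _rx) = channel::<u32>(1); // configured buffer: 1
    let mut tx2 = tx1.clone();

    // observed capacity is buffer + number of senders = 1 + 2 = 3
    assert!(tx1.try_send(1).is_ok());  // fits in the buffer
    assert!(tx1.try_send(2).is_ok());  // tx1's extra slot; tx1 parks
    assert!(tx2.try_send(3).is_ok());  // tx2's extra slot; tx2 parks
    assert!(tx1.try_send(4).is_err()); // tx1 is parked: rejected
}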

The private documentation at the top of the mpsc module describes more of the details.

answered Sep 28 '22 by Stefan