 

How to implement a stream of futures for a blocking call using futures.rs and Redis PubSub?

I'm trying to create a system by which my application can receive streaming data from a Redis PubSub channel and process it. The Redis driver that I'm using, like every other Redis driver for Rust that I've seen, uses a blocking operation to get data from the channel, one that only returns once data arrives:

let msg = match pubsub.get_message() {
    Ok(m) => m,
    Err(_) => panic!("Could not get message from pubsub!")
};
let payload: String = match msg.get_payload() {
    Ok(s) => s,
    Err(_) => panic!("Could not convert redis message to string!")
};

I wanted to use the futures-rs library to wrap this blocking function call in a future so that I can perform other tasks within my application while waiting for input.

I read the tutorial for futures and tried to create a Stream that would signal when data is received by the PubSub, but I can't figure out how to do so.

How can I create schedule and poll functions for the blocking pubsub.get_message() function?

Ameo asked Aug 11 '16 21:08


1 Answer

Heavy caveat: I've never used this library before, and my low-level knowledge of some of the concepts is a bit... lacking. Mostly I'm reading through the tutorial. I'm pretty sure anyone who has done async work will read this and laugh, but it may be a useful starting point for other people. Caveat emptor!


Let's start with something a bit simpler, demonstrating how a Stream works. We can convert an iterator of Results into a stream:

extern crate futures;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
    let payloads = stream::iter(payloads.into_iter());

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
}

This shows us one way to consume the stream. We use and_then to do something to each payload (here just printing it out) and then for_each to convert the Stream back into a Future. We can then run the future by calling the strangely named forget method.
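
If the stream combinators feel unfamiliar, it may help to compare them with ordinary iterators. The following is only an analogy using the standard library, not the futures API: `map` stands in for and_then, `try_for_each` stands in for for_each, and the `consume` function and the uppercase transformation are made up for illustration. As with the stream, the first error stops processing.

```rust
// Process each payload in order, stopping at the first error.
fn consume(payloads: Vec<Result<String, ()>>) -> Result<Vec<String>, ()> {
    let mut seen = Vec::new();
    payloads
        .into_iter()
        // Rough analogue of `and_then`: transform each successful payload.
        .map(|item| item.map(|payload| payload.to_uppercase()))
        // Rough analogue of `for_each`: drive the sequence to completion.
        .try_for_each(|item| item.map(|payload| seen.push(payload)))?;
    Ok(seen)
}

fn main() {
    let payloads = vec![Ok("a".to_string()), Ok("b".to_string())];
    println!("{:?}", consume(payloads));
}
```

The big difference is that an iterator is driven synchronously by the caller, while a stream yields items whenever they become available.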


The next step is to bring in the Redis library, handling just one message. Since the get_message() method is blocking, we need to introduce a thread. It's not a good idea to perform a large amount of work inside this type of asynchronous system, as everything else will be blocked while it runs. For example, the documentation notes:

Unless it is otherwise arranged to be so, it should be ensured that implementations of this function finish very quickly.

In an ideal world, the redis crate would be built atop a library like futures and expose all this natively.

extern crate redis;
extern crate futures;

use std::thread;
use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let msg = pubsub.get_message().expect("Unable to get message");
        let payload: Result<String, _> = msg.get_payload();
        tx.send(payload).forget();
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}

My understanding gets fuzzier here. In a separate thread, we block for the message and push it into the channel when we get it. What I don't understand is why we need to hold onto the thread's handle. I would have expected foo.forget to block by itself, waiting until the stream is empty.

In a telnet connection to the Redis server, send this:

publish rust awesome

And you will see it works. Adding print statements shows that, for me, the foo.forget statement runs before the thread is spawned.
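
The thread-plus-channel idea can be tried on its own, without Redis or futures. This is a minimal sketch using only the standard library, under some loud assumptions: `blocking_get_message` is a made-up stand-in for pubsub.get_message() that just sleeps, and `spawn_listener` and `collect_while_working` are hypothetical helpers. The main thread polls with `try_recv`, so it stays free to do other work while the worker blocks.

```rust
use std::sync::mpsc::{self, Receiver, TryRecvError};
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for the blocking `pubsub.get_message()` call:
// it sleeps briefly and then returns a payload.
fn blocking_get_message(n: u32) -> String {
    thread::sleep(Duration::from_millis(20));
    format!("message {}", n)
}

// Spawn a worker that blocks for `count` messages and forwards each one.
fn spawn_listener(count: u32) -> Receiver<String> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for n in 0..count {
            // Stop early if the receiving side has gone away.
            if tx.send(blocking_get_message(n)).is_err() {
                break;
            }
        }
        // `tx` is dropped here, which disconnects the channel.
    });
    rx
}

// Poll the channel without blocking, doing other work in between.
fn collect_while_working(rx: Receiver<String>) -> (Vec<String>, u64) {
    let mut received = Vec::new();
    let mut idle_ticks = 0;
    loop {
        match rx.try_recv() {
            Ok(payload) => received.push(payload),
            Err(TryRecvError::Empty) => {
                // Nothing yet: the application's other work would run here.
                idle_ticks += 1;
                thread::sleep(Duration::from_millis(1));
            }
            Err(TryRecvError::Disconnected) => break,
        }
    }
    (received, idle_ticks)
}

fn main() {
    let (received, idle_ticks) = collect_while_working(spawn_listener(3));
    println!("received {:?} after {} idle ticks", received, idle_ticks);
}
```

A stream does essentially this polling for you, and it gets notified when an item is ready instead of spinning in a loop.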


Handling multiple messages is trickier. The Sender consumes itself to prevent the generating side from getting too far ahead of the consuming side. This is accomplished by returning another future from send! We need to shuttle the new Sender back out of that future to reuse it for the next iteration of the loop:

extern crate redis;
extern crate futures;

use std::thread;
use std::sync::mpsc;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

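    let redis_thread = thread::spawn(move || {
        let mut tx = tx;

        while let Ok(msg) = pubsub.get_message() {
            let payload: Result<String, _> = msg.get_payload();

            // A std channel used only to carry the Sender back out of the closure below.
            let (next_tx_tx, next_tx_rx) = mpsc::channel();

            // `send` consumes `tx`; the future it returns resolves to a fresh Sender.
            tx.send(payload).and_then(move |new_tx| {
                next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
                futures::finished(())
            }).forget();

            // Block until the Sender comes back, then reuse it for the next message.
            tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
        }
    });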

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}
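
The "sender consumes itself" idea from the example above can be sketched in isolation with the standard library. This is not how the futures Sender is implemented; `HandoffSender`, `handoff_channel`, and `run` are hypothetical names, and a zero-capacity `sync_channel` stands in for the backpressure. Because `send` takes `self` by value, the producer cannot send again until it has been handed the sender back.

```rust
use std::sync::mpsc::{self, Receiver, SyncSender};
use std::thread;

// Hypothetical wrapper: sending takes `self` by value, so the caller
// cannot send again until it gets the sender back.
struct HandoffSender<T> {
    inner: SyncSender<T>,
}

impl<T> HandoffSender<T> {
    // Blocks until the receiver takes the value, then returns the sender
    // for reuse. Returns `None` if the receiver has been dropped.
    fn send(self, value: T) -> Option<HandoffSender<T>> {
        match self.inner.send(value) {
            Ok(()) => Some(self),
            Err(_) => None,
        }
    }
}

fn handoff_channel<T>() -> (HandoffSender<T>, Receiver<T>) {
    // Capacity 0 makes every send wait for a matching receive.
    let (tx, rx) = mpsc::sync_channel(0);
    (HandoffSender { inner: tx }, rx)
}

// Send every message from a producer thread and collect them on this one.
fn run(messages: Vec<String>) -> Vec<String> {
    let (tx, rx) = handoff_channel();
    let producer = thread::spawn(move || {
        let mut tx = tx;
        for msg in messages {
            // Rebind `tx` to the sender handed back by `send`.
            tx = match tx.send(msg) {
                Some(next) => next,
                None => return,
            };
        }
    });
    // The iterator ends once the producer drops its sender.
    let received: Vec<String> = rx.iter().collect();
    producer.join().expect("producer thread panicked");
    received
}

fn main() {
    println!("{:?}", run(vec!["a".to_string(), "b".to_string()]));
}
```

Here the sender comes back as a plain return value because `send` blocks. In the futures version it comes back through a future, which is why the extra mpsc channel is needed to get it out of the closure.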

I'm sure that the ecosystem will grow more support for this type of interoperation as time goes on. For example, the futures-cpupool crate could probably be extended to support a use case similar to this one.

Shepmaster answered Dec 02 '22 21:12