I'm trying to create a system by which my application can receive streaming data from a Redis PubSub channel and process it. The Redis driver that I'm using, along with all other Redis drivers for Rust that I've seen, use a blocking operation to get data from the channel that only returns a value when it receives data:
let msg = match pubsub.get_message() {
Ok(m) => m,
Err(_) => panic!("Could not get message from pubsub!")
};
let payload: String = match msg.get_payload() {
Ok(s) => s,
Err(_) => panic!("Could not convert redis message to string!")
};
I wanted to use the futures-rs library to wrap this blocking function call in a future so that I can perform other tasks within my application while waiting for input.
I read the tutorial for futures and tried to create a Stream
that would signal when there data is received by the PubSub, but I can't figure out how to do so.
How can I create schedule
and poll
functions for the blocking pubsub.get_message()
function?
Heavy caveat I've never used this library before, and my low-level knowledge of some of the concepts is a bit... lacking. Mostly I'm reading through the tutorial. I'm pretty sure anyone who has done async work will read this and laugh, but it may be a useful starting point for other people. Caveat emptor!
Let's start with something a bit simpler, demonstrating how a Stream
works. We can convert an iterator of Result
s into a stream:
extern crate futures;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
let payloads = stream::iter(payloads.into_iter());
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
}
This shows us one way to consume the stream. We use and_then
to do something to each payload (here just printing it out) and then for_each
to convert the Stream
back into a Future
. We can then run the future by calling the strangely named forget
method.
Next is to tie the Redis library into the mix, handling just one message. Since the get_message()
method is blocking, we need to introduce some threads into the mix. It's not a good idea to perform large amount of work in this type of asynchronous system as everything else will be blocked. For example:
Unless it is otherwise arranged to be so, it should be ensured that implementations of this function finish very quickly.
In an ideal world, the redis crate would be built atop a library like futures and expose all this natively.
extern crate redis;
extern crate futures;
use std::thread;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
let (tx, payloads) = stream::channel();
let redis_thread = thread::spawn(move || {
let msg = pubsub.get_message().expect("Unable to get message");
let payload: Result<String, _> = msg.get_payload();
tx.send(payload).forget();
});
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
redis_thread.join().expect("unable to join to thread");
}
My understanding gets fuzzier here. In a separate thread, we block for the message and push it into the channel when we get it. What I don't understand is why we need to hold onto the thread's handle. I would expect that foo.forget
would be blocking itself, waiting until the stream is empty.
In a telnet connection to the Redis server, send this:
publish rust awesome
And you will see it works. Adding print statements shows that the (for me) the foo.forget
statement is run before the thread is spawned.
Multiple messages is trickier. The Sender
consumes itself to prevent the generating side from getting too far ahead of the consuming side. This is accomplished by returning another future from send
! We need to shuttle it back out of there to reuse it for the next iteration of the loop:
extern crate redis;
extern crate futures;
use std::thread;
use std::sync::mpsc;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
let (tx, payloads) = stream::channel();
let redis_thread = thread::spawn(move || {
let mut tx = tx;
while let Ok(msg) = pubsub.get_message() {
let payload: Result<String, _> = msg.get_payload();
let (next_tx_tx, next_tx_rx) = mpsc::channel();
tx.send(payload).and_then(move |new_tx| {
next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
futures::finished(())
}).forget();
tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
}
});
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
redis_thread.join().expect("unable to join to thread");
}
I'm sure that there will be more ecosystem for this type of interoperation as time goes on. For example, the futures-cpupool crate could probably be extended to support a similar usecase to this.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With