I am writing a server in netty, in which I need to make a call to memcached. I am using spymemcached and can easily do the synchronous memcached call. I would like this memcached call to be async. Is that possible? The examples provided with netty do not seem to be helpful.
I tried using callbacks: created a ExecutorService
pool in my Handler and submitted a callback worker to this pool. Like this:
public class MyHandler extends ChannelInboundMessageHandlerAdapter<MyPOJO> implements CallbackInterface{
... private static ExecutorService pool = Executors.newFixedThreadPool(20); @Override public void messageReceived(ChannelHandlerContext ctx, MyPOJO pojo) { ... CallingbackWorker worker = new CallingbackWorker(key, this); pool.submit(worker); ... } public void myCallback() { //get response this.ctx.nextOutboundMessageBuf().add(response); }
}
CallingbackWorker
looks like:
public class CallingbackWorker implements Callable {
public CallingbackWorker(String key, CallbackInterface c) { this.c = c; this.key = key; } public Object call() { //get value from key c.myCallback(value); }
However, when I do this, this.ctx.nextOutboundMessageBuf()
in myCallback
gets stuck.
So, overall, my question is: how to do async memcached calls in Netty?
There are two problems here: a small-ish issue with the way you're trying to code this, and a bigger one with many libraries that provide async service calls, but no good way to take full advantage of them in an async framework like Netty. That forces users into suboptimal hacks like this one, or a less-bad, but still not ideal approach I'll get to in a moment.
First the coding problem. The issue is that you're trying to call a ChannelHandlerContext method from a thread other than the one associated with your handler, which is not allowed. That's pretty easy to fix, as shown below. You could code it a few other ways, but this is probably the most straightforward:
private static ExecutorService pool = Executors.newFixedThreadPool(20);
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
//...
final GetFuture<String> future = memcachedClient().getAsync("foo", stringTranscoder());
// first wait for the response on a pool thread
pool.execute(new Runnable() {
public void run() {
String value;
Exception err;
try {
value = future.get(3, TimeUnit.SECONDS); // or whatever timeout you want
err = null;
} catch (Exception e) {
err = e;
value = null;
}
// put results into final variables; compiler won't let us do it directly above
final fValue = value;
final fErr = err;
// now process the result on the ChannelHandler's thread
ctx.executor().execute(new Runnable() {
public void run() {
handleResult(fValue, fErr);
}
});
}
});
// note that we drop through to here right after calling pool.execute() and
// return, freeing up the handler thread while we wait on the pool thread.
}
private void handleResult(String value, Exception err) {
// handle it
}
That will work, and might be sufficient for your application. But you've got a fixed-sized thread pool, so if you're ever going to handle much more than 20 concurrent connections, that will become a bottleneck. You could increase the pool size, or use an unbounded one, but at that point, you might as well be running under Tomcat, as memory consumption and context-switching overhead start to become issues, and you lose the scalabilty that was the attraction of Netty in the first place!
And the thing is, Spymemcached is NIO-based, event-driven, and uses just one thread for all its work, yet provides no way to fully take advantage of its event-driven nature. I expect they'll fix that before too long, just as Netty 4 and Cassandra have recently by providing callback (listener) methods on Future objects.
Meanwhile, being in the same boat as you, I researched the alternatives, and not being too happy with what I found, I wrote (yesterday) a Future tracker class that can poll up to thousands of Futures at a configurable rate, and call you back on the thread (Executor) of your choice when they complete. It uses just one thread to do this. I've put it up on GitHub if you'd like to try it out, but be warned that it's still wet, as they say. I've tested it a lot in the past day, and even with 10000 concurrent mock Future objects, polling once a millisecond, its CPU utilization is negligible, though it starts to go up beyond 10000. Using it, the example above looks like this:
// in some globally-accessible class:
public static final ForeignFutureTracker FFT = new ForeignFutureTracker(1, TimeUnit.MILLISECONDS);
// in a handler class:
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
// ...
final GetFuture<String> future = memcachedClient().getAsync("foo", stringTranscoder());
// add a listener for the Future, with a timeout in 2 seconds, and pass
// the Executor for the current context so the callback will run
// on the same thread.
Global.FFT.addListener(future, 2, TimeUnit.SECONDS, ctx.executor(),
new ForeignFutureListener<String,GetFuture<String>>() {
public void operationSuccess(String value) {
// do something ...
ctx.fireChannelRead(someval);
}
public void operationTimeout(GetFuture<String> f) {
// do something ...
}
public void operationFailure(Exception e) {
// do something ...
}
});
}
You don't want more than one or two FFT instances active at any time, or they could become a drain on CPU. But a single instance can handle thousands of outstanding Futures; about the only reason to have a second one would be to handle higher-latency calls, like S3, at a slower polling rate, say 10-20 milliseconds.
One drawback of the polling approach is that it adds a small amount of latency. For example, polling once a millisecond, on average it will add 500 microseconds to the response time. That won't be an issue for most applications, and I think is more than offset by the memory and CPU savings over the thread pool approach.
I expect within a year or so this will be a non-issue, as more async clients provide callback mechanisms, letting you fully leverage NIO and the event-driven model.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With