
Using Scala continuations with Netty/NIO listeners

I'm using the Netty library (version 4 from GitHub). It works great in Scala, but I am hoping for my library to be able to use continuation passing style for the asynchronous waiting.

Traditionally with Netty you would do something like this (an example asynchronous connect operation):

//client is a ClientBootstrap
val future:ChannelFuture = client.connect(remoteAddr);
future.addListener(new ChannelFutureListener {
    def operationComplete (f:ChannelFuture) = {
        //here goes the code that happens when the connection is made   
    }
})

If you are implementing a library (which I am) then you basically have three simple options to allow the user of the library to do stuff after the connection is made:

  1. Just return the ChannelFuture from your connect method and let the user deal with it - this doesn't provide much abstraction from netty.
  2. Take a ChannelFutureListener as a parameter of your connect method and add it as a listener to the ChannelFuture.
  3. Take a callback function object as a parameter of your connect method and call that from within the ChannelFutureListener that you create (this would make for a callback-driven style somewhat like node.js)
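For comparison, option 3 can be sketched roughly as follows. This is only an illustration: `StubFuture` and `StubListener` are hypothetical stand-ins for Netty's `ChannelFuture` and `ChannelFutureListener` so that the snippet compiles without Netty, and `CallbackClient` is an invented name. The stub is single-threaded and is completed by hand, whereas Netty would complete the real future from its I/O thread.

```scala
// Hypothetical stand-ins for Netty's ChannelFutureListener / ChannelFuture.
trait StubListener {
  def operationComplete(f: StubFuture): Unit
}

class StubFuture {
  private var listeners = List.empty[StubListener]
  private var done = false

  // Run the listener immediately if already complete, otherwise remember it.
  def addListener(l: StubListener): Unit =
    if (done) l.operationComplete(this) else listeners = l :: listeners

  // Called by hand here; in Netty the I/O thread would complete the future.
  def complete(): Unit = {
    done = true
    listeners.reverse.foreach(_.operationComplete(this))
    listeners = Nil
  }
}

class CallbackClient {
  // Option 3: take a plain function and wrap it in a listener internally.
  def connect(remoteAddr: String)(onConnected: () => Unit): StubFuture = {
    val future = new StubFuture // real code would call client.connect(remoteAddr)
    future.addListener(new StubListener {
      def operationComplete(f: StubFuture): Unit = onConnected()
    })
    future // returned only so this sketch can complete it manually
  }
}
```

A caller would write something like `new CallbackClient().connect("127.0.0.1") { () => println("connected") }`; the callback runs once the returned stub future is completed.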

What I am trying to do is a fourth option; I didn't include it in the count above because it is not simple.

I want to use Scala's delimited continuations so that using the library feels like using a blocking library, while it stays nonblocking behind the scenes:

class MyLibraryClient {
    def connect(remoteAddr:SocketAddress) = {
        shift { retrn: (Unit => Unit) => {
                val future:ChannelFuture = client.connect(remoteAddr);
                future.addListener(new ChannelFutureListener {
                    def operationComplete(f:ChannelFuture) = {
                        retrn();
                    }   
                });
            }
        }   
    }
}

Imagine other read/write operations being implemented in the same fashion. The goal is for the user's code to look more like this:

reset {
    val conn = new MyLibraryClient();
    conn.connect(new InetSocketAddress("127.0.0.1", 1337));
    println("This will happen after the connection is finished");
}

In other words, the program will look like a simple blocking-style program but behind the scenes there won't be any blocking or threading.

The trouble I'm running into is that I don't fully understand how the typing of delimited continuations works. When I try to implement it in the above way, the compiler complains that my operationComplete implementation actually returns Unit @scala.util.continuations.cpsParam[Unit,Unit => Unit] instead of Unit. I get that there is sort of a "gotcha" in Scala's CPS in that you must annotate a shift method's return type with @suspendable, which gets passed up the call stack until the reset, but there doesn't seem to be any way to reconcile that with a pre-existing Java library that has no concept of delimited continuations.

I feel like there really must be a way around this - if Swarm can serialize continuations and jam them over the network to be computed elsewhere, then it must be possible to simply call a continuation from a pre-existing Java class. But I can't figure out how it can be done. Would I have to rewrite entire parts of netty in Scala in order to make this happen?

Jeremy asked Feb 07 '12 23:02



1 Answer

I found this explanation of Scala's continuations extremely helpful when I started out. In particular, pay attention to the parts where he explains shift[A, B, C] and reset[B, C]. Adding a dummy null as the last statement of operationComplete, so that the retrn() call is no longer the method's final expression, should help.

Btw, if the code that runs after connect may itself contain another shift, you need to invoke retrn() inside another reset.

Edit: Here is a working example

import scala.util.continuations._
import java.util.concurrent.Executors

object Test {

  val execService = Executors.newFixedThreadPool(2)

  def main(args: Array[String]): Unit = {
    reset {
      val conn = new MyLibraryClient();
      conn.connect("127.0.0.1");
      println("This will happen after the connection is finished");
    }
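    println("Outside reset");
    // The listener task was already submitted inside reset; shutdown() lets it
    // finish and then allows the JVM to exit (the pool threads are non-daemon).
    execService.shutdown();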
  }
}

class ChannelFuture {
  def addListener(listener: ChannelFutureListener): Unit = {
    val future = this
    Test.execService.submit(new Runnable {
      def run(): Unit = {
        listener.operationComplete(future)
      }
    })
  }
}

trait ChannelFutureListener {
  def operationComplete(f: ChannelFuture): Unit
}

class MyLibraryClient {
  def connect(remoteAddr: String): Unit@cps[Unit] = {
    shift {
      retrn: (Unit => Unit) => {
        val future: ChannelFuture = new ChannelFuture()
        future.addListener(new ChannelFutureListener {
          def operationComplete(f: ChannelFuture): Unit = {
            println("operationComplete starts")
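            retrn(()); // resume the rest of the reset block, passing the Unit value explicitly
            null       // dummy last statement; discarded because the method returns Unit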
          }
        });
      }
    }
  }
}

with a possible output:

Outside reset
operationComplete starts
This will happen after the connection is finished
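
The ordering above can be reproduced without the continuations plugin by writing the continuation out by hand. The following is a rough sketch under stated assumptions, not what the plugin literally generates: `ManualCpsDemo`, `pending`, and `log` are invented names, and `pending` is a hypothetical single-threaded queue standing in for the thread pool, which makes the order deterministic here, whereas the real example can interleave differently.

```scala
import scala.collection.mutable.ListBuffer

object ManualCpsDemo {
  // Collects output lines instead of printing, so the order is easy to inspect.
  val log = ListBuffer.empty[String]
  // Hypothetical queue standing in for the executor / Netty's I/O thread.
  val pending = ListBuffer.empty[() => Unit]

  // Explicit CPS version of connect: `k` plays the role of `retrn`.
  def connect(remoteAddr: String)(k: Unit => Unit): Unit = {
    pending += { () =>
      log += "operationComplete starts"
      k(()) // resume the code that followed connect
    }
    ()
  }

  def run(): List[String] = {
    // Equivalent of the reset block: everything after connect becomes `k`.
    connect("127.0.0.1") { _ =>
      log += "This will happen after the connection is finished"
      ()
    }
    // connect returned without calling `k`, so control is already back here.
    log += "Outside reset"
    // Simulate the listener firing later.
    pending.foreach(task => task())
    pending.clear()
    log.toList
  }
}
```

Calling `ManualCpsDemo.run()` yields the three lines in the same order as the output shown above: the code after `connect` is just a function value that the listener invokes later, which is why the caller gets past the `reset` block first.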
shams answered Sep 23 '22 06:09
