Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

akka-http: send element to akka sink from http route

How can I send elements/messages to an Akka Sink from an Akka HTTP route? My HTTP route still needs to return a normal HTTP response.

I imagine this requires a stream branch/junction. The normal HTTP routes are flows from HttpRequest -> HttpResponse. I would like to add a branch/junction so that HttpRequests can trigger events to my separate sink as well as generate the normal HttpResponse.

Below is a very simple single-route akka-http app. For simplicity, I'm using a simple println sink. My production use case will obviously involve a less trivial sink.

/**
 * Starts a single-route akka-http server on localhost:8080 and keeps it running until
 * RETURN is pressed, then unbinds the port and terminates the actor system.
 *
 * @param args command-line arguments (unused)
 */
def main(args: Array[String]): Unit = {
  implicit val actorSystem: ActorSystem = ActorSystem("my-akka-http-test")
  val executor = actorSystem.dispatcher
  implicit val materializer: ActorMaterializer = ActorMaterializer()(actorSystem)

  // I would like to send elements to this sink in response to HTTP GET operations.
  val sink: Sink[Any, Future[Done]] = Sink.foreach(println)

  val route: akka.http.scaladsl.server.Route =
    path("hello" / Segment) { p =>
      get {
        // I'd like to send a message to an Akka Sink as well as return an HTTP response.
        complete {
          s"<h1>Say hello to akka-http. p=$p</h1>"
        }
      }
    }

  val httpExt: akka.http.scaladsl.HttpExt = Http(actorSystem)
  val bindingFuture = httpExt.bindAndHandle(RouteResult.route2HandlerFlow(route), "localhost", 8080)

  println("Server online at http://localhost:8080/")
  println("Press RETURN to stop...")
  scala.io.StdIn.readLine()

  bindingFuture
    .flatMap(_.unbind())(executor) // trigger unbinding from the port
    // Only start termination here. Blocking on it with Await inside this callback would
    // park a thread of the very dispatcher that belongs to the system being shut down.
    .onComplete(_ => actorSystem.terminate())(executor) // and shutdown when done
}

EDIT: Alternatively, using the low-level akka-http API, how could I send specific messages to a sink from a specific route handler?

/**
 * Low-level variant: binds to localhost:8080, answers every accepted connection with a
 * synchronous `HttpRequest => HttpResponse` function, and shuts down once RETURN is pressed.
 *
 * @param args command-line arguments (unused)
 */
def main(args: Array[String]): Unit = {
  implicit val actorSystem: ActorSystem = ActorSystem("my-akka-http-test")
  val executor = actorSystem.dispatcher
  implicit val materializer: ActorMaterializer = ActorMaterializer()(actorSystem)

  // I would like to send elements to this sink in response to HTTP GET operations.
  val sink: Sink[Any, Future[Done]] = Sink.foreach(println)

  // Maps each request to its response; anything not matched explicitly becomes a 404.
  val requestHandler: HttpRequest => HttpResponse = {
    case HttpRequest(GET, Uri.Path("/"), _, _, _) =>
      HttpResponse(entity = HttpEntity(
        ContentTypes.`text/html(UTF-8)`,
        "<html><body>Hello world!</body></html>"))

    case HttpRequest(GET, Uri.Path("/ping"), _, _, _) =>
      HttpResponse(entity = "PONG!")

    case HttpRequest(GET, Uri.Path("/crash"), _, _, _) =>
      sys.error("BOOM!")

    case r: HttpRequest =>
      r.discardEntityBytes() // important to drain incoming HTTP Entity stream
      HttpResponse(404, entity = "Unknown resource!")
  }

  val serverSource = Http().bind(interface = "localhost", port = 8080)

  val bindingFuture: Future[Http.ServerBinding] =
    serverSource.to(Sink.foreach { connection =>
      println("Accepted new connection from " + connection.remoteAddress)

      connection handleWithSyncHandler requestHandler
      // this is equivalent to
      // connection handleWith { Flow[HttpRequest] map requestHandler }
    }).run()

  println("Server online at http://localhost:8080/")
  println("Press RETURN to stop...")
  scala.io.StdIn.readLine()

  bindingFuture
    .flatMap(_.unbind())(executor) // trigger unbinding from the port
    // Only start termination here. Blocking on it with Await inside this callback would
    // park a thread of the very dispatcher that belongs to the system being shut down.
    .onComplete(_ => actorSystem.terminate())(executor) // and shutdown when done
}
like image 550
clay Avatar asked Feb 24 '17 16:02

clay


2 Answers

If you want to send the whole HttpRequest to a sink of yours, I'd say the simplest way is to use the alsoTo combinator. The result would be something along the lines of

// Placeholder: supply the sink that should observe every incoming request.
val mySink: Sink[HttpRequest, NotUsed] = ???

// Each request is also sent to mySink via alsoTo, then handled by the regular route.
val handlerFlow = {
  val routeHandler = RouteResult.route2HandlerFlow(route)
  Flow[HttpRequest]
    .alsoTo(mySink)
    .via(routeHandler)
}

// Serve the combined flow on localhost:8080.
val bindingFuture = Http().bindAndHandle(handlerFlow, "localhost", 8080)

FYI: alsoTo in fact hides a Broadcast stage.

If, instead, you need to selectively send a message to a Sink from a specific subroute, you have no other choice but to materialize a new flow for each incoming request. See the example below.

// Sink that simply prints whatever element it receives.
val sink: Sink[Any, Future[Done]] = Sink.foreach(println)

// GET /hello/<segment>: runs a one-element stream into `sink` for this request,
// then answers with the greeting page.
val route: akka.http.scaladsl.server.Route =
  (path("hello" / Segment) & get) { p =>
    extract(_.request) { incoming =>
      extractMaterializer { streamMat =>
        // A fresh single-element stream is materialized for every matching request.
        Source.single(incoming).runWith(sink)(streamMat)

        complete(s"<h1>Say hello to akka-http. p=$p</h1>")
      }
    }
  }

Also, keep in mind you can always ditch the high-level DSL completely, and model your whole route using the lower-level streams DSL. This will result in more verbose code - but will give you full control of your stream materialization.

EDIT: example below

// Sink that simply prints whatever element it receives.
val sink: Sink[Any, Future[Done]] = Sink.foreach(println)

// Hand-built request handler: requests are split into two branches, answered separately,
// and the responses of both branches are merged back into a single outlet.
val handlerFlow =
  Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    // Branch 0 takes GET "/"; every other request goes to branch 1.
    val branchOf: HttpRequest => Int = {
      case HttpRequest(GET, Uri.Path("/"), _, _, _) => 0
      case _                                        => 1
    }

    val router    = builder.add(Partition[HttpRequest](2, branchOf))
    val responses = builder.add(Merge[HttpResponse](2))

    // Root branch: also send the request to `sink`, then answer with the static page.
    val rootBranch = Flow[HttpRequest]
      .alsoTo(sink)
      .map { _ =>
        HttpResponse(entity = HttpEntity(
          ContentTypes.`text/html(UTF-8)`,
          "<html><body>Hello world!</body></html>"))
      }

    // Fallback branch: the remaining routes, ending in a 404 for anything unknown.
    val fallbackBranch = Flow[HttpRequest].map {
      case HttpRequest(GET, Uri.Path("/ping"), _, _, _) =>
        HttpResponse(entity = "PONG!")

      case HttpRequest(GET, Uri.Path("/crash"), _, _, _) =>
        sys.error("BOOM!")

      case unmatched: HttpRequest =>
        unmatched.discardEntityBytes() // important to drain incoming HTTP Entity stream
        HttpResponse(404, entity = "Unknown resource!")
    }

    router.out(0) ~> rootBranch     ~> responses
    router.out(1) ~> fallbackBranch ~> responses

    FlowShape(router.in, responses.out)
  })

// Serve the hand-built flow on localhost:8080.
val bindingFuture = Http().bindAndHandle(handlerFlow, "localhost", 8080)
like image 166
Stefano Bonetti Avatar answered Nov 15 '22 07:11

Stefano Bonetti


This is the solution I used that seems ideal. Akka Http seems like it's designed so that your routes are simple HttpRequest->HttpResponse flows and don't involve any extra branches.

Rather than build everything into a single Akka stream graph, I have a separate QueueSource->Sink graph, and the normal Akka Http HttpRequest->HttpResponse flow just adds elements to the source queue as needed.

/**
 * Runs an akka-http server next to an independent queue-backed stream: route handlers
 * push messages into the queue, and a separate graph annotates and prints them.
 */
object HttpWithSinkTest {
  /**
   * Builds (but does not run) the queue -> annotate -> println graph.
   *
   * @return a runnable graph that materializes to the queue used to feed the stream and a
   *         future that completes when the printing sink finishes
   */
  def buildQueueSourceGraph(): RunnableGraph[(SourceQueueWithComplete[String], Future[Done])] = {
    val annotateMessage: Flow[String, String, NotUsed] = Flow.fromFunction[String, String](s => s"got message from queue: $s")

    // Buffer of 100 elements; with dropNew, a new element is dropped when the buffer is full.
    val sourceQueue = Source.queue[String](100, OverflowStrategy.dropNew)
    val sink: Sink[String, Future[Done]] = Sink.foreach(println)
    val annotatedSink = annotateMessage.toMat(sink)(Keep.right)

    sourceQueue.toMat(annotatedSink)(Keep.both)
  }

  /**
   * Builds the HTTP handler flow whose single route offers a message to `queue` on every
   * GET /hello/<segment> and answers with a greeting page.
   *
   * @param queue        queue that receives one message per matching request
   * @param actorSystem  actor system made implicitly available while building the flow
   * @param materializer materializer made implicitly available while building the flow
   * @return the request/response flow to pass to `bindAndHandle`
   */
  def buildHttpFlow(queue: SourceQueueWithComplete[String],
                    actorSystem: ActorSystem, materializer: ActorMaterializer): Flow[HttpRequest, HttpResponse, NotUsed] = {
    implicit val actorSystemI: ActorSystem = actorSystem
    implicit val materializerI: ActorMaterializer = materializer

    val route: akka.http.scaladsl.server.Route =
      path("hello" / Segment) { p =>
        get {
          complete {
            // Do not discard the offer result: report messages that were not enqueued
            // (e.g. dropped or rejected) instead of losing them silently. The HTTP
            // response does not wait for this.
            queue.offer(s"got http event p=$p").onComplete {
              case scala.util.Success(akka.stream.QueueOfferResult.Enqueued) => ()
              case scala.util.Success(notEnqueued) =>
                actorSystem.log.warning("Queue did not accept event for p={}: {}", p, notEnqueued)
              case scala.util.Failure(cause) =>
                actorSystem.log.error(cause, "Offering event for p={} to the queue failed", p)
            }(actorSystem.dispatcher)

            s"<h1>Say hello to akka-http. p=$p</h1>"
          }
        }
      }

    RouteResult.route2HandlerFlow(route)
  }

  /**
   * Starts the queue stream and the HTTP server, waits for RETURN, then shuts down in
   * order: unbind the port, drain the queue stream, terminate the actor system.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("my-akka-http-test")
    implicit val materializer: ActorMaterializer = ActorMaterializer()(actorSystem)

    val (queue, queueDone) = buildQueueSourceGraph().run()(materializer)

    val httpFlow = buildHttpFlow(queue, actorSystem, materializer)
    val httpExt: akka.http.scaladsl.HttpExt = Http(actorSystem)
    val bindingFuture = httpExt.bindAndHandle(httpFlow, "localhost", 8080)

    println("Server online at http://localhost:8080/")
    println("Press RETURN to stop...")
    scala.io.StdIn.readLine()

    println("Shutting down...")

    val serverBinding = Await.result(bindingFuture, Duration.Inf)
    Await.result(serverBinding.unbind(), Duration.Inf)

    // Complete the queue and wait for the sink to finish so that messages still buffered
    // are printed before the actor system goes away.
    queue.complete()
    Await.result(queueDone, Duration.Inf)

    Await.result(actorSystem.terminate(), Duration.Inf)

    println("Done. Exiting")
  }
}
like image 28
clay Avatar answered Nov 15 '22 05:11

clay