
http => akka stream => http

I'd like to use Akka Streams to pipe some JSON web services together. What is the best approach to build a stream from one HTTP request and stream its chunks to another? Is there a way to define such a graph and run it, instead of the code below? So far I have tried it this way, but I'm not sure whether it is actually streaming:

override def receive: Receive = {
   case GetTestData(p, id) =>
     // Fetch the data and pipe the response back to this actor as a message, as recommended in
     // https://doc.akka.io/docs/akka-http/current/client-side/request-level.html
     http.singleRequest(HttpRequest(uri = uri.format(p, id)))
       .pipeTo(self)

   case HttpResponse(StatusCodes.OK, _, entity, _) =>
     val initialRes = entity.dataBytes.via(JsonFraming.objectScanner(Int.MaxValue)).map(bStr => ChunkStreamPart(bStr.utf8String))

     // Forward the response to the next job and pipe the request's response to a dedicated actor
     http.singleRequest(HttpRequest(
       method = HttpMethods.POST,
       uri = "googl.cm/flow",
       entity = HttpEntity.Chunked(ContentTypes.`application/json`, initialRes)
     ))

   case resp @ HttpResponse(code, _, _, _) =>
     log.error("Request to test job failed, response code: " + code)
     // Discard the entity bytes so the unread body does not stall the connection
     resp.discardEntityBytes()

   case _ => log.warning("Unexpected message in TestJobActor")
 }
Asked by Bill'o, Jan 29 '18 14:01





1 Answer

This should be a graph equivalent to your receive:

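// Note: cachedHostConnectionPool expects a host name (and optionally a port), not a full URI;
// the request path belongs in the HttpRequest elements fed into this flow.
Http()
.cachedHostConnectionPool[Unit](uri.format(p, id))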
.collect {
  case (Success(HttpResponse(StatusCodes.OK, _, entity, _)), _) =>
    val initialRes = entity.dataBytes
      .via(JsonFraming.objectScanner(Int.MaxValue))
      .map(bStr => ChunkStreamPart(bStr.utf8String))
    Some(initialRes)

  case (Success(resp @ HttpResponse(code, _, _, _)), _) =>
    log.error("Request to test job failed, response code: " + code)
    // Discard the entity bytes so the unread body does not stall the pooled connection
    resp.discardEntityBytes()
    None
}
.collect {
  case Some(initialRes) => initialRes
}
.map { initialRes =>
  (HttpRequest(
     method = HttpMethods.POST,
     uri = "googl.cm/flow",
     entity = HttpEntity.Chunked(ContentTypes.`application/json`, initialRes)
   ),
   ())
}
.via(Http().superPool[Unit]())

The type of this is Flow[(HttpRequest, Unit), (Try[HttpResponse], Unit), HostConnectionPool]. The Unit is a correlation ID: replace it with a meaningful type if you want to know which request a given response belongs to. The HostConnectionPool materialized value can be used to shut down the connection pool to that host. Only cachedHostConnectionPool gives you back this materialized value; superPool probably handles this on its own (though I haven't checked). Anyway, I recommend you just call Http().shutdownAllConnectionPools() when your application shuts down, unless you need finer control for some reason. In my experience that is much less error-prone (e.g. you cannot forget to shut down an individual pool).
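As a rough illustration of the correlation ID and the shutdown advice, here is a minimal sketch. It assumes Akka 2.5 with Akka HTTP 10.1 (hence the explicit ActorMaterializer). The names CorrelationSketch and statusesById and the example.com URLs are hypothetical, chosen only for this example. It uses Int instead of Unit as the correlation type, since the pool does not guarantee that responses come back in request order.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCode}
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Future
import scala.util.Try

object CorrelationSketch {
  // Shape shared by Http().superPool[Int]() and Http().cachedHostConnectionPool[Int](host).
  type Pool = Flow[(HttpRequest, Int), (Try[HttpResponse], Int), Any]

  // Hypothetical helper: tags each request with its index and reports the status per index.
  def statusesById(requests: List[HttpRequest], pool: Pool)(
      implicit mat: Materializer): Future[Map[Int, Try[StatusCode]]] =
    Source(requests.zipWithIndex)
      .via(pool)
      .map { case (result, id) =>
        id -> result.map { resp =>
          resp.discardEntityBytes() // the body is not needed here, so discard it
          resp.status
        }
      }
      .runWith(Sink.seq)
      .map(_.toMap)(mat.executionContext)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("correlation-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // Hypothetical target URLs, for illustration only.
    val requests = List(
      HttpRequest(uri = "http://example.com/a"),
      HttpRequest(uri = "http://example.com/b")
    )

    statusesById(requests, Http().superPool[Int]())
      .andThen { case result => println(result) }
      // One call at shutdown closes every pool, whichever API created it.
      .flatMap(_ => Http().shutdownAllConnectionPools())
      .onComplete(_ => system.terminate())
  }
}
```

Because statusesById accepts any flow of the pool shape, it can be exercised with a stub flow in tests and with a real pool in production.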

You can also use the Graph DSL to express the same graph:

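val graph = Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    // Note: cachedHostConnectionPool expects a host name (and optionally a port), not a full URI;
    // the request path belongs in the HttpRequest elements fed into the flow.
    val host1Flow = b.add(Http().cachedHostConnectionPool[Unit](uri.format(p, id)))
    val host2Flow = b.add(Http().superPool[Unit]())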

    val toInitialRes = b.add(
      Flow[(Try[HttpResponse], Unit)]
        .collect {
          case (Success(HttpResponse(StatusCodes.OK, _, entity, _)), _) =>
            val initialRes = entity.dataBytes
              .via(JsonFraming.objectScanner(Int.MaxValue))
              .map(bStr => ChunkStreamPart(bStr.utf8String))
            Some(initialRes)

          case (Success(resp @ HttpResponse(code, _, _, _)), _) =>
            log.error("Request to test job failed, response code: " + code)
            // Discard the entity bytes so the unread body does not stall the pooled connection
            resp.discardEntityBytes()
            None
        }
    )

    val keepOkStatus = b.add(
      Flow[Option[Source[HttpEntity.ChunkStreamPart, Any]]]
        .collect {
          case Some(initialRes) => initialRes
        }
    )

    val toOtherHost = b.add(
      Flow[Source[HttpEntity.ChunkStreamPart, Any]]
        .map { initialRes =>
          (HttpRequest(
             method = HttpMethods.POST,
             uri = "googl.cm/flow",
             entity = HttpEntity.Chunked(ContentTypes.`application/json`, initialRes)
           ),
           ())
        }
    )

    host1Flow ~> toInitialRes ~> keepOkStatus ~> toOtherHost ~> host2Flow

    FlowShape(host1Flow.in, host2Flow.out)
})
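
To check locally, without any network traffic, what the JsonFraming step used in both versions hands to the outgoing chunked entity, something like the following sketch can help. It assumes Akka 2.5 with Akka HTTP 10.1. FramingSketch, framedParts, the 1024-byte object limit and the example.com URL are made up for the illustration. The list of string fragments stands in for entity.dataBytes, which may be split at arbitrary byte boundaries.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.HttpEntity.ChunkStreamPart
import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpMethods, HttpRequest}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{JsonFraming, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object FramingSketch {
  // Hypothetical helper: runs the same framing as the graph and returns the outgoing chunks as text.
  def framedParts(fragments: List[String]): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("framing-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      // Stand-in for entity.dataBytes of the first response.
      val incoming = Source(fragments.map(ByteString(_)))

      // Same transformation as in the answer; 1024 is an arbitrary per-object size limit.
      val chunks = incoming
        .via(JsonFraming.objectScanner(1024))
        .map(bytes => ChunkStreamPart(bytes.utf8String))

      // The request is only built here, never sent; the URL is a placeholder.
      val forward = HttpRequest(
        method = HttpMethods.POST,
        uri = "http://example.com/flow",
        entity = HttpEntity.Chunked(ContentTypes.`application/json`, chunks)
      )

      // Read the chunked entity back to see what would go over the wire.
      Await.result(forward.entity.dataBytes.map(_.utf8String).runWith(Sink.seq), 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    // The fragments deliberately cut JSON objects in the middle.
    framedParts(List("""[{"id":""", """1},{"i""", """d":2}]""")).foreach(println)
}
```

Each emitted chunk is one complete JSON object, so the downstream service receives the objects one by one rather than the original array. If the receiver expects a single valid JSON document, the array brackets and separators would have to be added back, which this sketch does not attempt.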
Answered by esgott, Oct 04 '22 04:10
