I'm building a REST API that starts a calculation in a Spark cluster and responds with a chunked stream of the results. Given the Spark DStream of calculation results, I can use
dstream.foreachRDD()
to send the data out of Spark. I'm sending the chunked HTTP response with akka-http:
import akka.http.scaladsl.model._

val requestHandler: HttpRequest => HttpResponse = {
  case HttpRequest(HttpMethods.GET, Uri.Path("/data"), _, _, _) =>
    HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain(UTF-8)`, source))
}
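(For reference, a minimal sketch of how such a handler might be bound with akka-http's low-level server API; the actor system name, interface, and port are placeholders.)

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer

implicit val system: ActorSystem = ActorSystem("rest-api")
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Bind the synchronous handler; every incoming request is answered by requestHandler.
Http().bindAndHandleSync(requestHandler, interface = "localhost", port = 8080)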
For simplicity, I'm trying to get plain text working first; I'll add JSON marshalling later.
But what is the idiomatic way of using a Spark DStream as a Source for an Akka stream? I figured I could do it via a socket, but since the Spark driver and the REST endpoint sit on the same JVM, opening a socket just for this seems like overkill.
I'm not sure what the API looked like at the time of the question, but now, with akka-stream 2.0.3, I believe you can do it like this:
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source

val source = Source
  .actorRef[T](/* buffer size */ 100, OverflowStrategy.dropHead)
  .mapMaterializedValue[Unit] { actorRef =>
    // foreachRDD runs on the driver; collect each batch there and feed the
    // individual elements (not whole RDDs) to the actor backing the Source
    dstream.foreachRDD(rdd => rdd.collect().foreach(actorRef ! _))
  }
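To close the loop with the handler from the question, the element stream still has to be rendered as bytes before it can back the chunked entity. A minimal sketch, assuming newline-delimited toString output is acceptable (byteSource and entity are illustrative names):

import akka.http.scaladsl.model._
import akka.util.ByteString

// Render each element as one line of text; Chunked.fromData turns the
// ByteString stream into the chunked entity used by the handler above.
val byteSource = source.map(elem => ByteString(elem.toString + "\n"))
val entity = HttpEntity.Chunked.fromData(ContentTypes.`text/plain(UTF-8)`, byteSource)

Note that Source.actorRef cannot backpressure Spark: with OverflowStrategy.dropHead a slow HTTP consumer silently loses the oldest buffered elements, so pick the buffer size and overflow strategy that match your delivery guarantees.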