Given a function with this signature:
def parser[A](otherParser: BodyParser[A]): BodyParser[A]
How can I write the function so that the request body is examined and verified before it is passed to otherParser?
For simplicity let's say that I want to verify that a header ("Some-Header", perhaps) has a value that matches the body exactly. So if I have this action:
def post() = Action(parser(parse.tolerantText)) { request =>
Ok(request.body)
}
When I make a request like curl -H "Some-Header: hello" -d "hello" http://localhost:9000/post
it should return "hello" in the response body with a status of 200. If my request is curl -H "Some-Header: hello" -d "hi" http://localhost:9000/post
it should return a 400 with no body.
Here's what I've tried.
This one does not compile because otherParser(request).through(flow) expects flow to output a ByteString. The idea here was that the flow could notify the accumulator whether or not to continue processing via the Either output. I'm not sure how to let the accumulator know the status of the previous step.
def parser[A](otherParser: BodyParser[A]): BodyParser[A] = BodyParser { request =>
val flow: Flow[ByteString, Either[Result, ByteString], NotUsed] = Flow[ByteString].map { bytes =>
if (request.headers.get("Some-Header").contains(bytes.utf8String)) {
Right(bytes)
} else {
Left(BadRequest)
}
}
val acc: Accumulator[ByteString, Either[Result, A]] = otherParser(request)
// This fails to compile because flow needs to output a ByteString
acc.through(flow)
}
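The compile error follows from how through is typed: it prepends a flow to the accumulator, so the flow's output type has to equal the accumulator's input type. Below is a minimal sketch of that constraint, assuming Play's Accumulator (play-streams) and Akka 2.6+ are on the classpath. ThroughDemo, byteCounter, encode and count are made-up names for illustration only.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.ByteString
import play.api.libs.streams.Accumulator
import scala.concurrent.Future

object ThroughDemo {
  // Consumes ByteStrings and materializes the total number of bytes seen.
  val byteCounter: Accumulator[ByteString, Int] =
    Accumulator(Sink.fold[Int, ByteString](0)((total, bytes) => total + bytes.length))

  // Outputs ByteString, which is exactly what byteCounter consumes, so this type-checks.
  val encode: Flow[String, ByteString, NotUsed] = Flow[String].map(ByteString(_))

  // through changes only the input side: String in, Int still out.
  val stringCounter: Accumulator[String, Int] = byteCounter.through(encode)

  // Assumes Akka 2.6+, where an implicit ActorSystem also supplies the Materializer.
  def count(words: List[String])(implicit system: ActorSystem): Future[Int] =
    stringCounter.run(Source(words))
}
```

A flow that outputs Either[Result, ByteString] cannot be plugged in this way, because nothing downstream accepts that element type.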
I also attempted to use filter. This one does compile, and the response body that gets written is correct. However, it always returns a 200 Ok response status.
def parser[A](otherParser: BodyParser[A]): BodyParser[A] = BodyParser { request =>
val flow: Flow[ByteString, ByteString, akka.NotUsed] = Flow[ByteString].filter { bytes =>
request.headers.get("Some-Header").contains(bytes.utf8String)
}
val acc: Accumulator[ByteString, Either[Result, A]] = otherParser(request)
acc.through(flow)
}
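A likely explanation for the constant 200: filter only drops elements that fail the predicate, it never fails the stream. The downstream parser therefore sees a shorter (here, empty) body and parses it successfully. The sketch below reproduces that outside of Play, assuming Akka 2.6+. FilterDemo, validating and bodySeenDownstream are hypothetical names, and the fold sink merely stands in for otherParser.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.ByteString
import scala.concurrent.Future

object FilterDemo {
  // Same predicate as the attempt above, with the header value passed in directly.
  def validating(header: Option[String]): Flow[ByteString, ByteString, akka.NotUsed] =
    Flow[ByteString].filter(bytes => header.contains(bytes.utf8String))

  // Concatenates whatever reaches the end of the stream (a stand-in for otherParser).
  // Assumes Akka 2.6+, where an implicit ActorSystem also supplies the Materializer.
  def bodySeenDownstream(header: Option[String], body: String)(
      implicit system: ActorSystem): Future[String] =
    Source.single(ByteString(body))
      .via(validating(header))
      .runWith(Sink.fold[ByteString, ByteString](ByteString.empty)(_ ++ _))
      .map(_.utf8String)(system.dispatcher)
}
```

With a mismatching header the future completes normally with an empty string rather than an error, which is why no 400 is ever produced.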
I came up with a solution using a GraphStageWithMaterializedValue. This concept was borrowed from Play's maxLength body parser. The key difference from the first attempt in my question (the one that doesn't compile) is that instead of attempting to mutate the stream, I use the materialized value to convey information about the state of processing. While I had created a Flow[ByteString, Either[Result, ByteString], NotUsed], it turns out what I needed was a Flow[ByteString, ByteString, Future[Boolean]].
So with that, my parser function ends up looking like this:
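import akka.stream.scaladsl.{Flow, Sink}
import akka.util.ByteString
import play.api.libs.streams.Accumulator
import play.api.mvc.{BodyParser, Result}
import play.api.mvc.Results.BadRequest
import scala.concurrent.Future

// Note: flatMap below needs an implicit ExecutionContext in scope (for example, one injected into the controller).
def parser[A](otherParser: BodyParser[A]): BodyParser[A] = BodyParser { request =>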
val flow: Flow[ByteString, ByteString, Future[Boolean]] = Flow.fromGraph(new BodyValidator(request.headers.get("Some-Header")))
val parserSink: Sink[ByteString, Future[Either[Result, A]]] = otherParser.apply(request).toSink
Accumulator(flow.toMat(parserSink) { (statusFuture: Future[Boolean], resultFuture: Future[Either[Result, A]]) =>
statusFuture.flatMap { success =>
if (success) {
// Validation passed: hand back whatever otherParser produced, unchanged.
resultFuture
} else {
Future.successful(Left(BadRequest))
}
}
})
}
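The decision made inside toMat can be looked at in isolation: wait for the validation flag, then either keep the inner parser's result or replace it with an error. The sketch below uses only the Scala standard library so it can be run without Play. FakeResult is a hypothetical stand-in for play.api.mvc.Result, and CombineDemo and combine are made-up names.

```scala
import scala.concurrent.{ExecutionContext, Future}

object CombineDemo {
  // Hypothetical stand-in for play.api.mvc.Result so the sketch needs no Play dependency.
  final case class FakeResult(status: Int)

  // Mirrors the toMat combiner: the flag decides whether the parsed value survives.
  def combine[A](
      statusFuture: Future[Boolean],
      resultFuture: Future[Either[FakeResult, A]]
  )(implicit ec: ExecutionContext): Future[Either[FakeResult, A]] =
    statusFuture.flatMap { valid =>
      if (valid) resultFuture                        // body matched: keep the parser's outcome
      else Future.successful(Left(FakeResult(400)))  // body mismatched: short-circuit with a 400
    }
}
```

Note that a Left coming from the inner parser is passed through untouched when validation succeeds, so the inner parser's own errors still reach the client.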
The key line is this one:
val flow: Flow[ByteString, ByteString, Future[Boolean]] = Flow.fromGraph(new BodyValidator(request.headers.get("Some-Header")))
The rest kind of falls into place once you are able to create this flow. Unfortunately, BodyValidator is pretty verbose and feels somewhat boilerplate-heavy. In any case, it's mostly pretty easy to read. GraphStageWithMaterializedValue expects you to implement def shape: S (S is FlowShape[ByteString, ByteString] here) to specify the input type and output type of this graph. It also expects you to implement def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, M) (M is a Future[Boolean] here) to define what the graph should actually do. Here's the full code of BodyValidator (I'll explain in more detail below):
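import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler}
import akka.util.{ByteString, ByteStringBuilder}
import scala.concurrent.{Future, Promise}

class BodyValidator(expected: Option[String]) extends GraphStageWithMaterializedValue[FlowShape[ByteString, ByteString], Future[Boolean]] {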
val in = Inlet[ByteString]("BodyValidator.in")
val out = Outlet[ByteString]("BodyValidator.out")
override def shape: FlowShape[ByteString, ByteString] = FlowShape.of(in, out)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Boolean]) = {
val status = Promise[Boolean]()
val bodyBuffer = new ByteStringBuilder()
val logic = new GraphStageLogic(shape) {
setHandler(out, new OutHandler {
override def onPull(): Unit = pull(in)
})
setHandler(in, new InHandler {
def onPush(): Unit = {
val chunk = grab(in)
bodyBuffer.append(chunk)
push(out, chunk)
}
override def onUpstreamFinish(): Unit = {
val fullBody = bodyBuffer.result()
status.success(expected.map(ByteString(_)).contains(fullBody))
completeStage()
}
override def onUpstreamFailure(e: Throwable): Unit = {
status.failure(e)
failStage(e)
}
})
}
(logic, status.future)
}
}
You first want to create an Inlet and Outlet to set up the inputs and outputs for your graph:
val in = Inlet[ByteString]("BodyValidator.in")
val out = Outlet[ByteString]("BodyValidator.out")
Then you use these to define shape.
def shape: FlowShape[ByteString, ByteString] = FlowShape.of(in, out)
Inside createLogicAndMaterializedValue you need to initialize the value you intend to materialize. Here I've used a promise that can be resolved when I have the full data from the stream. I also create a ByteStringBuilder to track the data between iterations.
val status = Promise[Boolean]()
val bodyBuffer = new ByteStringBuilder()
Then I create a GraphStageLogic to actually set up what this graph does at each point of processing. Two handlers are being set. One is an InHandler for dealing with data as it comes from the upstream source. The other is an OutHandler for dealing with data to send downstream. There's nothing really interesting in the OutHandler, so I'll ignore it here, other than to say that it is necessary boilerplate in order to avoid an IllegalStateException. Three methods are overridden in the InHandler: onPush, onUpstreamFinish, and onUpstreamFailure. onPush is called when new data is ready from upstream. In this method I simply grab the next chunk of data, write it to bodyBuffer, and push the data downstream.
def onPush(): Unit = {
val chunk = grab(in)
bodyBuffer.append(chunk)
push(out, chunk)
}
onUpstreamFinish is called when the upstream finishes (surprise). This is where the business logic of comparing the body with the header happens.
override def onUpstreamFinish(): Unit = {
val fullBody = bodyBuffer.result()
status.success(expected.map(ByteString(_)).contains(fullBody))
completeStage()
}
onUpstreamFailure is implemented so that when something goes wrong, I can mark the materialized future as failed as well.
override def onUpstreamFailure(e: Throwable): Unit = {
status.failure(e)
failStage(e)
}
Then I just return the GraphStageLogic I've created and status.future as a tuple.
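For comparison, a flow of the same type, Flow[ByteString, ByteString, Future[Boolean]], can probably be assembled from built-in operators instead of a custom stage. This is not the approach described above but a swapped-in technique: alsoToMat tees every chunk into a side sink, and the side sink's materialized value becomes the flag. A minimal sketch, assuming Akka Streams is on the classpath; ValidatingFlow is a made-up name, and how it behaves under early cancellation has not been checked against BodyValidator.

```scala
import akka.stream.scaladsl.{Flow, Keep, Sink}
import akka.util.ByteString
import scala.concurrent.{ExecutionContext, Future}

object ValidatingFlow {
  def apply(expected: Option[String])(
      implicit ec: ExecutionContext): Flow[ByteString, ByteString, Future[Boolean]] =
    Flow[ByteString]
      // Pass every chunk downstream unchanged while also feeding a side sink
      // that concatenates the chunks into the full body.
      .alsoToMat(Sink.fold[ByteString, ByteString](ByteString.empty)(_ ++ _))(Keep.right)
      // Once the side sink completes, compare the full body with the expected value.
      .mapMaterializedValue(_.map(body => expected.map(ByteString(_)).contains(body)))
}
```

It could then replace Flow.fromGraph(new BodyValidator(...)) in parser, at the cost of buffering the body in the fold rather than in a ByteStringBuilder.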