Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Composing BodyParser in Play 2.5

Given a function with this signature:

def parser[A](otherParser: BodyParser[A]): BodyParser[A]

How can I write the function in such a way that the request body is examined and verified before it is passed to otherParser?

For simplicity let's say that I want to verify that a header ("Some-Header", perhaps) has a value that matches the body exactly. So if I have this action:

def post(): Action(parser(parse.tolerantText)) { request =>
  Ok(request.body)
}

When I make a request like curl -H "Some-Header: hello" -d "hello" http://localhost:9000/post it should return "hello" in the response body with a status of 200. If my request is curl -H "Some-Header: hello" -d "hi" http://localhost:9000/post it should return a 400 with no body.

Here's what I've tried.

This one does not compile because otherParser(request).through(flow) expects flow to output a ByteString. The idea here was that the flow could notify the accumulator whether or not to continue processing via the Either output. I'm not sure how to let the accumulator know the status of the previous step.

def parser[A](otherParser: BodyParser[A]): BodyParser[A] = BodyParser { request =>
  val flow: Flow[ByteString, Either[Result, ByteString], NotUsed] = Flow[ByteString].map { bytes =>
    if (request.headers.get("Some-Header").contains(bytes.utf8String)) {
      Right(bytes)
    } else {
      Left(BadRequest)
    }
  }

  val acc: Accumulator[ByteString, Either[Result, A]] = otherParser(request)

  // This fails to compile because flow needs to output a ByteString
  acc.through(flow)
}

I also attempted to use filter. This one does compile and the response body that gets written is correct. However it always returns a 200 Ok response status.

def parser[A](otherParser: BodyParser[A]): BodyParser[A] = BodyParser { request =>
  val flow: Flow[ByteString, ByteString, akka.NotUsed] = Flow[ByteString].filter { bytes =>
    request.headers.get("Some-Header").contains(bytes.utf8String)
  }

  val acc: Accumulator[ByteString, Either[Result, A]] = otherParser(request)

  acc.through(flow)
}
like image 498
gregghz Avatar asked Oct 18 '22 17:10

gregghz


1 Answers

I came up with a solution using a GraphStageWithMaterializedValue. This concept was borrowed from Play's maxLength body parser. The key difference between my first attempt in my question (that doesn't compile) is that instead of attempting to mutate the stream I should use the materialized value to convey information about the state of processing. While I had created a Flow[ByteString, Either[Result, ByteString], NotUsed] it turns out what I needed was a Flow[ByteString, ByteString, Future[Boolean]].

So with that, my parser function ends up looking like this:

def parser[A](otherParser: BodyParser[A]): BodyParser[A] = BodyParser { request =>
  val flow: Flow[ByteString, ByteString, Future[Boolean]] = Flow.fromGraph(new BodyValidator(request.headers.get("Some-Header")))

  val parserSink: Sink[ByteString, Future[Either[Result, A]]] = otherParser.apply(request).toSink

  Accumulator(flow.toMat(parserSink) { (statusFuture: Future[Boolean], resultFuture: Future[Either[Result, A]]) =>
    statusFuture.flatMap { success =>
      if (success) {
        resultFuture.map {
          case Left(result) => Left(result)
          case Right(a) => Right(a)
        }
      } else {
        Future.successful(Left(BadRequest))
      }
    }
  })
}

The key line is this one:

val flow: Flow[ByteString, ByteString, Future[Boolean]] = Flow.fromGraph(new BodyValidator(request.headers.get("Some-Header")))

The rest kind of falls into place once you are able to create this flow. Unfortunately BodyValidator is pretty verbose and feels somewhat boiler-platey. In any case, it's mostly pretty easy to read. GraphStageWithMaterializedValue expects you to implement def shape: S (S is FlowShape[ByteString, ByteString] here) to specify the input type and output type of this graph. It also expects you to imlpement def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, M) (M is a Future[Boolean] here) to define what the graph should actually do. Here's the full code of BodyValidator (I'll explain in more detail below):

class BodyValidator(expected: Option[String]) extends GraphStageWithMaterializedValue[FlowShape[ByteString, ByteString], Future[Boolean]] {
  val in = Inlet[ByteString]("BodyValidator.in")
  val out = Outlet[ByteString]("BodyValidator.out")

  override def shape: FlowShape[ByteString, ByteString] = FlowShape.of(in, out)

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Boolean]) = {
    val status = Promise[Boolean]()
    val bodyBuffer = new ByteStringBuilder()

    val logic = new GraphStageLogic(shape) {
      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })

      setHandler(in, new InHandler {
        def onPush(): Unit = {
          val chunk = grab(in)
          bodyBuffer.append(chunk)
          push(out, chunk)
        }

        override def onUpstreamFinish(): Unit = {
          val fullBody = bodyBuffer.result()
          status.success(expected.map(ByteString(_)).contains(fullBody))
          completeStage()
        }

        override def onUpstreamFailure(e: Throwable): Unit = {
          status.failure(e)
          failStage(e)
        }
      })
    }

    (logic, status.future)
  }
}

You first want to create an Inlet and Outlet to set up the inputs and outputs for your graph

val in = Inlet[ByteString]("BodyValidator.in")
val out = Outlet[ByteString]("BodyValidator.out")

Then you use these to define shape.

def shape: FlowShape[ByteString, ByteString] = FlowShape.of(in, out)

Inside createLogicAndMaterializedValue you need to initialize the value you intend to materialze. Here I've used a promise that can be resolved when I have the full data from the stream. I also create a ByteStringBuilder to track the data between iterations.

val status = Promise[Boolean]()
val bodyBuffer = new ByteStringBuilder()

Then I create a GraphStageLogic to actually set up what this graph does at each point of processing. Two handler are being set. One is an InHandler for dealing with data as it comes from the upstream source. The other is an OutHandler for dealing with data to send downstream. There's nothing really interesting in the OutHandler so I'll ignore it here besides to say that it is necessary boiler plate in order to avoid an IllegalStateException. Three methods are overridden in the InHandler: onPush, onUpstreamFinish, and onUpstreamFailure. onPush is called when new data is ready from upstream. In this method I simply grab the next chunk of data, write it to bodyBuffer and push the data downstream.

def onPush(): Unit = {
  val chunk = grab(in)
  bodyBuffer.append(chunk)
  push(out, chunk)
}

onUpstreamFinish is called when the upstream finishes (surprise). This is where the business logic of comparing the body with the header happens.

override def onUpstreamFinish(): Unit = {
  val fullBody = bodyBuffer.result()
  status.success(expected.map(ByteString(_)).contains(fullBody))
  completeStage()
}

onUpstreamFailure is implemented so that when something goes wrong, I can mark the materialized future as failed as well.

override def onUpstreamFailure(e: Throwable): Unit = {
  status.failure(e)
  failStage(e)
}

Then I just return the GraphStageLogic I've created and status.future as a tuple.

like image 158
gregghz Avatar answered Nov 13 '22 06:11

gregghz