 

scalaz-stream: how to handle the "header" (first chunks) in a different way to the rest?

Context: I'm trying to write a Process1[ByteVector, spray.http.HttpResponsePart] with output ChunkedResponseStart(bytes), MessageChunk(bytes), MessageChunk(bytes), ..., ChunkedResponseEnd. I haven't yet fully wrapped my head around scalaz-stream and its vocabulary.

How can I write a Process that handles the first n chunks differently?

I've come up with this (strings as an example):

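import java.nio.file.{Files, Paths}
import scalaz.stream._

// Merge the first 5 elements into a single "header" element.
val headerChunk = process1.chunk[String](5).map(_.reduce(_ + _))

// Emit the header once, then pass every remaining element through unchanged.
val headerChunkAndRest: Process1[String, String] =
  headerChunk.take(1) ++ process1.id[String]

io.linesR(Files.newInputStream(Paths.get("testdata/fahrenheit.txt")))
  .pipe(headerChunkAndRest)
  .to(io.stdOutLines)
  .run.run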

What is an idiomatic and, possibly, a generally composable way to write headerChunkAndRest?

asked Mar 03 '15 11:03 by Vasiliy Levykin





1 Answer

General Considerations

There are several ways to do this, and which one fits best depends strongly on the details of your needs. You can use the following helper methods, all of which are part of scalaz-stream:

  1. zipWithIndex: pairs each element with its index (starting at 0). You can discriminate based on that index.
  2. zipWithState: carries a state from one element to the next. Use this state to track whether you are still parsing the header or have reached the body; in the next step you can then use it to handle header and body differently.
  3. repartition: regroups the elements, so that, for example, all header elements and all body elements end up grouped together. You can then process them in the next step.
  4. zipWithNext: pairs each element with the element that follows it (zipWithPrevious pairs it with the one before). You can use this to detect when you are switching from header to body and react accordingly.
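
The options in this list can be tried out with plain Scala collections, without a network connection. The following is a minimal sketch, assuming a List stands in for a scalaz-stream Process. Standard collections already provide zipWithIndex; zipWithState, headerAndBody and zipWithNext below are hypothetical stand-ins (not the scalaz-stream methods themselves), and seenSeparator is an illustrative state function. The zipWithState stand-in pairs each element with the state accumulated from the elements before it, which is assumed here to match the scalaz-stream helper; headerAndBody uses span as a simple substitute for the repartition idea.

```scala
object HelperSketch {
  // zipWithState stand-in: pairs each element with the state accumulated
  // from the elements *before* it. scanLeft emits the initial state first,
  // and zip drops the final state, which belongs to no element.
  def zipWithState[A, S](xs: List[A], z: S)(next: (A, S) => S): List[(A, S)] =
    xs.zip(xs.scanLeft(z)((s, a) => next(a, s)))

  // Example state: has the empty separator line been seen before this line?
  def seenSeparator(lines: List[String]): List[(String, Boolean)] =
    zipWithState(lines, false)((line, seen) => seen || line.isEmpty)

  // Grouping via span (a stand-in for repartition): header lines vs. body lines.
  def headerAndBody(lines: List[String]): (List[String], List[String]) = {
    val (header, rest) = lines.span(_.nonEmpty)
    (header, rest.drop(1)) // drop the empty separator line itself
  }

  // zipWithNext stand-in: pairs each element with its successor, if any.
  def zipWithNext[A](xs: List[A]): List[(A, Option[A])] =
    xs.zip(xs.drop(1).map(Option(_)) :+ None)
}
```

For example, seenSeparator yields false for every header line and for the empty line itself, and true for each body line, which is exactly the "still in the header?" flag described in option 2.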

It may be worth rethinking what you really need. For exactly your question, zipWithIndex followed by map would do. But if you rethink your problem, you will probably end up with repartition or zipWithState.
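
For the question as asked, "zipWithIndex and then map" boils down to the following plain-Scala sketch, which uses a List instead of a Process; FirstNSketch and firstNDifferently are hypothetical names. Because map is one-to-one, this transforms each of the first n elements individually. Merging them into a single header element, as the question's chunk(5) plus reduce does, needs a grouping step (such as chunk or repartition) instead.

```scala
object FirstNSketch {
  // Handles the first `n` elements with `header` and all later ones with `rest`,
  // deciding by the 0-based index that zipWithIndex attaches to each element.
  def firstNDifferently[A, B](xs: List[A], n: Int)(header: A => B, rest: A => B): List[B] =
    xs.zipWithIndex.map { case (x, i) => if (i < n) header(x) else rest(x) }
}
```

For example, upper-casing the first two of List("a", "b", "c", "d") gives List("A", "B", "c", "d").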

Example code

Let's make a simple example: an HTTP client that separates the HTTP header from the body (HTTP, not HTML). The header contains things like cookies; the body contains the real "content", such as an image or the HTML source.

A simple HTTP client could look like this:

import scalaz.stream._
import scalaz.concurrent.Task
import java.net.InetSocketAddress
import java.nio.channels.AsynchronousChannelGroup

implicit val AG = nio.DefaultAsynchronousChannelGroup

def httpGetRequest(hostname : String, path : String = "/"): Process[Nothing, String] =
  Process(
    s"GET $path HTTP/1.1",
    s"Host: $hostname",
    "Accept: */*",
    "User-Agent: scalaz-stream"
  ).intersperse("\r\n").append(Process("\r\n\r\n")) // HTTP ends lines with CRLF; the blank line ends the header

def simpleHttpClient(hostname : String, port : Int = 80, path : String = "/")(implicit AG: AsynchronousChannelGroup) : Process[Task, String] =
  nio.connect(new InetSocketAddress(hostname, port))
    .flatMap(_.run(httpGetRequest(hostname, path).pipe(text.utf8Encode))) // send the request, stream back the response bytes
    .pipe(text.utf8Decode) // bytes -> text
    .pipe(text.lines())    // text -> lines

Now we can use this code to separate header lines from the rest. In HTTP, the header is structured in lines. It is separated from the body by an empty line. So first, let's count the number of lines in the header:

val demoHostName="scala-lang.org" // Hope they won't mind...
simpleHttpClient(demoHostName).zipWithIndex.takeWhile(! _._1.isEmpty).runLast.run
// res3: Option[(String, Int)] = Some((Content-Type: text/html,8))

When I ran this, the last header line had index 8, so (counting from 0) the header had 9 lines, including the status line. Next, let's define an enumeration to classify the parts of the response:

object HttpResponsePart {
  sealed trait EnumVal
  case object HeaderLine extends EnumVal
  case object HeaderBodySeparator extends EnumVal
  case object Body extends EnumVal
  val httpResponseParts = Seq(HeaderLine, HeaderBodySeparator, Body)
}

import HttpResponsePart._

And then let's use zipWithIndex plus map to classify the parts of the response:

simpleHttpClient(demoHostName).zipWithIndex.map{
  case (line, idx) if idx < 9 => (line, HeaderLine)           // indices 0 to 8: header lines
  case (line, idx) if idx == 9 => (line, HeaderBodySeparator) // index 9: the empty line
  case (line, _) => (line, Body)
}.take(15).runLog.run

For me, this works fine. But of course, the number of header lines can change at any time without notice. It is much more robust to use a very simple parser that considers the structure of the response. For this I use zipWithState:

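// zipWithState pairs each line with the state from *before* that line (the previous line's class).
def classify(line: String, previous: EnumVal): EnumVal = (line, previous) match {
  case (l, HeaderLine) if l.isEmpty => HeaderBodySeparator
  case (_, HeaderLine) => HeaderLine
  case (_, HeaderBodySeparator) => Body
  case (_, Body) => Body
}

simpleHttpClient(demoHostName)
  .zipWithState(HeaderLine : EnumVal)(classify)
  .map { case (line, previous) => (line, classify(line, previous)) }
  .take(15).runLog.run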

As you can see, both approaches use a similar structure and should lead to the same result. The nice thing is that both are easily reusable: you can swap out the source, e.g. for a file, without changing anything else. The same goes for the processing after the classification: the .take(15).runLog.run is exactly the same in both approaches.
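
To check that both classifications agree and that the source really is interchangeable, here is a self-contained sketch that replaces the network stream with a plain Scala Iterator[String]. It mirrors the HttpResponsePart enumeration; ClassificationSketch, classify, byIndex, byState and the sample lines are hypothetical names and data, and the state-based version uses Iterator.scanLeft as a stand-in for zipWithState.

```scala
object ClassificationSketch {
  // Mirrors the HttpResponsePart enumeration from the answer.
  sealed trait EnumVal
  case object HeaderLine extends EnumVal
  case object HeaderBodySeparator extends EnumVal
  case object Body extends EnumVal

  // Classifies a line given the classification of the line before it.
  def classify(line: String, previous: EnumVal): EnumVal = (line, previous) match {
    case (l, HeaderLine) if l.isEmpty => HeaderBodySeparator
    case (_, HeaderLine)              => HeaderLine
    case _                            => Body
  }

  // Index-based: only correct if the header has exactly `headerSize` lines.
  def byIndex(lines: Iterator[String], headerSize: Int): Iterator[(String, EnumVal)] =
    lines.zipWithIndex.map {
      case (line, idx) if idx < headerSize  => (line, HeaderLine)
      case (line, idx) if idx == headerSize => (line, HeaderBodySeparator)
      case (line, _)                        => (line, Body)
    }

  // State-based: looks for the empty line, so it works for any header size.
  def byState(lines: Iterator[String]): Iterator[(String, EnumVal)] =
    lines
      .scanLeft(("", HeaderLine: EnumVal)) { case ((_, previous), line) =>
        (line, classify(line, previous))
      }
      .drop(1) // scanLeft emits its seed first; the seed is not a real line
}
```

Because both functions take an Iterator[String], the same code runs on, say, scala.io.Source.fromFile(path).getLines(). With a header of a different size than assumed, only the state-based version still labels the lines correctly.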

answered Oct 18 '22 06:10 by stefan.schwetschke
