Context: I'm trying to write a Process1[ByteVector, spray.http.HttpResponsePart] with output ChunkedResponseStart(bytes), MessageChunk(bytes), MessageChunk(bytes), ..., ChunkedResponseEnd. I haven't yet fully wrapped my head around scalaz-stream and its vocabulary.
How do I write a Process that handles the first n chunks differently?
I've come up with this (strings as an example):
val headerChunk = process1.chunk[String](5).map(_.reduce(_ + _))
val headerChunkAndRest: Process1[String, String] =
  headerChunk.take(1) ++ process1.id
io.linesR(Files.newInputStream(Paths.get("testdata/fahrenheit.txt")))
  .pipe(headerChunkAndRest)
  .to(io.stdOutLines)
  .run.run
What is an idiomatic and, if possible, generally composable way to write headerChunkAndRest?
There are several ways to do this, depending strongly on the details of your needs. You can use the following helper methods that are part of scalaz-stream:
zipWithIndex: This gives you the current index of each chunk as a number. You can discriminate based on that index.
zipWithState: You can carry a state from one invocation of your function to the next and use it to track whether you are still parsing headers or have reached the body. In the next step you can then use this state to handle header and body differently.
repartition: Use this to group all header elements and all body elements together. You can then process them in the next step.
zipWithNext: This pairs each element with the element that follows it. You can use this to detect when you are switching from header to body and react accordingly.
You should possibly rethink what you really need. For exactly your question, it would be zipWithIndex and then map. But if you rethink your problem, you will probably end up with repartition or zipWithState.
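To make the index-based variant concrete without any scalaz-stream dependency, here is a minimal sketch on a plain standard-library List. The names IndexDemo and handleFirst are hypothetical, and upper-casing merely stands in for whatever "handling differently" means in your case. It assumes that the List method zipWithIndex is a fair stand-in for the stream combinator of the same name, since both pair each element with its zero-based index.

```scala
object IndexDemo {
  // Hypothetical helper: treat the first n chunks differently (here: upper-case them)
  // and pass everything after them through unchanged.
  def handleFirst(n: Int)(chunks: List[String]): List[String] =
    chunks.zipWithIndex.map {
      case (chunk, idx) if idx < n => chunk.toUpperCase // one of the first n chunks
      case (chunk, _)              => chunk             // everything after the first n
    }
}
```

For example, IndexDemo.handleFirst(2)(List("a", "b", "c")) upper-cases only the first two elements.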
Let's make a simple example: an HTTP client that separates the HTTP header elements from the body (HTTP, not HTML). The header contains things like cookies; the body holds the actual content, such as an image or the HTML source.
A simple HTTP client could look like this:
import scalaz.stream._
import scalaz.concurrent.Task
import java.net.InetSocketAddress
import java.nio.channels.AsynchronousChannelGroup
implicit val AG = nio.DefaultAsynchronousChannelGroup
def httpGetRequest(hostname: String, path: String = "/"): Process[Nothing, String] =
  Process(
    s"GET $path HTTP/1.1",
    s"Host: $hostname",
    "Accept: */*",
    "User-Agent: scalaz-stream"
  ).intersperse("\n").append(Process("\n\n"))
def simpleHttpClient(hostname: String, port: Int = 80, path: String = "/")(implicit AG: AsynchronousChannelGroup): Process[Task, String] =
  nio.connect(new InetSocketAddress(hostname, port))
    .flatMap(_.run(httpGetRequest(hostname, path).pipe(text.utf8Encode)))
    .pipe(text.utf8Decode)
    .pipe(text.lines())
Now we can use this code to separate header lines from the rest. In HTTP, the header is structured in lines. It is separated from the body by an empty line. So first, let's count the number of lines in the header:
val demoHostName="scala-lang.org" // Hope they won't mind...
simpleHttpClient(demoHostName).zipWithIndex.takeWhile(! _._1.isEmpty).runLast.run
// res3: Option[(String, Int)] = Some((Content-Type: text/html,8))
When I ran this, the last header line had index 8, so the header was 9 lines long (indices 0 to 8). Let's first define an enumeration to classify the parts of the response:
object HttpResponsePart {
  sealed trait EnumVal
  case object HeaderLine extends EnumVal
  case object HeaderBodySeparator extends EnumVal
  case object Body extends EnumVal
  val httpResponseParts = Seq(HeaderLine, HeaderBodySeparator, Body)
}
// Bring EnumVal and the case objects into scope for the snippets that follow
import HttpResponsePart._
And then let's use zipWithIndex plus map to classify the parts of the response:
simpleHttpClient(demoHostName).zipWithIndex.map {
  case (line, idx) if idx < 9 => (line, HeaderLine)
  // the empty separator line directly follows the last header line (index 8)
  case (line, idx) if idx == 9 => (line, HeaderBodySeparator)
  case (line, _) => (line, Body)
}.take(15).runLog.run
For me, this works fine. But of course, the number of header lines can change at any time without notice. It is much more robust to use a very simple parser that takes the structure of the response into account. For this I use zipWithState:
simpleHttpClient(demoHostName).zipWithState(HeaderLine : EnumVal) {
  case (line, HeaderLine) if line.isEmpty => HeaderBodySeparator
  case (_, HeaderLine) => HeaderLine
  case (_, HeaderBodySeparator) => Body
  case (_, Body) => Body
}.take(15).runLog.run
As you can see, both approaches use a similar structure and should lead to the same result. The nice thing is that both are easily reusable. You can swap out the source, e.g. for a file, without changing anything else. The same goes for the processing after the classification: the .take(15).runLog.run is exactly the same in both approaches.
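If you want to try the state-based classification without a network connection, the following is a rough standard-library sketch of the same idea. It is not scalaz-stream code: foldLeft over an in-memory List stands in for the stream, a Boolean flag stands in for the state, and the names ClassifyDemo, Part and classify are made up for this illustration. It assumes that the first empty line is the separator and that everything after it is body.

```scala
object ClassifyDemo {
  sealed trait Part
  case object HeaderLine extends Part
  case object HeaderBodySeparator extends Part
  case object Body extends Part

  // Label every line with the part of the response it belongs to.
  // The Boolean state is true while we are still inside the header.
  def classify(lines: List[String]): List[(String, Part)] = {
    val (labelled, _) =
      lines.foldLeft((Vector.empty[(String, Part)], true)) {
        // first empty line while in the header: this is the separator
        case ((acc, true), line) if line.isEmpty =>
          (acc :+ (line -> HeaderBodySeparator), false)
        // any other line while in the header
        case ((acc, true), line) =>
          (acc :+ (line -> HeaderLine), true)
        // once the separator has been seen, everything is body
        case ((acc, false), line) =>
          (acc :+ (line -> Body), false)
      }
    labelled.toList
  }
}
```

Calling ClassifyDemo.classify on a list such as List("HTTP/1.1 200 OK", "Content-Type: text/html", "", "<html>") labels the first two lines as header, the empty line as separator and the last line as body, regardless of how many header lines there are.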