Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

scalaz-stream how to implement `ask-then-wait-reply` tcp client

I want to implement an client app that first send an request to server then wait for its reply(similar to http)

My client process may be

 val topic = async.topic[ByteVector]
 val client = topic.subscribe

Here is the api

trait Client {
  val incoming = tcp.connect(...)(client)
   val reqBus = topic.pubsh()
   def ask(req: ByteVector): Task[Throwable \/ ByteVector] =  {
      (tcp.writes(req).flatMap(_ => tcp.reads(1024))).to(reqBus)
      ???
   }
}

Then, how to implement the remain part of ask ?

like image 995
jilen Avatar asked Jan 15 '15 15:01

jilen


1 Answers

Usually, the implementation is done with publishing the message via sink and then awaiting some sort of reply on some source, like your topic.

Actually we have a lot of idioms of this in our code :

def reqRply[I,O,O2](src:Process[Task,I],sink:Sink[Task,I],reply:Process[Task,O])(pf: PartialFunction[O,O2]):Process[Task,O2] = {
 merge.mergeN(Process(reply, (src to sink).drain)).collectFirst(pf)
}

Essentially this first hooks to reply stream to await any resulting O confirming our request sent. Then we publish message I and consult pf for any incoming O to be eventually translated to O2 and then terminate.

like image 100
Pavel Chlupacek Avatar answered Nov 15 '22 07:11

Pavel Chlupacek