I want to implement an client app that first send an request to server then wait for its reply(similar to http)
My client process may be
val topic = async.topic[ByteVector]
val client = topic.subscribe
Here is the api
trait Client {
val incoming = tcp.connect(...)(client)
val reqBus = topic.pubsh()
def ask(req: ByteVector): Task[Throwable \/ ByteVector] = {
(tcp.writes(req).flatMap(_ => tcp.reads(1024))).to(reqBus)
???
}
}
Then, how to implement the remain part of ask
?
Usually, the implementation is done with publishing the message via sink and then awaiting some sort of reply on some source, like your topic.
Actually we have a lot of idioms of this in our code :
def reqRply[I,O,O2](src:Process[Task,I],sink:Sink[Task,I],reply:Process[Task,O])(pf: PartialFunction[O,O2]):Process[Task,O2] = {
merge.mergeN(Process(reply, (src to sink).drain)).collectFirst(pf)
}
Essentially this first hooks to reply stream to await any resulting O
confirming our request sent. Then we publish message I
and consult pf
for any incoming O
to be eventually translated to O2
and then terminate.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With