
Nondeterministically interleaving conduit's Sources

I was hoping to see a nondeterministic interleaving operation for sources, with a type signature like

interleave :: WhateverIOMonadClassItWouldWant m => [(k, Source m a)] -> Source m (k, a)

The use case is that I have a p2p application that maintains open connections to many nodes on the network, and it is mostly just sitting around waiting for messages from any of them. When a message arrives, it doesn't care where it came from, but needs to process the message as soon as possible. In theory this kind of application (at least when used for socket-like sources) could bypass GHC's IO manager entirely and run the select/epoll/etc. calls directly, but I don't particularly care how it's implemented, as long as it works.

Is something like this possible with conduit? A less general but probably more feasible approach might be to write a [(k, Socket)] -> Source m (k, ByteString) function that handles receiving on all the sockets for you.

I noticed the ResumableSource operations in conduit, but they all seem to want to be aware of a particular Sink, which feels like a bit of an abstraction leak, at least for this operation.

copumpkin asked Jul 14 '12 06:07


2 Answers

The stm-conduit package provides mergeSources, which does something similar, though not identical, to what you're looking for. It's probably a good place to start.
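As a rough illustration of that suggestion, here is a minimal sketch of merging two keyed sources with mergeSources. It assumes conduit 1.3+ and a recent stm-conduit, where (as far as I know) mergeSources takes the list of sources plus an Int bound for the intermediate channel and runs the sources under ResourceT; check the signature for the version you have installed. The `tagged` helper, the keys, and the sample lists are made up for the example.

```haskell
module Main where

import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Resource (runResourceT)
import Data.Conduit (ConduitT, mapOutput, runConduit, (.|))
import qualified Data.Conduit.List as CL
import Data.Conduit.TMChan (mergeSources)

-- Hypothetical helper: attach a key to every value a source produces.
tagged :: Monad m => k -> ConduitT () a m () -> ConduitT () (k, a) m ()
tagged k = mapOutput (\a -> (k, a))

main :: IO ()
main = runResourceT $ do
  -- Assumption: the second argument is the bound of the intermediate channel.
  merged <- mergeSources
    [ tagged "a" (CL.sourceList [1, 2, 3 :: Int])
    , tagged "b" (CL.sourceList [10, 20])
    ]
    16
  -- Items arrive in whatever order the producer threads deliver them.
  runConduit (merged .| CL.mapM_ (liftIO . print))
```

The keys have to be attached before merging, since the merged source only carries the element type.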

Michael Snoyman answered Nov 06 '22 11:11


Yes, it is possible.

You can poll a set of Sources without blocking by forking one thread per Source. In each thread you pair the Source with a Sink that sends its output to a shared concurrency channel:

concur :: (WhateverIOMonadClassItWouldWant m) => TChan a -> Sink a m r

... and then you define a Source that reads from that channel:

synchronize :: (WhateverIOMonadClassItWouldWant m) => TChan a -> Source m a

Notice that for sockets this is no different from forking threads that poll the sockets directly. Because it works on arbitrary Sources, though, it is more general, so it would also be useful to other conduit users who want to poll things other than sockets through Sources they have defined.

If you combined those capabilities into one function, then the overall API of the call would look something like:

poll :: (WhateverIOMonadClassItWouldWant m) => [Source m a] -> m (Source m a)

... but you can still throw in those ks if you want.
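To make the idea concrete, here is a minimal sketch of the combined function with the keys included, specialised to IO. It assumes conduit 1.3+ (so sources are written as `ConduitT () a IO ()`) and the stm package. The name `interleave` is borrowed from the question; the queue bound of 16, the counter of running producers, and the sample data are illustrative choices, not part of any library.

```haskell
module Main where

import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Exception (finally)
import Control.Monad (forM_)
import Control.Monad.IO.Class (liftIO)
import Data.Conduit (ConduitT, runConduit, yield, (.|))
import qualified Data.Conduit.List as CL

-- | Fork one thread per keyed source. Each thread drains its source into a
-- shared bounded queue; the returned source reads from that queue.
interleave :: [(k, ConduitT () a IO ())] -> IO (ConduitT () (k, a) IO ())
interleave sources = do
  queue <- newTBQueueIO 16                 -- bound chosen arbitrarily
  remaining <- newTVarIO (length sources)  -- producers still running
  forM_ sources $ \(k, src) ->
    forkIO $
      runConduit (src .| CL.mapM_ (\a -> atomically (writeTBQueue queue (k, a))))
        `finally` atomically (modifyTVar' remaining (subtract 1))
  let loop = do
        next <- liftIO . atomically $
          (Just <$> readTBQueue queue) `orElse` do
            -- Queue is empty: stop only once every producer has finished.
            n <- readTVar remaining
            if n == 0 then return Nothing else retry
        case next of
          Nothing -> return ()
          Just x -> yield x >> loop
  return loop

main :: IO ()
main = do
  merged <- interleave
    [ ("a", CL.sourceList [1, 2, 3 :: Int])
    , ("b", CL.sourceList [10, 20])
    ]
  results <- runConduit (merged .| CL.consume)
  -- All five items arrive; their relative order across keys is not fixed.
  print (length results)
```

The bounded queue gives back-pressure: producers block once 16 items are waiting, so a slow consumer cannot be flooded. Note that this sketch does not cancel the producer threads if the consumer stops early, and an exception in a producer simply ends that producer.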

Gabriella Gonzalez answered Nov 06 '22 11:11