I have a requirement where I need to read from a set of Blocking queues. The blocking queues are created by the Library I am using. My code has to read from the queues. I don't want to create a reader thread for each of these blocking queues. Rather I want to poll them for availability of data using a single thread (or probably using 2/3 threads at max). As some of the blocking queues might not have data for long time, while some of them may get bursts of data. Polling the queues with small timeout will work, but that is not efficient at all as it still needs to keep looping over all the queues even when some of them are without data for long time. Basically, I am looking for a select/epoll(used on sockets) kind of mechanism on blocking queues. Any clue is really appreciated.
Doing that in Go is real easy though. Below code simulates the same with channels and goroutines:
package main
import "fmt"
import "time"
import "math/rand"
func sendMessage(sc chan string) {
var i int
for {
i = rand.Intn(10)
for ; i >= 0 ; i-- {
sc <- fmt.Sprintf("Order number %d",rand.Intn(100))
}
i = 1000 + rand.Intn(32000);
time.Sleep(time.Duration(i) * time.Millisecond)
}
}
func sendNum(c chan int) {
var i int
for {
i = rand.Intn(16);
for ; i >= 0; i-- {
time.Sleep(20 * time.Millisecond)
c <- rand.Intn(65534)
}
i = 1000 + rand.Intn(24000);
time.Sleep(time.Duration(i) * time.Millisecond)
}
}
func main() {
msgchan := make(chan string, 32)
numchan := make(chan int, 32)
i := 0
for ; i < 8 ; i++ {
go sendNum(numchan)
go sendMessage(msgchan)
}
for {
select {
case msg := <- msgchan:
fmt.Printf("Worked on %s\n", msg)
case x := <- numchan:
fmt.Printf("I got %d \n", x)
}
}
}
I suggest you look into using the JCSP library. The equivalent of Go's select
is called Alternative. You would only need one consuming thread, which will not need to poll the incoming channels if it switches on them with Alternative
. Therefore this would be an efficient way to multiplex the source data.
It will help a lot if you are able to replace the BlockingQueues with JCSP channels. Channels behave essentially the same but provide a greater degree of flexibility regarding the fan-out or fan-in of sharing of channel ends, and in particular, the use of channels with Alternative
.
For an example of usage, here is a fair multiplexer. This example demonstrates a process that fairly multiplexes traffic from its array of input channels to its single output channel. No input channel will be starved, regardless of the eagerness of its competitors.
import org.jcsp.lang.*;
public class FairPlex implements CSProcess {
private final AltingChannelInput[] in;
private final ChannelOutput out;
public FairPlex (final AltingChannelInput[] in, final ChannelOutput out) {
this.in = in;
this.out = out;
}
public void run () {
final Alternative alt = new Alternative (in);
while (true) {
final int index = alt.fairSelect ();
out.write (in[index].read ());
}
}
}
Note that if priSelect
were used above, higher-indexed channels would be starved if lower-indexed channels were continually demanding service. Or instead of fairSelect
, select
could be used, but then no starvation analysis is possible. The select
mechanism should only be used when starvation is not an issue.
Freedom from Deadlock
As with Go, a Java program using channels must be designed not to deadlock. The implementation of low-level concurrency primitives in Java is very hard to get right and you need something dependable. Fortunately, Alternative
has been validated by formal analysis, along with the JCSP channels. This makes it a solid reliable choice.
Just to clear up on slight point of confusion, the current JCSP version is 1.1-rc5 in the Maven repos, not what the website says.
The only way is to replace standard queues with objects of a more functional class, which notifies consumer(s) when datum is inserted in an empty queue. This class still can implement the BlockingQueue interface, so the other side (producer) see no difference. The trick is that put
operation should also raise a flag and notify consumer. Consumer, after polling all threads, clears the flag and calls Object.wait()
.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With