
Equivalent of Go channel in Java

I have a requirement where I need to read from a set of blocking queues. The blocking queues are created by the library I am using, and my code has to read from them. I don't want to create a reader thread for each queue. Rather, I want to poll them for available data using a single thread (or at most two or three threads), because some of the queues might have no data for a long time while others get bursts of data. Polling the queues with a small timeout will work, but it is not efficient at all, as it still has to keep looping over all the queues even when some of them have been without data for a long time. Basically, I am looking for a select/epoll-style mechanism (as used on sockets) for blocking queues. Any clue is really appreciated.

Doing that in Go is really easy, though. The code below simulates the same thing with channels and goroutines:

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// sendMessage sends a burst of 1 to 10 order messages, then sleeps
// for roughly 1 to 33 seconds before the next burst.
func sendMessage(sc chan string) {
    for {
        for i := rand.Intn(10); i >= 0; i-- {
            sc <- fmt.Sprintf("Order number %d", rand.Intn(100))
        }
        pause := 1000 + rand.Intn(32000)
        time.Sleep(time.Duration(pause) * time.Millisecond)
    }
}

// sendNum sends a burst of 1 to 16 numbers, 20 ms apart, then sleeps
// for roughly 1 to 25 seconds before the next burst.
func sendNum(c chan int) {
    for {
        for i := rand.Intn(16); i >= 0; i-- {
            time.Sleep(20 * time.Millisecond)
            c <- rand.Intn(65534)
        }
        pause := 1000 + rand.Intn(24000)
        time.Sleep(time.Duration(pause) * time.Millisecond)
    }
}

func main() {
    msgchan := make(chan string, 32)
    numchan := make(chan int, 32)

    // Start 8 producers of each kind.
    for i := 0; i < 8; i++ {
        go sendNum(numchan)
        go sendMessage(msgchan)
    }

    // A single goroutine blocks until either channel has data.
    for {
        select {
        case msg := <-msgchan:
            fmt.Printf("Worked on  %s\n", msg)
        case x := <-numchan:
            fmt.Printf("I got %d \n", x)
        }
    }
}
Nipun Talukdar asked Mar 21 '14 14:03


2 Answers

I suggest you look into using the JCSP library. The equivalent of Go's select is called Alternative. You would only need one consuming thread, which will not need to poll the incoming channels if it switches on them with Alternative. Therefore this would be an efficient way to multiplex the source data.

It will help a lot if you are able to replace the BlockingQueues with JCSP channels. Channels behave essentially the same but provide a greater degree of flexibility in how channel ends are shared for fan-out or fan-in and, in particular, they can be used with Alternative.

For an example of usage, here is a fair multiplexer. This example demonstrates a process that fairly multiplexes traffic from its array of input channels to its single output channel. No input channel will be starved, regardless of the eagerness of its competitors.

import org.jcsp.lang.*;

public class FairPlex implements CSProcess {

   private final AltingChannelInput[] in;
   private final ChannelOutput out;

   public FairPlex (final AltingChannelInput[] in, final ChannelOutput out) {
     this.in = in;
     this.out = out;
   }

   public void run () {

     final Alternative alt = new Alternative (in);

     while (true) {
       final int index = alt.fairSelect ();
       out.write (in[index].read ());
     }
   }
}

Note that if priSelect were used above, higher-indexed channels would be starved if lower-indexed channels were continually demanding service. Or instead of fairSelect, select could be used, but then no starvation analysis is possible. The select mechanism should only be used when starvation is not an issue.

Freedom from Deadlock

As with Go, a Java program using channels must be designed not to deadlock. Low-level concurrency primitives in Java are very hard to implement correctly, so you need something dependable. Fortunately, Alternative has been validated by formal analysis, along with the JCSP channels. This makes it a solid, reliable choice.

Just to clear up one slight point of confusion: the current JCSP version is 1.1-rc5 in the Maven repos, not what the website says.

Rick-777 answered Sep 17 '22 13:09


The only way is to replace the standard queues with objects of a more functional class, one that notifies the consumer(s) when a datum is inserted into an empty queue. This class can still implement the BlockingQueue interface, so the other side (the producer) sees no difference. The trick is that the put operation should also raise a flag and notify the consumer. The consumer, after polling all the queues, clears the flag and calls Object.wait().
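A minimal sketch of this idea, using only java.util.concurrent, might look like the following. The names Signal, NotifyingQueue, NotifyingQueueDemo and drainAll are hypothetical and not part of any library. It also assumes you control how the queues are constructed, which may not hold when a library creates them. Two simplifications relative to the description above: the sketch notifies on every successful insert rather than only on inserts into an empty queue, and the consumer clears the flag before draining rather than after, so that an insert arriving during the drain is not lost.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Shared flag plus monitor; the name Signal is hypothetical. */
final class Signal {
    private boolean raised;

    /** Called by a queue after a successful insert. */
    synchronized void raise() {
        raised = true;
        notifyAll();
    }

    /** Blocks until some queue has raised the flag, then clears it. */
    synchronized void await() throws InterruptedException {
        while (!raised) {
            wait();
        }
        raised = false;
    }
}

/** A BlockingQueue that raises the shared Signal on every successful insert. */
final class NotifyingQueue<E> extends LinkedBlockingQueue<E> {
    private final Signal signal;

    NotifyingQueue(Signal signal) {
        this.signal = signal;
    }

    @Override
    public void put(E e) throws InterruptedException {
        super.put(e);
        signal.raise();
    }

    @Override
    public boolean offer(E e) {
        boolean added = super.offer(e);
        if (added) signal.raise();
        return added;
    }

    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        boolean added = super.offer(e, timeout, unit);
        if (added) signal.raise();
        return added;
    }
}

final class NotifyingQueueDemo {
    /** Drains every queue once without blocking and returns what was found. */
    static List<Object> drainAll(List<BlockingQueue<?>> queues) {
        List<Object> items = new ArrayList<>();
        for (BlockingQueue<?> q : queues) {
            q.drainTo(items);
        }
        return items;
    }

    public static void main(String[] args) throws InterruptedException {
        Signal signal = new Signal();
        NotifyingQueue<String> messages = new NotifyingQueue<>(signal);
        NotifyingQueue<Integer> numbers = new NotifyingQueue<>(signal);
        List<BlockingQueue<?>> queues = new ArrayList<>();
        queues.add(messages);
        queues.add(numbers);

        // Producers see an ordinary BlockingQueue.
        Thread producer = new Thread(() -> {
            try {
                messages.put("Order number 1");
                numbers.put(42);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int received = 0;
        while (received < 2) {
            signal.await(); // sleeps until some queue receives data
            for (Object item : drainAll(queues)) {
                System.out.println("Got " + item);
                received++;
            }
        }
        producer.join();
    }
}
```

The single consumer thread sleeps in Signal.await() while all queues are idle, so no timed polling loop is needed. A wake-up may occasionally find the queues already drained by the previous pass, which is harmless because drainTo does not block.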

Alexei Kaigorodov answered Sep 16 '22 13:09