Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Advice for efficient blocking queries

I would like to store tuples objects in a concurent java collection and then have an efficient, blocking query method that returns the first element matching a pattern. If no such element is available, it would block until such element is present.

For instance if I have a class:

public class Pair {
  public final String first;
  public final String Second;
  public Pair( String first, String second ) {
    this.first = first;
    this.second = second;
  }
}

And a collection like:

public class FunkyCollection {
  public void add( Pair p ) { /* ... */ }
  public Pair get( Pair p ) { /* ... */ }
}

I would like to query it like:

myFunkyCollection.get( new Pair( null, "foo" ) );

which returns the first available pair with the second field equalling "foo" or blocks until such element is added. Another query example:

myFunkyCollection.get( new Pair( null, null ) );

should return the first available pair whatever its values.

Does a solution already exists ? If it is not the case, what do you suggest to implement the get( Pair p ) method ?

Clarification: The method get( Pair p) must also remove the element. The name choice was not very smart. A better name would be take( ... ).

like image 883
paradigmatic Avatar asked Jan 23 '23 19:01

paradigmatic


1 Answers

Here's some source code. It basically the same as what cb160 said, but having the source code might help to clear up any questions you may still have. In particular the methods on the FunkyCollection must be synchronized.

As meriton pointed out, the get method performs an O(n) scan for every blocked get every time a new object is added. It also performs an O(n) operation to remove objects. This could be improved by using a data structure similar to a linked list where you can keep an iterator to the last item checked. I haven't provided source code for this optimization, but it shouldn't be too difficult to implement if you need the extra performance.

import java.util.*;

public class BlockingQueries
{
    public class Pair
    {
        public final String first;
        public final String second;
        public Pair(String first, String second)
        {
            this.first = first;
            this.second = second;
        }
    }

    public class FunkyCollection
    {
        final ArrayList<Pair> pairs = new ArrayList<Pair>();

        public synchronized void add( Pair p )
        {
            pairs.add(p);
            notifyAll();
        }

        public synchronized Pair get( Pair p ) throws InterruptedException
        {
            while (true)
            {
                for (Iterator<Pair> i = pairs.iterator(); i.hasNext(); )
                {
                    Pair pair = i.next();
                    boolean firstOk = p.first == null || p.first.equals(pair.first);
                    boolean secondOk = p.second == null || p.second.equals(pair.second);
                    if (firstOk && secondOk)
                    {
                        i.remove();
                        return pair;                
                    }
                }
                wait();
            }
        }   
    }

    class Producer implements Runnable
    {
        private FunkyCollection funkyCollection;

        public Producer(FunkyCollection funkyCollection)
        {
            this.funkyCollection = funkyCollection;
        }

        public void run()
        {
            try
            {
                for (int i = 0; i < 10; ++i)
                {
                    System.out.println("Adding item " + i);
                    funkyCollection.add(new Pair("foo" + i, "bar" + i));
                    Thread.sleep(1000);
                }
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
            }
        }
    }

    public void go() throws InterruptedException
    {
        FunkyCollection funkyCollection = new FunkyCollection();
        new Thread(new Producer(funkyCollection)).start();
        System.out.println("Fetching bar5.");
        funkyCollection.get(new Pair(null, "bar5"));
        System.out.println("Fetching foo2.");
        funkyCollection.get(new Pair("foo2", null));
        System.out.println("Fetching foo8, bar8");
        funkyCollection.get(new Pair("foo8", "bar8"));
        System.out.println("Finished.");
    }

    public static void main(String[] args) throws InterruptedException
    {
        new BlockingQueries().go();
    }
}

Output:

Fetching bar5.
Adding item 0
Adding item 1
Adding item 2
Adding item 3
Adding item 4
Adding item 5
Fetching foo2.
Fetching foo8, bar8
Adding item 6
Adding item 7
Adding item 8
Finished.
Adding item 9

Note that I put everything into one source file to make it easier to run.

like image 99
Mark Byers Avatar answered Jan 25 '23 08:01

Mark Byers