
How to read unique elements from array per thread?

I have an object backed by an array, which implements the following interface:

public interface PairSupplier<Q, E> {
     public int size();

     public Pair<Q, E> get(int index);
}

I would like to create a specific iterator over it:

public boolean hasNext(){
     return true;
}

public Pair<Q, E> next(){
     //some magic
}

In next() I would like to return some element from the PairSupplier.

Ideally this element is unique to the calling thread: no other thread should hold the same element at the same time.

Since PairSupplier has a finite size, this is not always possible, but I would like to get as close to it as I can.

The order of elements doesn't matter, and the same element may be handed out again at a different time.

Example: 2 Threads, 5 elements - {1,2,3,4,5}

Thread 1  | Thread 2
   1           2
   3           4
   5           1
   3           2
   4           5

My solution:

I create an AtomicInteger index and increment it on every call to next().

PairSupplier<Q, E> pairs;
AtomicInteger index;

public boolean hasNext(){
     return true;
}

public Pair<Q, E> next(){
     int position = index.incrementAndGet() % pairs.size();
     // Once the counter overflows, % yields a negative remainder; map it back into range.
     if (position < 0) {
          position *= -1;
          position = pairs.size() - position;
     }
     return pairs.get(position);
}

pairs and index are shared among all threads.

I find that this solution does not scale, because every thread contends on the same increment. Does anyone have a better idea?

This iterator will be used by 50-1000 threads.

Mary Ryllo asked Oct 15 '13 13:10


3 Answers

Your question is ambiguous: your example shows two threads being handed the same Pair at different times, but your description says otherwise.

I will take the harder case and offer an Iterable<Pair<Q,E>> that hands each Pair to only one thread until the supplier cycles, after which it repeats.

public interface Supplier<T> {
  public int size();

  public T get(int index);

}

public interface PairSupplier<Q, E> extends Supplier<Pair<Q, E>> {
}

public class IterableSupplier<T> implements Iterable<T> {
  // The common supplier to use across all threads.
  final Supplier<T> supplier;
  // The atomic counter.
  final AtomicInteger i = new AtomicInteger();

  public IterableSupplier(Supplier<T> supplier) {
    this.supplier = supplier;
  }

  @Override
  public Iterator<T> iterator() {
    /**
     * You may create a NEW iterator for each thread. They all share the supplier
     * and the counter, and will therefore distribute the Pairs between the threads.
     *
     * You may also share the same iterator across multiple threads.
     *
     * No two threads will get the same pair unless the sequence cycles.
     */
    return new ThreadSafeIterator();
  }

  private class ThreadSafeIterator implements Iterator<T> {
    @Override
    public boolean hasNext() {
      /**
       * Always true.
       */
      return true;
    }

    private int pickNext() {
      // Just grab one atomically.
      int pick = i.incrementAndGet();
      // Wrap the counter back into range. No retry loop: if this CAS fails, a later caller attempts the reset.
      int actual = pick % supplier.size();
      if (pick != actual) {
        // As long as some reset succeeds before the int counter overflows, indices stay valid.
        i.compareAndSet(pick, actual);
      }
      return actual;
    }

    @Override
    public T next() {
      return supplier.get(pickNext());
    }

    @Override
    public void remove() {
      throw new UnsupportedOperationException("Remove not supported.");
    }

  }

}

NB: I have adjusted the code a little to accommodate both scenarios. You can take an Iterator per thread or share a single Iterator across threads.
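For comparison, here is a minimal sketch of an alternative way to wrap the counter that does not rely on the compareAndSet reset. It is not part of the code above. It assumes Java 8 or later for Math.floorMod, and RoundRobinIndex is a hypothetical helper name. The counter is simply allowed to overflow, and floorMod keeps the result in range for negative values too; the round-robin order may skip or repeat a step at the overflow point.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not taken from the answer above: hands out indices
// in [0, size) round-robin and stays in range after the int counter overflows.
final class RoundRobinIndex {
    private final AtomicInteger counter = new AtomicInteger();
    private final int size;

    RoundRobinIndex(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        this.size = size;
    }

    int next() {
        // getAndIncrement wraps to Integer.MIN_VALUE on overflow;
        // floorMod maps any int, negative or not, into [0, size).
        return Math.floorMod(counter.getAndIncrement(), size);
    }
}
```

With such a helper, next() in the iterator could return supplier.get(index.next()), where index is a RoundRobinIndex shared by all threads. This still leaves a single contended counter, so it addresses the overflow handling, not the scalability concern.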

OldCurmudgeon answered Nov 15 '22 06:11


I'm having some trouble understanding what problem you are trying to solve.

Does each thread process the whole collection?

Is the concern that no two threads work on the same Pair at the same time, while each thread still needs to process every Pair in the collection?

Or do you want the collection processed once by using all of the threads?

Patrick answered Nov 15 '22 06:11


You have a piece of information ("has anyone taken this Pair already?") that must be shared between all threads. So for the general case, you're stuck. However, if you have an idea of the size of your array and the number of threads, you could use buckets to make it less painful.

Let's suppose we know that there will be 1,000,000 array elements and 1,000 threads. Assign each thread a range (thread #1 gets elements 0-999, etc). Now instead of 1,000 threads contending for one AtomicInteger, you can have no contention at all!
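A rough sketch of that static partitioning, under a few assumptions of mine: each thread knows its own zero-based id, the thread count is fixed up front, and there are at least as many elements as threads. RangeCursor is a hypothetical name; each thread would own one instance and never share it, so no atomics are needed.

```java
// Hypothetical helper: one instance per thread, cycling through that
// thread's private slice of the index space. Not thread-safe by design.
final class RangeCursor {
    private final int start;   // first index of this thread's slice (inclusive)
    private final int length;  // number of indices in the slice
    private int offset;        // only touched by the owning thread

    RangeCursor(int threadId, int threadCount, int totalSize) {
        if (threadCount <= 0 || totalSize < threadCount
                || threadId < 0 || threadId >= threadCount) {
            throw new IllegalArgumentException("bad partition parameters");
        }
        int chunk = totalSize / threadCount;
        this.start = threadId * chunk;
        // The last thread also takes whatever remainder is left over.
        this.length = (threadId == threadCount - 1) ? totalSize - start : chunk;
    }

    int next() {
        int index = start + offset;
        offset = (offset + 1) % length; // wrap within the slice
        return index;
    }
}
```

A thread would then call pairs.get(cursor.next()) on its own cursor. With 1,000,000 elements and 1,000 threads, the cursor for thread id 0 yields 0 through 999 and then starts over.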

That works if you can be sure that all your threads will run at about the same pace. If you need to handle the case where sometimes thread #1 is busy doing other things while thread #2 is idle, you can modify your bucket pattern slightly: each bucket has an AtomicInteger. Now threads will generally only contend with themselves, but if their bucket is empty, they can move on to the next bucket.
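The bucket variant could look roughly like the sketch below. The names (BucketedIndexSource, claim, homeBucket) are hypothetical, and it assumes each element is claimed at most once, so a drained source returns -1 instead of cycling as the question's iterator does.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: one AtomicInteger cursor per bucket. A thread claims
// from its home bucket first and moves on to the next bucket once it is empty.
final class BucketedIndexSource {
    private final int totalSize;
    private final int bucketSize;
    private final AtomicInteger[] cursors;

    BucketedIndexSource(int totalSize, int bucketCount) {
        if (totalSize <= 0 || bucketCount <= 0) {
            throw new IllegalArgumentException("sizes must be positive");
        }
        this.totalSize = totalSize;
        // Ceiling division so that the buckets together cover every index.
        this.bucketSize = (totalSize + bucketCount - 1) / bucketCount;
        this.cursors = new AtomicInteger[bucketCount];
        for (int b = 0; b < bucketCount; b++) {
            cursors[b] = new AtomicInteger();
        }
    }

    /** Returns an index nobody else has claimed, or -1 if all buckets are drained. */
    int claim(int homeBucket) {
        for (int step = 0; step < cursors.length; step++) {
            int b = (homeBucket + step) % cursors.length;
            int start = b * bucketSize;
            int capacity = Math.min(start + bucketSize, totalSize) - start;
            // Plain read first, so drained buckets are not incremented forever.
            if (cursors[b].get() >= capacity) {
                continue;
            }
            int offset = cursors[b].getAndIncrement();
            if (offset < capacity) {
                return start + offset;
            }
        }
        return -1;
    }
}
```

Contention on any single counter is limited to the threads that share a home bucket plus whichever threads have spilled over into it.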

Nathaniel Waisbrot answered Nov 15 '22 07:11