Spring Reactor benefits when Number of Publisher threads is much higher than number of consumers

I have the following use case:

  • N threads publish data (N can range from 10 to 1000). Those threads may make HTTP requests, make JDBC calls, or do pure Java processing on the local machine only.
  • 1 to M threads consume it and perform IO (send HTTP requests, write to a database, possibly in bulk). These threads should not slow down the publishers. M must not exceed 10.

The N threads may publish data much faster than the consumers can consume it, but the goal is to minimize any slowdown of the publishers.

I have implemented an approach based on ArrayBlockingQueue, where the publishers write to the queue and a single thread takes data out of the queue and processes it. It works, but the results are not great.

I am therefore studying the Reactor pattern, and Spring Reactor in particular, to see whether it could be an answer to my use case. Is that the case?

I have read:

  • https://spring.io/guides/gs/messaging-reactor/#initial => This one does not seem to match my use case.

  • https://github.com/reactor/reactor/blob/master/reactor-core/src/test/java/reactor/core/processor/ProcessorThroughputTests.java => This seems closer to mine, but I need confirmation.

In my situation, where the number of publisher threads is much higher than the number of consumers, is it the right choice?

pmpm asked Aug 30 '25 18:08


2 Answers

It sounds like you might want to look at Reactor's PersistentQueue facility and put it between your Publisher and your Subscriber. It is a normal Queue implementation, but it uses Chronicle Queue for persistence, fail-over, and replayability. It is also extremely fast.

You would basically have publishers pushing data into the PersistentQueue on one side and a set of subscribers pulling from it on the other. It might be a drop-in replacement for your current code if you are already using a Queue.
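
As a rough illustration of that split (this is not Reactor's actual API), the sketch below codes both sides against the plain java.util.Queue interface. ConcurrentLinkedQueue is only a stand-in, since constructing a PersistentQueue is not shown in this answer, and QueueSplitSketch, publish and drainBatch are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class QueueSplitSketch {

    // Stand-in queue (assumption): any thread-safe java.util.Queue could be passed in.
    private final Queue<String> queue;

    public QueueSplitSketch(Queue<String> queue) {
        this.queue = queue;
    }

    // Publisher side: offer() on an unbounded concurrent queue returns without blocking.
    public boolean publish(String item) {
        return queue.offer(item);
    }

    // Subscriber side: pull up to maxBatch items so the slow IO can be done in bulk.
    public List<String> drainBatch(int maxBatch) {
        List<String> batch = new ArrayList<>();
        String item;
        while (batch.size() < maxBatch && (item = queue.poll()) != null) {
            batch.add(item);
        }
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        QueueSplitSketch sketch = new QueueSplitSketch(new ConcurrentLinkedQueue<>());
        // Many publishers push concurrently.
        Thread[] publishers = new Thread[20];
        for (int i = 0; i < publishers.length; i++) {
            final int id = i;
            publishers[i] = new Thread(() -> {
                for (int n = 0; n < 100; n++) {
                    sketch.publish(id + ":" + n);
                }
            });
            publishers[i].start();
        }
        for (Thread t : publishers) {
            t.join();
        }
        // A single consumer drains in batches until the queue is empty.
        int total = 0;
        List<String> batch;
        while (!(batch = sketch.drainBatch(500)).isEmpty()) {
            total += batch.size();
        }
        System.out.println("Consumed " + total);
    }
}
```

Because both sides only see the Queue interface, swapping in a different Queue implementation should only change the constructor argument.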

I need to write a wiki page on it to show some basic use patterns.

Jon Brisbin answered Sep 02 '25 09:09


I dealt with a similar issue using a custom container class. It uses a double-buffering approach via a CAS object, which lets you read all accumulated objects in one lock-free action.

I have no idea how efficient it is, but its simplicity should ensure it is up there with the good ones.

Note that most of the code below is test code - you can remove everything below the // TESTING. comment without affecting the functionality.

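import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicMarkableReference;

/**
 * Thread-safe without locks or synchronized blocks: writers spin briefly on a CAS mark bit.
 *
 * Write from many threads - read with fewer threads.
 *
 * Write items of type T.
 *
 * Read items of type List<T>.
 *
 * @author OldCurmudgeon
 * @param <T> - The type we plan to write/read.
 */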
public class DoubleBufferedList<T> {

    /**
     * Atomic reference so I can atomically swap it through.
     *
     * Mark = true means I am adding to it so momentarily unavailable for iteration.
     */
    private final AtomicMarkableReference<List<T>> list = new AtomicMarkableReference<>(newList(), false);

    // Factory method to create a new list - may be best to abstract this.
    protected List<T> newList() {
        return new ArrayList<>();
    }

    /**
     * Get and replace the current list.
     *
     * Used by readers.
     *
     * @return List<T> of a number (possibly 0) of items of type T.
     */
    public List<T> get() {
        // The list that was there.
        List<T> it;
        // Replace an unmarked list with an empty one.
        if (!list.compareAndSet(it = list.getReference(), newList(), false, false)) {
            // Mark was not false - Failed to replace!
            // It is probably marked as being appended to but may have been replaced by another thread.
            // Return empty and come back again soon.
            return Collections.<T>emptyList();
        }
        // Successfully replaced an unmarked list with an empty list!
        return it;
    }

    /**
     * Grab and lock the list in preparation for append.
     *
     * Used by add.
     */
    private List<T> grab() {
        List<T> it;
        // We cannot fail so spin on get and mark.
        while (!list.compareAndSet(it = list.getReference(), it, false, true)) {
            // Spin on mark - waiting for another grabber to release (which it must).
        }
        return it;
    }

    /**
     * Release the grabbed list.
     *
     * Opposite of grab.
     */
    private void release(List<T> it) {
        // Unmark it. compareAndSet is used rather than attemptMark, which is documented as allowed to fail spuriously.
        if (!list.compareAndSet(it, it, true, false)) {
            // Should never fail because once marked it will not be replaced.
            throw new IllegalMonitorStateException("It changed while we were adding to it!");
        }
    }

    /**
     * Add an entry to the list.
     *
     * Used by writers.
     *
     * @param entry - The new entry to add.
     */
    public void add(T entry) {
        List<T> it = grab();
        try {
            // Successfully marked! Add my new entry.
            it.add(entry);
        } finally {
            // Always release after a grab.
            release(it);
        }
    }

    /**
     * Add many entries to the list.
     *
     * @param entries - The new entries to add.
     */
    public void add(List<T> entries) {
        List<T> it = grab();
        try {
            // Successfully marked! Add my new entries.
            it.addAll(entries);
        } finally {
            // Always release after a grab.
            release(it);
        }
    }

    /**
     * Add a number of entries.
     *
     * @param entries - The new entries to add.
     */
    @SafeVarargs
    public final void add(T... entries) {
        // Make a list of them.
        add(Arrays.<T>asList(entries));
    }

    // TESTING.
    // How many testers to run.
    static final int N = 10;
    // The next one we're waiting for.
    static final AtomicInteger[] seen = new AtomicInteger[N];
    // The ones that arrived out of order.
    static final ConcurrentSkipListSet<Widget>[] queued = Generics.<ConcurrentSkipListSet<Widget>>newArray(N);

    static class Generics {

        // Creates a generic array of the given length from the (empty) varargs array.
        @SafeVarargs
        static <E> E[] newArray(int length, E... array) {
            return Arrays.copyOf(array, length);
        }
    }

    static {
        // Populate the arrays.
        for (int i = 0; i < N; i++) {
            seen[i] = new AtomicInteger();
            queued[i] = new ConcurrentSkipListSet<>();
        }
    }

    // Thing that is produced and consumed.
    private static class Widget implements Comparable<Widget> {

        // Who produced it.
        public final int producer;
        // Its sequence number.
        public final int sequence;

        public Widget(int producer, int sequence) {
            this.producer = producer;
            this.sequence = sequence;
        }

        @Override
        public String toString() {
            return producer + "\t" + sequence;
        }

        @Override
        public int compareTo(Widget o) {
            // Sort on producer
            int diff = Integer.compare(producer, o.producer);
            if (diff == 0) {
                // And then sequence
                diff = Integer.compare(sequence, o.sequence);
            }
            return diff;
        }
    }

    // Produces Widgets and feeds them to the supplied DoubleBufferedList.
    private static class TestProducer implements Runnable {

        // The list to feed.
        final DoubleBufferedList<Widget> list;
        // My ID
        final int id;
        // The sequence we're at
        int sequence = 0;
        // Set this at true to stop me.
        public volatile boolean stop = false;

        public TestProducer(DoubleBufferedList<Widget> list, int id) {
            this.list = list;
            this.id = id;
        }

        @Override
        public void run() {
            // Just pump the list.
            while (!stop) {
                list.add(new Widget(id, sequence++));
            }
        }
    }

    // Consumes Widgets from the supplied DoubleBufferedList
    private static class TestConsumer implements Runnable {

        // The list to bleed.
        final DoubleBufferedList<Widget> list;
        // My ID
        final int id;
        // Set this at true to stop me.
        public volatile boolean stop = false;

        public TestConsumer(DoubleBufferedList<Widget> list, int id) {
            this.list = list;
            this.id = id;
        }

        @Override
        public void run() {
            // The list I am working on.
            List<Widget> l = list.get();
            // Stop when stop == true && list is empty
            while (!(stop && l.isEmpty())) {
                // Record all items in list as arrived.
                arrived(l);
                // Grab another list.
                l = list.get();
            }
        }

        private void arrived(List<Widget> l) {
            for (Widget w : l) {
                // Mark each one as arrived.
                arrived(w);
            }
        }

        // A Widget has arrived.
        private static void arrived(Widget w) {
            // Which one is it?
            AtomicInteger n = seen[w.producer];
            // Don't allow multi-access to the same producer data or we'll end up confused.
            synchronized (n) {
                // Is it the next to be seen?
                if (n.compareAndSet(w.sequence, w.sequence + 1)) {
                    // It was the one we were waiting for! See if any of the ones in the queue can now be consumed.
                    for (Iterator<Widget> i = queued[w.producer].iterator(); i.hasNext();) {
                        Widget it = i.next();
                        // Is it in sequence?
                        if (n.compareAndSet(it.sequence, it.sequence + 1)) {
                            // Done with that one too now!
                            i.remove();
                        } else {
                            // Found a gap! Stop now.
                            break;
                        }
                    }
                } else {
                    // Out of sequence - Queue it.
                    queued[w.producer].add(w);
                }
            }
        }
    }

    // Main tester
    public static void main(String args[]) {
        try {
            System.out.println("DoubleBufferedList:Test");
            // Create my test buffer.
            DoubleBufferedList<Widget> list = new DoubleBufferedList<>();
            // All running threads - Producers then Consumers.
            List<Thread> running = new LinkedList<>();
            // Start some producer tests.
            List<TestProducer> producers = new ArrayList<>();
            for (int i = 0; i < N; i++) {
                TestProducer producer = new TestProducer(list, i);
                Thread t = new Thread(producer);
                t.setName("Producer " + i);
                t.start();
                producers.add(producer);
                running.add(t);
            }

            // Start the same number of consumers (could do less or more if we wanted to).
            List<TestConsumer> consumers = new ArrayList<>();
            for (int i = 0; i < N; i++) {
                TestConsumer consumer = new TestConsumer(list, i);
                Thread t = new Thread(consumer);
                t.setName("Consumer " + i);
                t.start();
                consumers.add(consumer);
                running.add(t);
            }
            // Wait for a while.
            Thread.sleep(5000);
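            // Stop the producers first and wait for them, so nothing is added after the consumers are told to stop.
            for (TestProducer p : producers) {
                p.stop = true;
            }
            for (Thread t : running.subList(0, N)) {
                t.join();
            }
            // Now the consumers can drain what is left and stop.
            for (TestConsumer c : consumers) {
                c.stop = true;
            }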
            // Wait for all to stop.
            for (Thread t : running) {
                System.out.println("Joining " + t.getName());
                t.join();
            }
            // What results did we get?
            int totalMessages = 0;
            for (int i = 0; i < N; i++) {
                // How far did the producer get?
                int gotTo = producers.get(i).sequence;
                // The consumer's state
                int seenTo = seen[i].get();
                totalMessages += seenTo;
                Set<Widget> queue = queued[i];
                if (seenTo == gotTo && queue.isEmpty()) {
                    System.out.println("Producer " + i + " ok.");
                } else {
                    // Different set consumed as produced!
                    System.out.println("Producer " + i + " Failed: gotTo=" + gotTo + " seenTo=" + seenTo + " queued=" + queue);
                }
            }
            System.out.println("Total messages " + totalMessages);

        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }
}
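
For the use case in the question (a few consumers doing bulk IO), a consumer loop over such a buffer might look like the sketch below. It is written against a Supplier<List<T>> so that it is self-contained; with the class above one would presumably pass the buffer's get method as the source. BatchDrainer, drainOnce and the 1 ms back-off are assumptions for illustration, not part of the original code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class BatchDrainer<T> implements Runnable {

    // Where batches come from, e.g. the get() method of a double-buffered list (assumption).
    private final Supplier<List<T>> source;
    // Where batches go, e.g. a bulk database insert (hypothetical).
    private final Consumer<List<T>> sink;
    private volatile boolean stop = false;

    public BatchDrainer(Supplier<List<T>> source, Consumer<List<T>> sink) {
        this.source = source;
        this.sink = sink;
    }

    public void stop() {
        stop = true;
    }

    // Take whatever has accumulated and hand it to the sink in one call.
    // Returns the number of items handed over.
    public int drainOnce() {
        List<T> batch = source.get();
        if (!batch.isEmpty()) {
            sink.accept(batch);
        }
        return batch.size();
    }

    @Override
    public void run() {
        while (!stop) {
            if (drainOnce() == 0) {
                // Nothing available: back off briefly instead of spinning on the source.
                LockSupport.parkNanos(1_000_000L);
            }
        }
        // One last pass for items that arrived just before stop was set.
        drainOnce();
    }

    public static void main(String[] args) {
        // Stand-in source: two prepared batches, then empty lists.
        Queue<List<String>> prepared = new ArrayDeque<>();
        prepared.add(Arrays.asList("a", "b"));
        prepared.add(Arrays.asList("c"));
        Supplier<List<String>> source = () -> {
            List<String> b = prepared.poll();
            return b == null ? Collections.<String>emptyList() : b;
        };
        List<String> written = new ArrayList<>();
        BatchDrainer<String> drainer = new BatchDrainer<>(source, written::addAll);
        while (drainer.drainOnce() > 0) {
            // Keep draining until the source reports nothing left.
        }
        System.out.println(written);
    }
}
```

The publishers never see the sink, so a slow bulk write only delays the next swap rather than the threads that are adding items.
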
OldCurmudgeon answered Sep 02 '25 08:09