Can anyone tell me what the Disruptor design pattern is, with a simple example? I want to know the basics of this design pattern.
A quick Google search turns up plenty of information, including this introduction by Martin Fowler:
At a crude level you can think of a Disruptor as a multicast graph of queues where producers put objects on it that are sent to all the consumers for parallel consumption through separate downstream queues. When you look inside you see that this network of queues is really a single data structure - a ring buffer. Each producer and consumer has a sequence counter to indicate which slot in the buffer it's currently working on. Each producer/consumer writes its own sequence counter but can read the others' sequence counters. This way the producer can read the consumers' counters to ensure the slot it wants to write in is available without any locks on the counters. Similarly a consumer can ensure it only processes messages once another consumer is done with it by watching the counters.
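To make the sequence-counter idea concrete, here is a minimal sketch in Java. It is not the LMAX Disruptor API: the class `MulticastRingSketch` and its methods `offer` and `poll` are hypothetical names made up for illustration. It assumes one producer thread and one thread per consumer id, with every consumer seeing every message. `AtomicLong` reads and writes stand in for the memory barriers the real implementation uses.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: one producer, several consumers that each see every message. */
class MulticastRingSketch {
    private final long[] slots;              // the ring buffer itself
    private final int mask;                  // size - 1, valid because size is a power of two
    // Each party writes only its own counter and only reads the others'.
    private final AtomicLong producerSeq = new AtomicLong(-1);  // last slot published
    private final AtomicLong[] consumerSeqs;                    // last slot each consumer finished

    MulticastRingSketch(int sizePowerOfTwo, int consumers) {
        if (sizePowerOfTwo <= 0 || Integer.bitCount(sizePowerOfTwo) != 1 || consumers < 1) {
            throw new IllegalArgumentException("need a power-of-two size and at least one consumer");
        }
        slots = new long[sizePowerOfTwo];
        mask = sizePowerOfTwo - 1;
        consumerSeqs = new AtomicLong[consumers];
        for (int i = 0; i < consumers; i++) {
            consumerSeqs[i] = new AtomicLong(-1);
        }
    }

    /** Producer thread only. Returns false when the slowest consumer still needs the slot. */
    boolean offer(long value) {
        long next = producerSeq.get() + 1;
        long slowest = Long.MAX_VALUE;
        for (AtomicLong c : consumerSeqs) {
            slowest = Math.min(slowest, c.get());   // read the consumers' counters, no lock
        }
        if (next - slowest > slots.length) {
            return false;                            // writing would overwrite unread data
        }
        slots[(int) (next & mask)] = value;
        producerSeq.set(next);                       // volatile write publishes the slot
        return true;
    }

    /** Called only by the thread that owns consumerId. Returns null when nothing new is published. */
    Long poll(int consumerId) {
        AtomicLong mySeq = consumerSeqs[consumerId];
        long next = mySeq.get() + 1;
        if (next > producerSeq.get()) {
            return null;                             // producer has not reached this slot yet
        }
        long value = slots[(int) (next & mask)];
        mySeq.set(next);                             // tell the producer this slot is done
        return value;
    }

    public static void main(String[] args) {
        MulticastRingSketch ring = new MulticastRingSketch(4, 2);
        ring.offer(42);
        System.out.println("consumer 0 got " + ring.poll(0));
        System.out.println("consumer 1 got " + ring.poll(1));
    }
}
```

The producer can only reuse a slot once the slowest consumer has moved past it, which is the "read the consumers' counters" check from the quote. Chaining consumers (one waiting on another) would mean comparing against another consumer's counter instead of the producer's.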
The GitHub project contains the Java code + doc.
I spent a couple of days reading around it and am only just starting to get to grips with the architecture, and with the reasons this design pattern came about.
For a simple code example of how to implement it, try https://github.com/trevorbernard/disruptor-examples
For a good description, including links to a white paper, source code and UML diagrams, you could start at http://martinfowler.com/articles/lmax.html
From this article:
The disruptor pattern is a batching queue backed up by a circular array (i.e. the ring buffer) filled with pre-allocated transfer objects which uses memory-barriers to synchronize producers and consumers through sequences.
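As a rough illustration of that definition, the following sketch shows the three ingredients together: a circular array, pre-allocated transfer objects that are reused rather than reallocated, and batches published or released with a single sequence update. It is not taken from any library. The class `BatchingRingSketch` and the methods `claim`, `flush`, `available`, `poll` and `donePolling` are hypothetical names, and the sketch assumes exactly one producer thread and one consumer thread. Volatile `AtomicLong` writes are used here in place of hand-tuned memory barriers.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/** Hypothetical single-producer, single-consumer batching ring; for illustration only. */
class BatchingRingSketch<E> {
    private final E[] slots;      // pre-allocated transfer objects, reused for the queue's lifetime
    private final int mask;       // size - 1, valid because size is a power of two
    private final AtomicLong published = new AtomicLong(-1); // producer's sequence, read by consumer
    private final AtomicLong consumed = new AtomicLong(-1);  // consumer's sequence, read by producer
    private long claimed = -1;    // producer-local: last slot handed out, not yet visible
    private long polled = -1;     // consumer-local: last slot read, not yet released

    @SuppressWarnings("unchecked")
    BatchingRingSketch(int sizePowerOfTwo, Supplier<E> factory) {
        if (sizePowerOfTwo <= 0 || Integer.bitCount(sizePowerOfTwo) != 1) {
            throw new IllegalArgumentException("size must be a power of two");
        }
        slots = (E[]) new Object[sizePowerOfTwo];
        for (int i = 0; i < sizePowerOfTwo; i++) {
            slots[i] = factory.get();   // allocate everything up front
        }
        mask = sizePowerOfTwo - 1;
    }

    /** Producer: next reusable object to fill in, or null if the ring is full. */
    E claim() {
        if (claimed + 1 - consumed.get() > slots.length) {
            return null;
        }
        claimed++;
        return slots[(int) (claimed & mask)];
    }

    /** Producer: make everything claimed so far visible with one volatile write. */
    void flush() {
        published.set(claimed);
    }

    /** Consumer: number of published entries not yet polled. */
    long available() {
        return published.get() - polled;
    }

    /** Consumer: next entry; call at most available() times per batch. */
    E poll() {
        polled++;
        return slots[(int) (polled & mask)];
    }

    /** Consumer: hand the whole batch back to the producer with one volatile write. */
    void donePolling() {
        consumed.set(polled);
    }

    public static void main(String[] args) {
        BatchingRingSketch<StringBuilder> q =
                new BatchingRingSketch<>(8, () -> new StringBuilder(64));
        StringBuilder sb = q.claim();
        sb.setLength(0);
        sb.append("Hello!");
        q.flush();
        long n = q.available();
        for (long i = 0; i < n; i++) {
            System.out.println(q.poll());
        }
        q.donePolling();
    }
}
```

Because the producer and the consumer each touch the shared counters once per batch rather than once per message, the cost of synchronization is spread across the batch.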
Fortunately, you don't need to understand the intricate details of the disruptor pattern in order to use it. In case you find it easier to understand through code, below is the Hello World of CoralQueue, an ultra-low-latency queue for inter-thread communication that implements the disruptor pattern.
package com.coralblocks.coralqueue.sample.queue;
import com.coralblocks.coralqueue.AtomicQueue;
import com.coralblocks.coralqueue.Queue;
import com.coralblocks.coralqueue.util.Builder;
public class Basics {
    public static void main(String[] args) {
        final Queue<StringBuilder> queue = new AtomicQueue<StringBuilder>(1024, new Builder<StringBuilder>() {
            @Override
            public StringBuilder newInstance() {
                return new StringBuilder(1024);
            }
        });
        Thread producer = new Thread(new Runnable() {
            private final StringBuilder getStringBuilder() {
                StringBuilder sb;
                while((sb = queue.nextToDispatch()) == null) {
                    // queue can be full if the size of the queue
                    // is small and/or the consumer is too slow
                    // busy spin (you can also use a wait strategy instead)
                }
                return sb;
            }
            @Override
            public void run() {
                StringBuilder sb;
                while(true) { // the main loop of the thread
                    // (...) do whatever you have to do here...
                    // and whenever you want to send a message to
                    // the other thread you can just do:
                    sb = getStringBuilder();
                    sb.setLength(0);
                    sb.append("Hello!");
                    queue.flush();
                    // you can also send in batches to increase throughput:
                    sb = getStringBuilder();
                    sb.setLength(0);
                    sb.append("Hi!");
                    sb = getStringBuilder();
                    sb.setLength(0);
                    sb.append("Hi again!");
                    queue.flush(); // dispatch the two messages above...
                }
            }
        }, "Producer");
        Thread consumer = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) { // the main loop of the thread
                    // (...) do whatever you have to do here...
                    // and whenever you want to check if the producer
                    // has sent a message you just do:
                    long avail;
                    while((avail = queue.availableToPoll()) == 0) {
                        // queue can be empty!
                        // busy spin (you can also use a wait strategy instead)
                    }
                    for(int i = 0; i < avail; i++) {
                        StringBuilder sb = queue.poll();
                        // (...) do whatever you want to do with the data
                        // just don't call toString() to create garbage...
                        // copy byte-by-byte instead...
                    }
                    queue.donePolling();
                }
            }
        }, "Consumer");
        consumer.start();
        producer.start();
    }
}