
How to design a system which sends records and retries sending them if an acknowledgment is not received? [closed]

I am working on a project where I need to consume a lot of records and then send them to another system that uses ZeroMQ.

Here is the flow:

  • Store all the incoming records in a ConcurrentHashMap (CHM) from multiple threads. Records arrive at a very high rate.
  • From a background thread, which runs every 30 seconds, send these records from the CHM to the ZeroMQ servers.
  • After sending each record to the ZeroMQ servers, also add it to a retry bucket, so that it can be retried after a particular time has passed if no acknowledgment has been received for it.
  • A poller thread receives acknowledgments from the ZeroMQ servers, which tell us that records have been received. Once I get an acknowledgment back, I delete that record from the retry bucket so that it doesn't get retried.
  • It is OK if some records are sent twice, but it is good to minimize this.

I am not sure what the best way to minimize duplicate sends is in the scenario below.

Below is my Processor class, in which the add() method is called by multiple threads to populate the dataHolderByPartitionReference CHM in a thread-safe way. In the constructor of the Processor class, I start a background thread that runs every 30 seconds and pushes records from the same CHM to a set of ZeroMQ servers by calling the SendToZeroMQ class, as shown below:


Processor

public class Processor {
  private final ScheduledExecutorService executorService = Executors
      .newSingleThreadScheduledExecutor();
  private final AtomicReference<ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>> dataHolderByPartitionReference =
      new AtomicReference<>(new ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>());

  private static class Holder {
    private static final Processor INSTANCE = new Processor();
  }

  public static Processor getInstance() {
    return Holder.INSTANCE;
  }

  private Processor() {
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        validateAndSendAllPartitions(dataHolderByPartitionReference
            .getAndSet(new ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>()));
      }
    }, 0, 30, TimeUnit.SECONDS);
  }

  private void validateAndSendAllPartitions(
      ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>> dataHolderByPartition) {
        // calling validateAndSend in parallel for each partition (which is map key)
        // generally there will be only 5-6 unique partitions max
  }

  private void validateAndSend(final int partition,
      final ConcurrentLinkedQueue<DataHolder> dataHolders) {
    Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder = new HashMap<>();
    int totalSize = 0;
    while (!dataHolders.isEmpty()) {
        .........
        .........
        SendToZeroMQ.getInstance().executeAsync(partition, clientKeyBytesAndProcessBytesHolder);
    }
    // calling again with remaining values
    SendToZeroMQ.getInstance().executeAsync(partition, clientKeyBytesAndProcessBytesHolder);
  }

  // called by multiple threads to populate dataHolderByPartitionReference CHM
  public void add(final int partition, final DataHolder holder) {
    // store records in dataHolderByPartitionReference in a thread safe way
  }
}
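The body of add() is left out above. A minimal sketch of one way to fill it in, assuming Java 8 and using a hypothetical `PartitionBuffer` class with plain `String` records in place of `DataHolder` (both are stand-ins, not part of the original code):

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the buffering part of Processor; records are Strings here.
class PartitionBuffer {
  private final AtomicReference<ConcurrentHashMap<Integer, Queue<String>>> current =
      new AtomicReference<>(new ConcurrentHashMap<>());

  // Safe to call from many threads: computeIfAbsent creates each queue at most once.
  void add(int partition, String record) {
    current.get()
        .computeIfAbsent(partition, p -> new ConcurrentLinkedQueue<>())
        .add(record);
  }

  // Called by the single sender thread: swap in an empty map and return the old one.
  Map<Integer, Queue<String>> drain() {
    return current.getAndSet(new ConcurrentHashMap<>());
  }
}
```

One thing this sketch does not solve: a writer that fetched the old map just before the swap can still add to it after the sender has finished draining it, so a stricter design would need extra coordination between add() and drain().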

And below is my SendToZeroMQ class, which sends a record to a set of ZeroMQ servers and retries it depending on whether an acknowledgment arrives.

  • First, it sends a record to the ZeroMQ servers.
  • Then it adds the same record to retryBucket, from which it will be retried later, depending on whether an acknowledgment was received.
  • In the same class, I start a background thread, which runs every 1 minute and resends the records that are still in the retry bucket.
  • The same class also starts a ResponsePoller thread, which runs forever and checks which of the previously sent records have been acknowledged. As soon as records are acknowledged, the ResponsePoller thread removes them from retryBucket so that they do not get retried.

SendToZeroMQ

public class SendToZeroMQ {
  // do I need both of these ScheduledExecutorService instances, or is one sufficient to start both threads?
  private final ScheduledExecutorService executorServicePoller = Executors
      .newSingleThreadScheduledExecutor();
  private final ScheduledExecutorService executorService = Executors
      .newSingleThreadScheduledExecutor();
  private final Cache<Long, byte[]> retryBucket = CacheBuilder.newBuilder().maximumSize(10000000)
      .removalListener(RemovalListeners.asynchronous(new CustomListener(), executorService))
      .build();

  private static class Holder {
    private static final SendToZeroMQ INSTANCE = new SendToZeroMQ();
  }

  public static SendToZeroMQ getInstance() {
    return Holder.INSTANCE;
  }

  private SendToZeroMQ() {
    executorServicePoller.submit(new ResponsePoller());
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        for (Entry<Long, byte[]> entry : retryBucket.asMap().entrySet()) {
          executeAsync(entry.getKey(), entry.getValue());
        }
      }
    }, 0, 1, TimeUnit.MINUTES);
  }

  public boolean executeAsync(final long address, final byte[] encodedByteArray) {
    Optional<ZMQObj> liveSockets = PoolManager.getInstance().getNextSocket();
    if (!liveSockets.isPresent()) {
      return false;
    }
    return executeAsync(address, encodedByteArray, liveSockets.get().getSocket());
  }

  public boolean executeAsync(final long address, final byte[] encodedByteArray, final Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(encodedByteArray);
    boolean sent = msg.send(socket);
    msg.destroy();
    // add to retry bucket
    retryBucket.put(address, encodedByteArray);
    return sent;
  }

  public boolean executeAsync(final int partition,
      final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) {
    Optional<ZMQObj> liveSockets = PoolManager.getInstance().getNextSocket();
    if (!liveSockets.isPresent()) {
      // nothing has been added to retryBucket yet, so this batch is not retried
      // unless the caller handles the false return value
      return false;
    }
    Map<Long, byte[]> addressToencodedByteArray = encode(partition, clientKeyBytesAndProcessBytesHolder);
    // encode() always returns exactly one entry, so read it once
    Entry<Long, byte[]> encoded = addressToencodedByteArray.entrySet().iterator().next();
    return executeAsync(encoded.getKey(), encoded.getValue(), liveSockets.get().getSocket());
  }

  private Map<Long, byte[]> encode(final int partition,
      final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) {

    // this address will be unique always
    long address = TestUtils.getAddress();
    Frame frame = new Frame(............);
    byte[] packedByteArray = frame.serialize();
    // this map will always have one entry in it.
    return ImmutableMap.of(address, packedByteArray);
  }

  public void removeFromRetryBucket(final long address) {
    retryBucket.invalidate(address);
  }
}
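The flow described earlier says a record should be retried only after a particular time passes without an acknowledgment, whereas the scheduled task above resends everything that is in the bucket at that moment, including records sent a second ago. A minimal sketch of a bucket that tracks the last send time, assuming a plain ConcurrentHashMap instead of the Guava Cache (so the size bound is lost) and a hypothetical `TimedRetryBucket` class that is not part of the original code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical retry bucket that remembers when each record was last sent.
class TimedRetryBucket {
  private static final class Pending {
    final byte[] payload;
    final long lastSentMillis;

    Pending(byte[] payload, long lastSentMillis) {
      this.payload = payload;
      this.lastSentMillis = lastSentMillis;
    }
  }

  private final Map<Long, Pending> pending = new ConcurrentHashMap<>();
  private final long ackTimeoutMillis;

  TimedRetryBucket(long ackTimeoutMillis) {
    this.ackTimeoutMillis = ackTimeoutMillis;
  }

  // Record (or re-record) a send of this address at the given time.
  void markSent(long address, byte[] payload, long nowMillis) {
    pending.put(address, new Pending(payload, nowMillis));
  }

  // Called when an acknowledgment arrives for this address.
  void acknowledge(long address) {
    pending.remove(address);
  }

  // Payload still awaiting an acknowledgment, or null if none.
  byte[] payloadOf(long address) {
    Pending p = pending.get(address);
    return p == null ? null : p.payload;
  }

  // Addresses whose last send is at least ackTimeoutMillis old; only these are resent.
  List<Long> dueForRetry(long nowMillis) {
    List<Long> due = new ArrayList<>();
    for (Map.Entry<Long, Pending> e : pending.entrySet()) {
      if (nowMillis - e.getValue().lastSentMillis >= ackTimeoutMillis) {
        due.add(e.getKey());
      }
    }
    return due;
  }
}
```

The retry task would then resend only the addresses returned by dueForRetry(System.currentTimeMillis()) and call markSent again for each, so a record whose acknowledgment is merely in flight is not sent a second time.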

And below is my ResponsePoller class, which waits for acknowledgments for all the records that have already been sent by the other background thread. If an acknowledgment is received, the record is deleted from the retry bucket so that it doesn't get retried.

public class ResponsePoller implements Runnable {
  private static final Random random = new Random();
  private static final int listenerPort = 8076;

  @Override
  public void run() {
    ZContext ctx = new ZContext();
    Socket client = ctx.createSocket(ZMQ.PULL);

    // Set random identity to make tracing easier
    String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
    client.setIdentity(identity.getBytes(ZMQ.CHARSET));
    client.bind("tcp://" + TestUtils.getIPAddress() + ":" + listenerPort);

    PollItem[] items = new PollItem[] {new PollItem(client, Poller.POLLIN)};

    while (!Thread.currentThread().isInterrupted()) {
      // Tick once per second, pulling in arriving messages
      for (int centitick = 0; centitick < 100; centitick++) {
        ZMQ.poll(items, 10);
        if (items[0].isReadable()) {
          ZMsg msg = ZMsg.recvMsg(client);
          Iterator<ZFrame> it = msg.iterator();
          while (it.hasNext()) {
            ZFrame frame = it.next();
            try {
              long address = TestUtils.getAddress(frame.getData());
              // remove from retry bucket since we got the acknowledgment for this record
              SendToZeroMQ.getInstance().removeFromRetryBucket(address);
            } catch (Exception ex) {
              // log error
            } finally {
              frame.destroy();
            }
          }
          msg.destroy();
        }
      }
    }
    ctx.destroy();
  }
}

Question:

  • I am trying to see, from the design perspective, what is the best way to design this problem, so that all my logic works seamlessly?

  • I am pretty sure there is a better way to design this problem as compared to what I have - what that better way could be?

john asked Jan 26 '17 04:01


3 Answers

In my opinion, you should not need to worry about acknowledging data reception at the application layer as long as you are using TCP for the underlying communication.

In this case, since ZeroMQ is built on top of TCP with further optimizations, you need not worry about successful data transfer as long as no exception is raised at the transport layer (in which case it is propagated back to you to handle).

The way I see your problem: you are running Kafka consumer threads that receive messages and forward them to another message queue (in this case ZMQ, which uses TCP and either delivers the message or throws an exception at the lower layers of communication).

The simplest solution I can think of is to use a thread pool within each consumer and try to send the message using ZMQ. If a network error occurs, you can easily pool that message for later consumption or logging, as long as your application daemon is running.

In the proposed solution I am assuming that message ordering is not part of the problem space, and that you are not looking to complicate things.
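A rough sketch of that idea, assuming (as this answer does) that a failed send surfaces as an exception. `RecordSender` and `PooledSender` are hypothetical names; a real `RecordSender` would wrap the ZeroMQ socket send:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical transport abstraction; a real one would wrap a ZeroMQ socket send.
interface RecordSender {
  void send(byte[] record) throws Exception;
}

class PooledSender {
  private final ExecutorService pool;
  private final RecordSender sender;
  // Records whose send threw an exception, kept for later resending or logging.
  private final Queue<byte[]> failed = new ConcurrentLinkedQueue<>();

  PooledSender(int threads, RecordSender sender) {
    this.pool = Executors.newFixedThreadPool(threads);
    this.sender = sender;
  }

  // Hand the record to the pool; on any exception, park it in the failed queue.
  void submit(byte[] record) {
    pool.execute(() -> {
      try {
        sender.send(record);
      } catch (Exception e) {
        failed.add(record);
      }
    });
  }

  Queue<byte[]> failedRecords() {
    return failed;
  }

  // Stop accepting work and wait for in-flight sends; false on timeout or interrupt.
  boolean shutdownAndWait(long timeoutMillis) {
    pool.shutdown();
    try {
      return pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

Because the failed queue lives in memory, parked records survive only as long as the process does, which matches the "as long as your application daemon is running" caveat.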

Siddharth Tyagi answered Nov 15 '22 14:11


I am trying to see, from the design perspective, what is the best way to design this problem, so that all my logic works seamlessly?

I am pretty sure there is a better way to design this problem as compared to what I have - what that better way could be?

I was trying to implement something similar, but reading from Spark Kafka and posting to another Kafka topic. A few things that helped me along the way:

1) I used the strategy pattern to implement various exception-handling strategies. I took inspiration from ZooKeeper, which has various retry strategies such as RetryNTimes, ExponentialBackOff, Retry With Interval, etc.

2) Each of these strategies is used in a different context. That is, I had to post my data to a variety of locations, and the exceptions could range from a bad request being sent to the network being unavailable. In the worst case, where the network retry had failed N times, I saved the requests to a Cassandra database with appropriate messages, and a cron or manual process could then retry or replay them by posting to another Kafka topic. A good caching strategy would have been enough, but we needed the data for further analytics as well, hence the persistence.

3) I prefer not to write extensive multithreading code but rather to hand it off to a framework to take care of it for me. After a few years of facing nasty bugs in multithreading (I am no expert in this area), I started favouring frameworks like Akka to handle the multithreading part for me.
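To illustrate point 1, here is a minimal sketch of retry strategies behind a common interface. The `RetryPolicy` interface and its `nextDelayMillis` method are hypothetical names chosen for this example and are not the API of ZooKeeper or any other library; only the strategy names echo the ones mentioned above:

```java
import java.util.OptionalLong;

// Hypothetical strategy interface: given the number of failed attempts so far,
// return the delay before the next attempt, or empty to give up.
interface RetryPolicy {
  OptionalLong nextDelayMillis(int failedAttempts);
}

// Retry up to maxRetries times with a fixed delay between attempts.
class RetryNTimes implements RetryPolicy {
  private final int maxRetries;
  private final long delayMillis;

  RetryNTimes(int maxRetries, long delayMillis) {
    this.maxRetries = maxRetries;
    this.delayMillis = delayMillis;
  }

  @Override
  public OptionalLong nextDelayMillis(int failedAttempts) {
    return failedAttempts <= maxRetries ? OptionalLong.of(delayMillis) : OptionalLong.empty();
  }
}

// Double the delay after each failure, capped at maxDelayMillis.
class ExponentialBackoff implements RetryPolicy {
  private final long baseDelayMillis;
  private final long maxDelayMillis;
  private final int maxRetries;

  ExponentialBackoff(long baseDelayMillis, long maxDelayMillis, int maxRetries) {
    this.baseDelayMillis = baseDelayMillis;
    this.maxDelayMillis = maxDelayMillis;
    this.maxRetries = maxRetries;
  }

  @Override
  public OptionalLong nextDelayMillis(int failedAttempts) {
    if (failedAttempts > maxRetries) {
      return OptionalLong.empty();
    }
    // Clamp the shift so the multiplication cannot overflow for typical base delays.
    int shift = Math.min(failedAttempts - 1, 30);
    return OptionalLong.of(Math.min(maxDelayMillis, baseDelayMillis << shift));
  }
}
```

A sender would ask the policy for the next delay after each failure. When the policy returns empty, the record can be handed to whatever fallback fits the context, such as the persistent store described in point 2.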

Raveesh Sharma answered Nov 15 '22 14:11


I think your situation is a perfect candidate for the "Saga" design pattern (Sagas by Hector Garcia-Molina and Kenneth Salem).

Basically, you have a long-running business transaction, which consists of several sends over time (retries) until the status changes to acknowledged. Express this flow as an entity of its own (a saga), which has a method to execute the retry as well as a method to acknowledge the receipt. Once acknowledged, it should not execute the retry anymore.

How you store and handle the saga does not really matter and has no direct impact on the pattern itself. You can use any technology that executes on an interval basis, retrieves all sagas that are not yet acknowledged, executes them, and saves them. You should also have an acknowledgment receiver endpoint that retrieves the saga, marks it as acknowledged, and then saves it.

Many message brokers and service buses have retry capabilities built in. You can use what you already have (if it has retry capabilities), or you can use another one that does. Or, as I said before, you can simply execute sagas from your own application on an interval basis.
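The saga entity and its two entry points can be sketched as follows. This is only an in-memory illustration under assumed names (`DeliverySaga`, `SagaStore`, and a `Consumer<byte[]>` standing in for the actual send); a real store could be a database or a broker:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical saga: one record's delivery, from first send until acknowledgment.
class DeliverySaga {
  final long address;
  final byte[] payload;
  private final AtomicBoolean acknowledged = new AtomicBoolean(false);
  private final AtomicInteger sendCount = new AtomicInteger();

  DeliverySaga(long address, byte[] payload) {
    this.address = address;
    this.payload = payload;
  }

  // Resend unless already acknowledged; returns whether a send was performed.
  boolean retry(Consumer<byte[]> sender) {
    if (acknowledged.get()) {
      return false;
    }
    sender.accept(payload);
    sendCount.incrementAndGet();
    return true;
  }

  void acknowledge() {
    acknowledged.set(true);
  }

  int sendCount() {
    return sendCount.get();
  }
}

// Hypothetical in-memory saga store with the two entry points described above.
class SagaStore {
  private final Map<Long, DeliverySaga> sagas = new ConcurrentHashMap<>();

  void save(DeliverySaga saga) {
    sagas.put(saga.address, saga);
  }

  // Acknowledgment receiver endpoint: look up the saga and mark it acknowledged.
  void acknowledge(long address) {
    DeliverySaga saga = sagas.get(address);
    if (saga != null) {
      saga.acknowledge();
    }
  }

  // Interval job: execute every saga that is not yet acknowledged; returns the resend count.
  int retryUnacknowledged(Consumer<byte[]> sender) {
    int resent = 0;
    for (DeliverySaga saga : sagas.values()) {
      if (saga.retry(sender)) {
        resent++;
      }
    }
    return resent;
  }
}
```

An acknowledgment that arrives between the check and the send inside retry() can still cause one extra send, which fits the question's tolerance for occasional duplicates.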

Tengiz answered Nov 15 '22 13:11