Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to implement retry policies while sending data to another application?

I am working on my application which sends data to zeromq. Below is what my application does:

  • I have a class SendToZeroMQ that send data to zeromq.
  • Add same data to retryQueue in the same class so that it can be retried later on if acknowledgment is not received. It uses guava cache with maximumSize limit.
  • Have a separate thread which receives acknowledgement from the zeromq for the data that was sent earlier and if acknowledgement is not received, then SendToZeroMQ will retry sending that same piece of data. And if acknowledgement is received, then we will remove it from retryQueue so that it cannot be retried again.

Idea is very simple and I have to make sure my retry policy works fine so that I don't loose my data. This is very rare but in case if we don't receive acknolwedgements.

I am thinking of building two types of RetryPolicies but I am not able to understand how to build that here corresponding to my program:

  • RetryNTimes: In this it will retry N times with a particular sleep between each retry and after that, it will drop the record.
  • ExponentialBackoffRetry: In this it will exponentially keep retrying. We can set some max retry limit and after that it won't retry and will drop the record.

Below is my SendToZeroMQ class which sends data to zeromq, also retry every 30 seconds from a background thread and start ResponsePoller runnable which keeps running forever:

public class SendToZeroMQ {
  private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
  private final Cache<Long, byte[]> retryQueue =
      CacheBuilder
          .newBuilder()
          .maximumSize(10000000)
          .concurrencyLevel(200)
          .removalListener(
              RemovalListeners.asynchronous(new CustomListener(), executorService)).build();

  private static class Holder {
    private static final SendToZeroMQ INSTANCE = new SendToZeroMQ();
  }

  public static SendToZeroMQ getInstance() {
    return Holder.INSTANCE;
  }

  private SendToZeroMQ() {
    executorService.submit(new ResponsePoller());
    // retry every 30 seconds for now
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        for (Entry<Long, byte[]> entry : retryQueue.asMap().entrySet()) {
          sendTo(entry.getKey(), entry.getValue());
        }
      }
    }, 0, 30, TimeUnit.SECONDS);
  }

  public boolean sendTo(final long address, final byte[] encodedRecords) {
    Optional<ZMQSocketInfo> liveSockets = PoolManager.getInstance().getNextSocket();
    if (!liveSockets.isPresent()) {
      return false;
    }
    return sendTo(address, encodedRecords, liveSockets.get().getSocket());
  }

  public boolean sendTo(final long address, final byte[] encodedByteArray, final Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(encodedByteArray);
    boolean sent = msg.send(socket);
    msg.destroy();
    // adding to retry queue
    retryQueue.put(address, encodedByteArray);
    return sent;
  }

  public void removeFromRetryQueue(final long address) {
    retryQueue.invalidate(address);
  }
}

Below is my ResponsePoller class which polls all the acknowledgement from the zeromq. And if we get an acknowledgement back from the zeromq then we will remove that record from the retry queue so that it doesn't get retried otherwise it will get retried.

public class ResponsePoller implements Runnable {
  private static final Random random = new Random();

  @Override
  public void run() {
    ZContext ctx = new ZContext();
    Socket client = ctx.createSocket(ZMQ.PULL);
    String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
    client.setIdentity(identity.getBytes(ZMQ.CHARSET));
    client.bind("tcp://" + TestUtils.getIpaddress() + ":8076");

    PollItem[] items = new PollItem[] {new PollItem(client, Poller.POLLIN)};

    while (!Thread.currentThread().isInterrupted()) {
      // Tick once per second, pulling in arriving messages
      for (int centitick = 0; centitick < 100; centitick++) {
        ZMQ.poll(items, 10);
        if (items[0].isReadable()) {
          ZMsg msg = ZMsg.recvMsg(client);
          Iterator<ZFrame> it = msg.iterator();
          while (it.hasNext()) {
            ZFrame frame = it.next();
            try {
                long address = TestUtils.getAddress(frame.getData());
                // remove from retry queue since we got the acknowledgment for this record
                SendToZeroMQ.getInstance().removeFromRetryQueue(address);               
            } catch (Exception ex) {
                // log error
            } finally {
              frame.destroy();
            }
          }
          msg.destroy();
        }
      }
    }
    ctx.destroy();
  }
}

Question:

As you can see above, I am sending encodedRecords to zeromq using SendToZeroMQ class and then it gets retried every 30 seconds depending on whether we got an acknolwedgement back from ResponsePoller class or not.

For each encodedRecords there is a unique key called address and that's what we will get back from zeromq as an acknowledgement.

How can I go ahead and extend this example to build two retry policies that I mentioned above and then I can pick what retry policy I want to use while sending data. I came up with below interface but then I am not able understand how should I move forward to implement those retry policies and use it in my above code.

public interface RetryPolicy {
    /**
     * Called when an operation has failed for some reason. This method should return
     * true to make another attempt.
     */
    public boolean allowRetry(int retryCount, long elapsedTimeMs);
}

Can I use guava-retrying or failsafe here becuase these libraries already have many retry policies which I can use?

like image 742
john Avatar asked Feb 05 '17 20:02

john


People also ask

What are retry policies?

A Retry Policy is a collection of attributes that instructs the Temporal Server how to retry a failure of a Workflow Execution or an Activity Task Execution. (Retry Policies do not apply to Workflow Task Executions, which always retry indefinitely.)

How do you create a retry mechanism in Java?

A simple solution to implement retry logic in Java is to write your code inside a for loop that executes the specified number of times (the maximum retry value).


2 Answers

Here is a working little simulation of your environment that shows how this can be done. Note the Guava cache is the wrong data structure here, since you aren't interested in eviction (I think). So I'm using a concurrent hashmap:

package experimental;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

class Experimental {
  /** Return the desired backoff delay in millis for the given retry number, which is 1-based. */
  interface RetryStrategy {
    long getDelayMs(int retry);
  }

  enum ConstantBackoff implements RetryStrategy {
    INSTANCE;
    @Override
    public long getDelayMs(int retry) {
      return 1000L;
    }
  }

  enum ExponentialBackoff implements RetryStrategy {
    INSTANCE;
    @Override
    public long getDelayMs(int retry) {
      return 100 + (1L << retry);
    }
  }

  static class Sender {
    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
    private final ConcurrentMap<Long, Retrier> pending = new ConcurrentHashMap<>();

    /** Send the given data with given address on the given socket. */
    void sendTo(long addr, byte[] data, int socket) {
      System.err.println("Sending " + Arrays.toString(data) + "@" + addr + " on " + socket);
    }

    private class Retrier implements Runnable {
      private final RetryStrategy retryStrategy;
      private final long addr;
      private final byte[] data;
      private final int socket;
      private int retry;
      private Future<?> future; 

      Retrier(RetryStrategy retryStrategy, long addr, byte[] data, int socket) {
        this.retryStrategy = retryStrategy;
        this.addr = addr;
        this.data = data;
        this.socket = socket;
        this.retry = 0;
      }

      synchronized void start() {
        if (future == null) {
          future = executorService.submit(this);
          pending.put(addr, this);
        }
      }

      synchronized void cancel() {
        if (future != null) {
          future.cancel(true);
          future = null;
        }
      }

      private synchronized void reschedule() {
        if (future != null) {
          future = executorService.schedule(this, retryStrategy.getDelayMs(++retry), MILLISECONDS);
        }
      }

      @Override
      synchronized public void run() {
        sendTo(addr, data, socket);
        reschedule();
      }
    }

    long getVerifiedAddr() {
      System.err.println("Pending messages: " + pending.size());
      Iterator<Long> i = pending.keySet().iterator();
      long addr = i.hasNext() ? i.next() : 0;
      return addr;
    }

    class CancellationPoller implements Runnable {
      @Override
      public void run() {
        while (!Thread.currentThread().isInterrupted()) {
          try {
            Thread.sleep(1000);
          } catch (InterruptedException ex) { 
            Thread.currentThread().interrupt();
          }
          long addr = getVerifiedAddr();
          if (addr == 0) {
            continue;
          }
          System.err.println("Verified message (to be cancelled) " + addr);
          Retrier retrier = pending.remove(addr);
          if (retrier != null) {
            retrier.cancel();
          }
        }
      }
    }

    Sender initialize() {
      executorService.submit(new CancellationPoller());
      return this;
    }

    void sendWithRetriesTo(RetryStrategy retryStrategy, long addr, byte[] data, int socket) {
      new Retrier(retryStrategy, addr, data, socket).start();
    }
  }

  public static void main(String[] args) {
    Sender sender = new Sender().initialize();
    for (long i = 1; i <= 10; i++) {
      sender.sendWithRetriesTo(ConstantBackoff.INSTANCE, i, null, 42);
    }
    for (long i = -1; i >= -10; i--) {
      sender.sendWithRetriesTo(ExponentialBackoff.INSTANCE, i, null, 37);
    }
  }
}
like image 190
Gene Avatar answered Oct 27 '22 08:10

Gene


not a perfect way, but can be achieved by below way as well.

public interface RetryPolicy {
public boolean allowRetry();
public void decreaseRetryCount();

}

Create two implementation. For RetryNTimes

public class RetryNTimes implements RetryPolicy {

private int maxRetryCount;
public RetryNTimes(int maxRetryCount) {
    this.maxRetryCount = maxRetryCount;
}

public boolean allowRetry() {
    return maxRetryCount > 0;
}

public void decreaseRetryCount()
{
    maxRetryCount = maxRetryCount-1;
}}

For ExponentialBackoffRetry

public class ExponentialBackoffRetry implements RetryPolicy {

private int maxRetryCount;
private final Date retryUpto;

public ExponentialBackoffRetry(int maxRetryCount, Date retryUpto) {
    this.maxRetryCount = maxRetryCount;
    this.retryUpto = retryUpto;
}

public boolean allowRetry() {
    Date date = new Date();
    if(maxRetryCount <= 0 || date.compareTo(retryUpto)>=0)
    {
        return false;
    }
    return true;
}

public void decreaseRetryCount() {
    maxRetryCount = maxRetryCount-1;
}}

You need to make some changes in SendToZeroMQ class

public class SendToZeroMQ {

private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
private final Cache<Long,RetryMessage> retryQueue =
        CacheBuilder
                .newBuilder()
                .maximumSize(10000000)
                .concurrencyLevel(200)
                .removalListener(
                        RemovalListeners.asynchronous(new CustomListener(), executorService)).build();

private static class Holder {
    private static final SendToZeroMQ INSTANCE = new SendToZeroMQ();
}

public static SendToZeroMQ getInstance() {
    return Holder.INSTANCE;
}

private SendToZeroMQ() {
    executorService.submit(new ResponsePoller());
    // retry every 30 seconds for now
    executorService.scheduleAtFixedRate(new Runnable() {
        public void run() {
            for (Map.Entry<Long, RetryMessage> entry : retryQueue.asMap().entrySet()) {
                RetryMessage retryMessage = entry.getValue();
                if(retryMessage.getRetryPolicy().allowRetry())
                {
                    retryMessage.getRetryPolicy().decreaseRetryCount();
                    entry.setValue(retryMessage);
                    sendTo(entry.getKey(), retryMessage.getMessage(),retryMessage);

                }else
                {
                    retryQueue.asMap().remove(entry.getKey());
                }
            }
        }
    }, 0, 30, TimeUnit.SECONDS);
}



public boolean sendTo(final long address, final byte[] encodedRecords, RetryMessage retryMessage) {
    Optional<ZMQSocketInfo> liveSockets = PoolManager.getInstance().getNextSocket();
    if (!liveSockets.isPresent()) {
        return false;
    }
    if(null==retryMessage)
    {
        RetryPolicy retryPolicy = new RetryNTimes(10);
        retryMessage = new RetryMessage(retryPolicy,encodedRecords);
        retryQueue.asMap().put(address,retryMessage);
    }
    return sendTo(address, encodedRecords, liveSockets.get().getSocket());
}

public boolean sendTo(final long address, final byte[] encodedByteArray, final ZMQ.Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(encodedByteArray);
    boolean sent = msg.send(socket);
    msg.destroy();
    return sent;
}

public void removeFromRetryQueue(final long address) {
    retryQueue.invalidate(address);
}}
like image 22
Radhey Shyam Avatar answered Oct 27 '22 07:10

Radhey Shyam