Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Send record and wait for its acknowledgement to receive

I am using below class to send data to our messaging queue by using socket either in a synchronous way or asynchronous way as shown below.

  • sendAsync - It sends data asynchronously without any timeout. After sending (on LINE A) it adds to retryHolder bucket so that if acknowledgement is not received then it will retry again from the background thread which is started in a constructor.
  • send - It internally calls sendAsync method and then sleep for a particular timeout period and if acknowledgement is not received then it removes from retryHolder bucket so that we don't retry again.

So the only difference between those two above methods is - For async I need to retry at all cost but for sync I don't need to retry but looks like it might be getting retried since we share the same retry bucket cache and retry thread runs every 1 second.

ResponsePoller is a class which receives the acknowledgement for the data that was sent to our messaging queue and then calls removeFromretryHolder method below to remove the address so that we don't retry after receiving the acknowledgement.

public class SendToQueue {
  private final ExecutorService cleanupExecutor = Executors.newFixedThreadPool(5);
  private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
  private final Cache<Long, byte[]> retryHolder =
      CacheBuilder
          .newBuilder()
          .maximumSize(1000000)
          .concurrencyLevel(100)
          .removalListener(
              RemovalListeners.asynchronous(new LoggingRemovalListener(), cleanupExecutor)).build();

  private static class Holder {
    private static final SendToQueue INSTANCE = new SendToQueue();
  }

  public static SendToQueue getInstance() {
    return Holder.INSTANCE;
  }

  private SendToQueue() {
    executorService.submit(new ResponsePoller()); // another thread which receives acknowledgement and then delete entry from the `retryHolder` cache accordingly.
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        // retry again
        for (Entry<Long, byte[]> entry : retryHolder.asMap().entrySet()) {
          sendAsync(entry.getKey(), entry.getValue());
        }
      }
    }, 0, 1, TimeUnit.SECONDS);
  }

  public boolean sendAsync(final long address, final byte[] encodedRecords, final Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(encodedRecords);
    // send data on a socket LINE A
    boolean sent = msg.send(socket);
    msg.destroy();
    retryHolder.put(address, encodedRecords);
    return sent;
  }

  public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
    boolean sent = sendAsync(address, encodedRecords, socket);
    // if the record was sent successfully, then only sleep for timeout period
    if (sent) {
      try {
        TimeUnit.MILLISECONDS.sleep(500);
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }
    }
    // if key is not present, then acknowledgement was received successfully
    sent = !retryHolder.asMap().containsKey(address);
    // and key is still present in the cache, then it means acknowledgment was not received after
    // waiting for timeout period, so we will remove it from cache.
    if (!sent)
      removeFromretryHolder(address);
    return sent;
  }

  public void removeFromretryHolder(final long address) {
    retryHolder.invalidate(address);
  }
}

What is the best way by which we dont retry if anyone is calling send method but we still need to know whether acknowledgement was received or not. Only thing is I dont need to retry at all.

Do we need separate bucket for all the sync calls just for acknowledgement and we dont retry from that bucket?

like image 757
john Avatar asked Nov 04 '17 17:11

john


2 Answers

The code has a number of potential issues:

  • An answer may be received before the call to retryHolder#put.
  • Possibly there is a race condition when messages are retried too.
  • If two messages are sent to the same address the second overwrites the first?
  • Send always wastes time with a sleep, use a wait+notify instead.

I would store a class with more state instead. It could contain a flag (retryIfNoAnswer yes/no) that the retry handler could check. It could provide waitForAnswer/markAnswerReceived methods using wait/notify so that send doesn't have to sleep for a fixed time. The waitForAnswer method can return true if an answer was obtained and false on timeout. Put the object in the retry handler before sending and use a timestamp so that only messages older than a certain age are retried. That fixes the first race condition.

EDIT: updated example code below, compiles with your code, not tested:

public class SendToQueue {
private final ExecutorService cleanupExecutor = Executors.newFixedThreadPool(5);
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);

// Not sure why you are using a cache rather than a standard ConcurrentHashMap?
private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder().maximumSize(1000000)
    .concurrencyLevel(100)
    .removalListener(RemovalListeners.asynchronous(new LoggingRemovalListener(), cleanupExecutor)).build();

private static class PendingMessage {
    private final long _address;
    private final byte[] _encodedRecords;
    private final Socket _socket;
    private final boolean _retryEnabled;
    private final Object _monitor = new Object();
    private long _sendTimeMillis;
    private volatile boolean _acknowledged;

    public PendingMessage(long address, byte[] encodedRecords, Socket socket, boolean retryEnabled) {
        _address = address;
        _sendTimeMillis = System.currentTimeMillis();
        _encodedRecords = encodedRecords;
        _socket = socket;
        _retryEnabled = retryEnabled;
    }

    public synchronized boolean hasExpired() {
        return System.currentTimeMillis() - _sendTimeMillis > 500L;
    }

    public synchronized void markResent() {
        _sendTimeMillis = System.currentTimeMillis();
    }

    public boolean shouldRetry() {
        return _retryEnabled && !_acknowledged;
    }

    public boolean waitForAck() {
        try {
            synchronized(_monitor) {
                _monitor.wait(500L);
            }
            return _acknowledged;
        }
        catch (InterruptedException e) {
            return false;
        }
    }

    public void ackReceived() {
        _acknowledged = true;
        synchronized(_monitor) {
            _monitor.notifyAll();
        }
    }

    public long getAddress() {
        return _address;
    }

    public byte[] getEncodedRecords() {
        return _encodedRecords;
    }

    public Socket getSocket() {
        return _socket;
    }
}

private static class Holder {
    private static final SendToQueue INSTANCE = new SendToQueue();
}

public static SendToQueue getInstance() {
    return Holder.INSTANCE;
}

private void handleRetries() {
    List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
    for (PendingMessage m : messages) {
        if (m.hasExpired()) {
            if (m.shouldRetry()) {
                m.markResent();
                doSendAsync(m, m.getSocket());
            }
            else {
                // Or leave the message and let send remove it
                cache.invalidate(m.getAddress());
            }
        }
    }
}

private SendToQueue() {
    executorService.submit(new ResponsePoller()); // another thread which receives acknowledgement and then delete entry from the cache accordingly.
    executorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            handleRetries();
        }
    }, 0, 1, TimeUnit.SECONDS);
}

public boolean sendAsync(final long address, final byte[] encodedRecords, final Socket socket) {
    PendingMessage m = new PendingMessage(address, encodedRecords, socket, true);
    cache.put(address, m);
    return doSendAsync(m, socket);
}

private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(pendingMessage.getEncodedRecords());
    try {
        // send data on a socket LINE A
        return msg.send(socket);
    }
    finally {
        msg.destroy();
    }
}

public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
    PendingMessage m = new PendingMessage(address, encodedRecords, socket, false);
    cache.put(address, m);
    try {
        if (doSendAsync(m, socket)) {
            return m.waitForAck();
        }
        return false;
    }
    finally {
        // Alternatively (checks that address points to m):
        // cache.asMap().remove(address, m);
        cache.invalidate(address);
    }
}

public void handleAckReceived(final long address) {
    PendingMessage m = cache.getIfPresent(address);
    if (m != null) {
        m.ackReceived();
        cache.invalidate(address);
    }
}
}

And called from ResponsePoller:

SendToQueue.getInstance().handleAckReceived(addressFrom);
like image 181
ewramner Avatar answered Sep 23 '22 03:09

ewramner


Design-wise: I feel like you are trying to write a thread-safe and somewhat efficient NIO message sender/receiver but (both) code I see here aren't OK and won't be without significant changes. The best thing to do is either:

  • make full use of the 0MQ framework. I see things and expectations here that are actually available out-of-the-box in ZMQ and java.util.concurrent API.
  • or have a look at Netty (https://netty.io/index.html) preferably if it applies to your project. "Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients." This will save you time if your project gets complex, otherwise it might be overkill to start with (but then expect issues ...).

However if you think you are almost at it with your code or @john's code then I will just give advices to complete:

  • don't use wait() and notify(). Don't sleep() either.
  • use a single thread for your "flow tracker" (i.e. ~the pending message Cache).

You don't actually need 3 threads to process pending messages except if this processing itself is slow (or does heavy stuff) which is not the case here as you basically make an async call (as far as it is really async.. is it?).

The same for the reverse path: use an executor service (multiple threads) for your received packets processing only if the actual processing is slow/blocking or heavy.

I'm not an expert in 0MQ at all but as far as socket.send(...) is thread-safe and non-blocking (which I'm not sure personally - tell me) the above advices shall be correct and make things simpler.

That said, to strictly answer your question:

Do we need separate bucket for all the sync calls just for acknowledgement and we dont retry from that bucket?

I'd say no, hence what do you think of the following? Based on your code and independently of my own feelings this seems acceptable:

public class SendToQueue {
    // ...
    private final Map<Long, Boolean> transactions = new ConcurrentHashMap<>();
    // ...

    private void startTransaction(long address) {
        this.transactions.put(address, Boolean.FALSE);
    }

    public void updateTransaction(long address) {
        Boolean state = this.transactions.get(address);
        if (state != null) {    
            this.transactions.put(address, Boolean.TRUE);
        }
    }

    private void clearTransaction(long address) {
        this.transactions.remove(address);
    }

    public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
        boolean success = false;

        // If address is enough randomized or atomically counted (then ok for parallel send())
        startTransaction(address);
        try {
            boolean sent = sendAsync(address, encodedRecords, socket);
            // if the record was sent successfully, then only sleep for timeout period
            if (sent) {
                // wait for acknowledgement
                success = waitDoneUntil(new DoneCondition() {
                    @Override
                    public boolean isDone() {
                        return SendToQueue.this.transactions.get(address); // no NPE
                    }
                }, 500, TimeUnit.MILLISECONDS);

                if (success) {
                    // Message acknowledged!
                }
            }
        } finally {
            clearTransaction(address);
        }
        return success;
    }

    public static interface DoneCondition {
        public boolean isDone();
    }

    /**
     * WaitDoneUntil(Future f, int duration, TimeUnit unit). Note: includes a
     * sleep(50).
     *
     * @param f Will block for this future done until maxWaitMillis
     * @param waitTime Duration expressed in (time) unit.
     * @param unit Time unit.
     * @return DoneCondition finally met or not
     */
    public static boolean waitDoneUntil(DoneCondition f, int waitTime, TimeUnit unit) {
        long curMillis = 0;
        long maxWaitMillis = unit.toMillis(waitTime);

        while (!f.isDone() && curMillis < maxWaitMillis) {
            try {
                Thread.sleep(50);   // define your step here accordingly or set as parameter
            } catch (InterruptedException ex1) {
                //logger.debug("waitDoneUntil() interrupted.");
                break;
            }
            curMillis += 50L;
        }

        return f.isDone();
    }
    //...
}

public class ResponsePoller {
    //...

    public void onReceive(long address) {   // sample prototype     
        // ...
        SendToQueue.getInstance().updateTransaction(address);
        // The interested sender will know that its transaction is complete.
        // While subsequent (late) calls will have no effect.
    }
}
like image 33
bsaverino Avatar answered Sep 23 '22 03:09

bsaverino