I am using the class below to send data to our messaging queue over a socket, either synchronously or asynchronously.
`sendAsync` - Sends data asynchronously without any timeout. After sending (on LINE A), it adds the record to the `retryHolder` bucket so that, if no acknowledgement is received, the background thread started in the constructor retries it.
`send` - Internally calls the `sendAsync` method, then sleeps for a fixed timeout period. If no acknowledgement has been received by then, it removes the record from the `retryHolder` bucket so that we don't retry it.
So the only difference between the two methods is this: for async I need to retry at all costs, but for sync I don't need to retry. However, it looks like sync sends might be getting retried anyway, since both share the same retry bucket cache and the retry thread runs every 1 second.
`ResponsePoller` is a class which receives the acknowledgement for data that was sent to our messaging queue and then calls the `removeFromretryHolder` method below to remove the address, so that we don't retry after receiving the acknowledgement.
public class SendToQueue {
private final ExecutorService cleanupExecutor = Executors.newFixedThreadPool(5);
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
private final Cache<Long, byte[]> retryHolder =
CacheBuilder
.newBuilder()
.maximumSize(1000000)
.concurrencyLevel(100)
.removalListener(
RemovalListeners.asynchronous(new LoggingRemovalListener(), cleanupExecutor)).build();
private static class Holder {
private static final SendToQueue INSTANCE = new SendToQueue();
}
public static SendToQueue getInstance() {
return Holder.INSTANCE;
}
private SendToQueue() {
executorService.submit(new ResponsePoller()); // another thread which receives acknowledgement and then delete entry from the `retryHolder` cache accordingly.
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
// retry again
for (Entry<Long, byte[]> entry : retryHolder.asMap().entrySet()) {
sendAsync(entry.getKey(), entry.getValue());
}
}
}, 0, 1, TimeUnit.SECONDS);
}
public boolean sendAsync(final long address, final byte[] encodedRecords, final Socket socket) {
ZMsg msg = new ZMsg();
msg.add(encodedRecords);
// send data on a socket LINE A
boolean sent = msg.send(socket);
msg.destroy();
retryHolder.put(address, encodedRecords);
return sent;
}
public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
boolean sent = sendAsync(address, encodedRecords, socket);
// if the record was sent successfully, then only sleep for timeout period
if (sent) {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
// if key is not present, then acknowledgement was received successfully
sent = !retryHolder.asMap().containsKey(address);
// and key is still present in the cache, then it means acknowledgment was not received after
// waiting for timeout period, so we will remove it from cache.
if (!sent)
removeFromretryHolder(address);
return sent;
}
public void removeFromretryHolder(final long address) {
retryHolder.invalidate(address);
}
}
What is the best way to avoid retrying when someone calls the `send` method, while still knowing whether the acknowledgement was received? The only requirement is that sync sends are never retried.
Do we need a separate bucket for all the sync calls, just for acknowledgements, that we don't retry from?
The code has a number of potential issues, including:
A race condition around `retryHolder#put`: the entry is added only after the message has been sent, so an acknowledgement can arrive first.
`send` always sleeps for the full timeout; use `wait` + `notify` instead.
I would store a class with more state in the cache instead of the raw bytes. It could contain a flag (`retryIfNoAnswer` yes/no) that the retry handler could check. It could provide `waitForAnswer`/`markAnswerReceived` methods using `wait`/`notify` so that `send` doesn't have to sleep for a fixed time. The `waitForAnswer` method can return true if an answer was obtained and false on timeout. Put the object in the retry handler before sending and use a timestamp so that only messages older than a certain age are retried. That fixes the first race condition.
EDIT: updated example code below, compiles with your code, not tested:
public class SendToQueue {
private final ExecutorService cleanupExecutor = Executors.newFixedThreadPool(5);
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
// Not sure why you are using a cache rather than a standard ConcurrentHashMap?
private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder().maximumSize(1000000)
.concurrencyLevel(100)
.removalListener(RemovalListeners.asynchronous(new LoggingRemovalListener(), cleanupExecutor)).build();
private static class PendingMessage {
private final long _address;
private final byte[] _encodedRecords;
private final Socket _socket;
private final boolean _retryEnabled;
private final Object _monitor = new Object();
private long _sendTimeMillis;
private volatile boolean _acknowledged;
public PendingMessage(long address, byte[] encodedRecords, Socket socket, boolean retryEnabled) {
_address = address;
_sendTimeMillis = System.currentTimeMillis();
_encodedRecords = encodedRecords;
_socket = socket;
_retryEnabled = retryEnabled;
}
public synchronized boolean hasExpired() {
return System.currentTimeMillis() - _sendTimeMillis > 500L;
}
public synchronized void markResent() {
_sendTimeMillis = System.currentTimeMillis();
}
public boolean shouldRetry() {
return _retryEnabled && !_acknowledged;
}
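public boolean waitForAck() {
long deadline = System.currentTimeMillis() + 500L;
try {
synchronized(_monitor) {
// Re-check the flag in a loop: this handles spurious wakeups and an ack
// that arrived before we started waiting (no need to wait the full timeout)
long remaining = deadline - System.currentTimeMillis();
while (!_acknowledged && remaining > 0) {
_monitor.wait(remaining);
remaining = deadline - System.currentTimeMillis();
}
}
return _acknowledged;
}
catch (InterruptedException e) {
// Restore the interrupt flag so callers can still observe the interruption
Thread.currentThread().interrupt();
return false;
}
}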
public void ackReceived() {
_acknowledged = true;
synchronized(_monitor) {
_monitor.notifyAll();
}
}
public long getAddress() {
return _address;
}
public byte[] getEncodedRecords() {
return _encodedRecords;
}
public Socket getSocket() {
return _socket;
}
}
private static class Holder {
private static final SendToQueue INSTANCE = new SendToQueue();
}
public static SendToQueue getInstance() {
return Holder.INSTANCE;
}
private void handleRetries() {
List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
for (PendingMessage m : messages) {
if (m.hasExpired()) {
if (m.shouldRetry()) {
m.markResent();
doSendAsync(m, m.getSocket());
}
else {
// Or leave the message and let send remove it
cache.invalidate(m.getAddress());
}
}
}
}
private SendToQueue() {
executorService.submit(new ResponsePoller()); // another thread which receives acknowledgement and then delete entry from the cache accordingly.
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
handleRetries();
}
}, 0, 1, TimeUnit.SECONDS);
}
public boolean sendAsync(final long address, final byte[] encodedRecords, final Socket socket) {
PendingMessage m = new PendingMessage(address, encodedRecords, socket, true);
cache.put(address, m);
return doSendAsync(m, socket);
}
private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
ZMsg msg = new ZMsg();
msg.add(pendingMessage.getEncodedRecords());
try {
// send data on a socket LINE A
return msg.send(socket);
}
finally {
msg.destroy();
}
}
public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
PendingMessage m = new PendingMessage(address, encodedRecords, socket, false);
cache.put(address, m);
try {
if (doSendAsync(m, socket)) {
return m.waitForAck();
}
return false;
}
finally {
// Alternatively (checks that address points to m):
// cache.asMap().remove(address, m);
cache.invalidate(address);
}
}
public void handleAckReceived(final long address) {
PendingMessage m = cache.getIfPresent(address);
if (m != null) {
m.ackReceived();
cache.invalidate(address);
}
}
}
And called from `ResponsePoller`:
SendToQueue.getInstance().handleAckReceived(addressFrom);
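The commented-out alternative in the `finally` block of `send` (`cache.asMap().remove(address, m)`) matters if two sends can ever share an address, because the second `put` overwrites the first entry. A minimal sketch of that conditional remove, using a plain `ConcurrentHashMap` in place of the cache; the class and method names here are hypothetical and only for illustration:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class ConditionalRemoveDemo {
    /** Removes the entry only if it still maps to the caller's own message. */
    static boolean cleanUp(ConcurrentMap<Long, Object> pending, long address, Object mine) {
        return pending.remove(address, mine);
    }

    public static void main(String[] args) {
        ConcurrentMap<Long, Object> pending = new ConcurrentHashMap<>();
        Object first = new Object();   // stands in for the first PendingMessage
        Object second = new Object();  // stands in for a later message, same address
        pending.put(42L, first);
        pending.put(42L, second);      // the second send overwrites the first entry

        System.out.println(cleanUp(pending, 42L, first));   // false: entry belongs to second
        System.out.println(pending.containsKey(42L));       // true: second is untouched
        System.out.println(cleanUp(pending, 42L, second));  // true: owner removes its own entry
    }
}
```

With an unconditional `invalidate(address)`, the first caller's cleanup would also discard the second caller's pending message.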
Design-wise: I feel like you are trying to write a thread-safe and reasonably efficient NIO message sender/receiver, but neither version of the code I see here is OK, and neither will be without significant changes. The best thing to do is either:
Make full use of the `0MQ` framework. I see things and expectations here that are actually available out of the box in `ZMQ` and the `java.util.concurrent` API.
Or have a look at `Netty` (https://netty.io/index.html), preferably if it applies to your project. "Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients." This will save you time if your project gets complex; otherwise it might be overkill to start with (but then expect issues ...).
However, if you think you are almost there with your code or @john's code, then I will just give some advice to complete it:
Avoid `wait()` and `notify()`. Don't `sleep()` either.
You don't actually need 3 threads to process pending messages unless the processing itself is slow (or does heavy stuff), which is not the case here, as you basically make an async call (assuming it really is async... is it?).
The same goes for the reverse path: use an executor service (multiple threads) to process received packets only if the actual processing is slow, blocking, or heavy.
I'm not an expert in `0MQ` at all, but as long as `socket.send(...)` is thread-safe and non-blocking (which I'm personally not sure of; tell me), the above advice should be correct and make things simpler.
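As one illustration of the `java.util.concurrent` route mentioned above, here is a rough sketch of waiting for an acknowledgement without `wait()`, `notify()`, or `sleep()`. It assumes acknowledgements are keyed by the same `long` address as in the question; `AckRegistry` and its method names are hypothetical and not part of the original code or of any library:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: tracks one pending acknowledgement per address. */
final class AckRegistry {
    private final ConcurrentMap<Long, CompletableFuture<Boolean>> pending = new ConcurrentHashMap<>();

    /** Register interest BEFORE sending, so an early ack cannot be missed. */
    CompletableFuture<Boolean> register(long address) {
        CompletableFuture<Boolean> ack = new CompletableFuture<>();
        pending.put(address, ack);
        return ack;
    }

    /** Called by the receiving thread; late or unknown acks are simply ignored. */
    void ackReceived(long address) {
        CompletableFuture<Boolean> ack = pending.remove(address);
        if (ack != null) {
            ack.complete(Boolean.TRUE);
        }
    }

    /** Blocks up to the timeout; returns true only if the ack arrived in time. */
    boolean await(long address, CompletableFuture<Boolean> ack, long timeout, TimeUnit unit) {
        try {
            return ack.get(timeout, unit);
        } catch (TimeoutException | ExecutionException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return false;
        } finally {
            pending.remove(address, ack);       // remove only our own registration
        }
    }
}
```

In this sketch a synchronous `send` would call `register`, send the message, then `await`; since nothing is ever placed in the retry bucket, the retry thread never sees sync messages.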
That said, to strictly answer your question:
Do we need a separate bucket for all the sync calls, just for acknowledgements, that we don't retry from?
I'd say no. What do you think of the following? Based on your code, and independently of my own feelings, this seems acceptable:
public class SendToQueue {
// ...
private final Map<Long, Boolean> transactions = new ConcurrentHashMap<>();
// ...
private void startTransaction(long address) {
this.transactions.put(address, Boolean.FALSE);
}
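public void updateTransaction(long address) {
// replace() is atomic and only succeeds while the transaction is still registered,
// so a late acknowledgement cannot re-create an entry that send() already cleared
this.transactions.replace(address, Boolean.TRUE);
}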
private void clearTransaction(long address) {
this.transactions.remove(address);
}
public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
boolean success = false;
// If address is enough randomized or atomically counted (then ok for parallel send())
startTransaction(address);
try {
boolean sent = sendAsync(address, encodedRecords, socket);
// if the record was sent successfully, wait (up to the timeout) for the acknowledgement
if (sent) {
// wait for acknowledgement
success = waitDoneUntil(new DoneCondition() {
@Override
public boolean isDone() {
// Boolean.TRUE.equals(...) is null-safe, so no NPE even if the entry is gone
return Boolean.TRUE.equals(SendToQueue.this.transactions.get(address));
}
}, 500, TimeUnit.MILLISECONDS);
if (success) {
// Message acknowledged!
}
}
} finally {
clearTransaction(address);
}
return success;
}
public static interface DoneCondition {
public boolean isDone();
}
/**
* Polls the given condition until it is met or the timeout elapses. Note:
* includes a sleep(50) between checks.
*
* @param f Condition to poll until it is done or the timeout elapses
* @param waitTime Maximum duration expressed in (time) unit.
* @param unit Time unit.
* @return whether the DoneCondition was finally met
*/
public static boolean waitDoneUntil(DoneCondition f, int waitTime, TimeUnit unit) {
long curMillis = 0;
long maxWaitMillis = unit.toMillis(waitTime);
while (!f.isDone() && curMillis < maxWaitMillis) {
try {
Thread.sleep(50); // define your step here accordingly or set as parameter
} catch (InterruptedException ex1) {
//logger.debug("waitDoneUntil() interrupted.");
Thread.currentThread().interrupt(); // restore the interrupt flag for callers
break;
}
curMillis += 50L;
}
return f.isDone();
}
//...
}
public class ResponsePoller {
//...
public void onReceive(long address) { // sample prototype
// ...
SendToQueue.getInstance().updateTransaction(address);
// The interested sender will know that its transaction is complete.
// While subsequent (late) calls will have no effect.
}
}