I am using the class below to send data to our messaging queue over a socket, either synchronously or asynchronously. Which method I call depends on the requirement: most of the time we send data asynchronously, but sometimes I need to send it synchronously.
- sendAsync: sends data asynchronously without blocking the calling thread. If the acknowledgment is not received, the message is retried from the background thread that is started in the SendToQueue constructor.
- send: sends data synchronously on a socket. It internally calls doSendAsync and then waits up to a timeout period; if the acknowledgment is not received, the entry is removed from the cache so that we don't retry.
So the only difference between the two methods is that in the async case I need to retry at all costs if the acknowledgment is not received, whereas in the sync case I don't need to retry at all. That is why I am storing more state in the PendingMessage class.
ResponsePoller is a class that receives the acknowledgment for data sent to our messaging queue on a particular socket and then calls the handleAckReceived method below to remove the address, so that we don't retry after the acknowledgment has arrived. If the acknowledgment is received, the socket is live; otherwise it is dead.
// Imports inferred from usage (Guava for Cache/Optional, org.zeromq for ZMsg/Socket).
import com.google.common.base.Optional;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZMsg;

public class SendToQueue {
    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
    // Messages awaiting acknowledgment, keyed by address.
    private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder()
            .maximumSize(1000000)
            .concurrencyLevel(100)
            .build();

    private static class PendingMessage {
        private final long _address;
        private final byte[] _encodedRecords;
        private final boolean _retryEnabled;
        private final Object _monitor = new Object();
        private long _sendTimeMillis;
        private volatile boolean _acknowledged;

        public PendingMessage(long address, byte[] encodedRecords, boolean retryEnabled) {
            _address = address;
            _sendTimeMillis = System.currentTimeMillis();
            _encodedRecords = encodedRecords;
            _retryEnabled = retryEnabled;
        }

        public synchronized boolean hasExpired() {
            return System.currentTimeMillis() - _sendTimeMillis > 500L;
        }

        public synchronized void markResent() {
            _sendTimeMillis = System.currentTimeMillis();
        }

        public boolean shouldRetry() {
            return _retryEnabled && !_acknowledged;
        }

        // Waits up to 500 ms for the acknowledgment. The flag is checked under the
        // monitor so an ack that arrives before wait() is not missed, and the loop
        // guards against spurious wakeups.
        public boolean waitForAck() {
            long deadline = System.currentTimeMillis() + 500L;
            synchronized (_monitor) {
                try {
                    while (!_acknowledged) {
                        long remaining = deadline - System.currentTimeMillis();
                        if (remaining <= 0) {
                            break;
                        }
                        _monitor.wait(remaining);
                    }
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve the interrupt status
                    return false;
                }
                return _acknowledged;
            }
        }

        public void ackReceived() {
            synchronized (_monitor) {
                _acknowledged = true;
                _monitor.notifyAll();
            }
        }

        public long getAddress() {
            return _address;
        }

        public byte[] getEncodedRecords() {
            return _encodedRecords;
        }
    }

    private static class Holder {
        private static final SendToQueue INSTANCE = new SendToQueue();
    }

    public static SendToQueue getInstance() {
        return Holder.INSTANCE;
    }

    // Runs every second: resends expired async messages and drops expired sync ones.
    private void handleRetries() {
        List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
        for (PendingMessage m : messages) {
            if (m.hasExpired()) {
                if (m.shouldRetry()) {
                    m.markResent();
                    doSendAsync(m, Optional.<Socket>absent());
                } else {
                    cache.invalidate(m.getAddress());
                }
            }
        }
    }

    private SendToQueue() {
        // Another thread, which receives acknowledgments and then deletes
        // the corresponding entries from the cache.
        executorService.submit(new ResponsePoller());
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                handleRetries();
            }
        }, 0, 1, TimeUnit.SECONDS);
    }

    public boolean sendAsync(final long address, final byte[] encodedRecords) {
        PendingMessage m = new PendingMessage(address, encodedRecords, true);
        cache.put(address, m);
        return doSendAsync(m, Optional.<Socket>absent());
    }

    private boolean doSendAsync(final PendingMessage pendingMessage, final Optional<Socket> socket) {
        Optional<Socket> actualSocket = socket;
        if (!actualSocket.isPresent()) {
            SocketHolder liveSocket = SocketManager.getInstance().getSocket();
            actualSocket = Optional.of(liveSocket.getSocket());
        }
        ZMsg msg = new ZMsg();
        msg.add(pendingMessage.getEncodedRecords());
        try {
            return msg.send(actualSocket.get());
        } finally {
            msg.destroy();
        }
    }

    public boolean send(final long address, final byte[] encodedRecords) {
        return send(address, encodedRecords, Optional.<Socket>absent());
    }

    public boolean send(final long address, final byte[] encodedRecords,
            final Optional<Socket> socket) {
        PendingMessage m = new PendingMessage(address, encodedRecords, false);
        cache.put(address, m);
        try {
            if (doSendAsync(m, socket)) {
                return m.waitForAck();
            }
            return false;
        } finally {
            cache.invalidate(address);
        }
    }

    // Called by the acknowledgment thread, which is in the "ResponsePoller" class.
    public void handleAckReceived(final long address) {
        PendingMessage m = cache.getIfPresent(address);
        if (m != null) {
            m.ackReceived();
            cache.invalidate(address);
        }
    }
}
Since I am sending data on a socket, getting an acknowledgment back for that data means the socket is alive; if the data is not acknowledged, the socket is dead (but I will keep retrying to send the data).
So with my design above (or a better one, if there is any), how can I figure out whether a socket is dead or live, based on whether the acknowledgment was received from it? Based on that, I need to release the socket back into its pool, whether it is alive or dead, by calling one of the methods below, in both the sync and async cases.
I also need a configurable count: a socket should be marked dead only if the acknowledgment is not received on it x times (where x is a number > 0, default 2). What is the best and most efficient way to do this?
SocketManager.getInstance().releaseSocket(socket, SocketState.LIVE);
SocketManager.getInstance().releaseSocket(socket, SocketState.DEAD);
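To make the "x missed acknowledgments" requirement concrete, here is a minimal sketch of a per-socket counter. The class SocketHealthTracker and its methods recordAck and recordMiss are hypothetical names that do not appear in the code above, and the sketch assumes Java 8 or later. It only counts consecutive misses; it does not decide when an acknowledgment counts as missed.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: counts consecutive missed acknowledgments per socket key. */
final class SocketHealthTracker<K> {
    private final int maxMisses; // the "x" from the question
    private final ConcurrentMap<K, AtomicInteger> misses = new ConcurrentHashMap<>();

    SocketHealthTracker(int maxMisses) {
        if (maxMisses <= 0) {
            throw new IllegalArgumentException("maxMisses must be > 0");
        }
        this.maxMisses = maxMisses;
    }

    /** Uses the default threshold of 2 mentioned in the question. */
    SocketHealthTracker() {
        this(2);
    }

    /** An acknowledgment arrived on this socket: forget earlier misses. */
    void recordAck(K socketKey) {
        misses.remove(socketKey);
    }

    /**
     * An acknowledgment timed out on this socket.
     * Returns true once the consecutive-miss count has reached the threshold.
     */
    boolean recordMiss(K socketKey) {
        int count = misses.computeIfAbsent(socketKey, k -> new AtomicInteger())
                          .incrementAndGet();
        return count >= maxMisses;
    }
}
```

To use something like this, PendingMessage would probably have to remember which socket the message was last sent on, since the code above does not keep that. handleAckReceived could then call recordAck and release the socket as LIVE, while the timeout paths (handleRetries and the sync send) could call recordMiss and release the socket as DEAD only when it returns true.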
For TCP, the default keep-alive timeout is 2 hours and the keep-alive interval is 1 second. The default number of keep-alive probes varies based on the version of Windows. The SIO_KEEPALIVE_VALS control code can be used to enable or disable keep-alive, and adjust the timeout and interval, for a single connection.
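For comparison, on a plain java.net.Socket the per-connection switch is setKeepAlive; the timeout and interval are then governed by the operating system settings. The sketch below is only an illustration on a loopback connection, and KeepAliveDemo is a hypothetical name. Note that the ZeroMQ sockets in the question are not java.net.Socket instances, so this does not apply to them directly.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

/** Hypothetical demo: enable TCP keep-alive on a plain loopback connection. */
final class KeepAliveDemo {
    /** Opens a loopback connection, enables SO_KEEPALIVE, and reports the setting. */
    static boolean connectWithKeepAlive() {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort())) {
            client.setKeepAlive(true);   // ask the OS to send keep-alive probes
            return client.getKeepAlive();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```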
New Thread creation: When a program calls the start() method, a new thread is created and then the run() method is executed.
Say you are at home with a cable plugged into your laptop and the router, and behind the router there is a cable modem. If you turn off your router, your laptop will know: there is no voltage on the link. If you turn off your modem, it gets tricky: you simply can't know that. One potential problem is having no route to the host, but even if you are connected, there can be other issues. Some protocols, like ssh, have a ping built in, so they keep the connection alive. If your app is doing nothing, at every interval there is a ping-pong between client and server, so you know whether the connection is alive.
If you have full control over the protocol, keep-alive is one of the options.
Your client is at one end, but in general it is really hard for two parties to be sure that they are in agreement. The Byzantine generals problem describes a general network model in which no node knows the state of any other and each can trust only what it observes itself.
In general I would not write a distributed system myself. I'm using Hystrix for that. The link is to its configuration, so you can see how big it is. You can track whether the server is working or not. When it comes back, you can set up a policy to detect that without flooding it, cancel messages that are outdated, and much more: graphs, stats, integration with other solutions. There is a big community using it and solving problems. That is a much better option than doing it yourself.
I'm not sure whether this is the only service you talk to, or whether Hystrix is a good fit for you. In Java, people tend to use layers and frameworks when they deal with such problems. Hope it helps.
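As a rough sketch of what application-level keep-alive bookkeeping could look like: the side that sends pings records when the last pong arrived and treats the peer as dead once that is older than a timeout. HeartbeatMonitor is a hypothetical name, not part of any library mentioned here. Timestamps are passed in explicitly (in real use they would come from System.nanoTime()), and sending the actual ping messages is left out.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper: tracks the last pong and reports liveness against a timeout. */
final class HeartbeatMonitor {
    private final long timeoutNanos;
    private final AtomicLong lastPongNanos;

    HeartbeatMonitor(long timeout, TimeUnit unit, long nowNanos) {
        this.timeoutNanos = unit.toNanos(timeout);
        this.lastPongNanos = new AtomicLong(nowNanos); // treat creation time as the first pong
    }

    /** Call whenever a pong (or any other reply) arrives from the peer. */
    void onPong(long nowNanos) {
        lastPongNanos.set(nowNanos);
    }

    /** The peer counts as alive while the last pong is no older than the timeout. */
    boolean isAlive(long nowNanos) {
        return nowNanos - lastPongNanos.get() <= timeoutNanos;
    }
}
```

A scheduled task could send a ping every interval and check isAlive afterwards; combining this with a miss counter avoids declaring the peer dead after a single late reply.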