Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

java.lang.ArrayIndexOutOfBoundsException: 256 with jeromq 0.3.6 version

I am using Jeromq in multithreaded environment as shown below. Below is my code in which constructor of SocketManager connects to all the available sockets first and I put them in liveSocketsByDatacenter map in the connectToZMQSockets method. After that I start a background thread in the same constructor which runs every 30 seconds and it calls updateLiveSockets method to ping all those socket which were already there in liveSocketsByDatacenter map and update the liveSocketsByDatacenter map with whether those sockets were alive or not.

And getNextSocket() method is called by multiple reader threads concurrently to get the next live available socket and then we use that socket to send the data on it. So my question is are we using Jeromq correctly in multithreaded environment? Because we just saw an exception in our production environment with this stacktrace while we were trying to send data to that live socket so I am not sure whether it's a bug or something else?

java.lang.ArrayIndexOutOfBoundsException: 256
at zmq.YQueue.push(YQueue.java:97)
at zmq.YPipe.write(YPipe.java:47)
at zmq.Pipe.write(Pipe.java:232)
at zmq.LB.send(LB.java:83)
at zmq.Push.xsend(Push.java:48)
at zmq.SocketBase.send(SocketBase.java:590)
at org.zeromq.ZMQ$Socket.send(ZMQ.java:1271)
at org.zeromq.ZFrame.send(ZFrame.java:131)
at org.zeromq.ZFrame.sendAndKeep(ZFrame.java:146)
at org.zeromq.ZMsg.send(ZMsg.java:191)
at org.zeromq.ZMsg.send(ZMsg.java:163)

Below is my code:

public class SocketManager {
    private static final Random random = new Random();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>();
    private final ZContext ctx = new ZContext();

    private static class Holder {
        private static final SocketManager instance = new SocketManager();
    }

    public static SocketManager getInstance() {
        return Holder.instance;
    }

    private SocketManager() {
      connectToZMQSockets();
      scheduler.scheduleAtFixedRate(this::updateLiveSockets, 30, 30, TimeUnit.SECONDS);
    }

    // during startup, making a connection and populate once
    private void connectToZMQSockets() {
      Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
      for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
        List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
        liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
      }
    }

    private List<SocketHolder> connect(List<String> addresses, int socketType) {
        List<SocketHolder> socketList = new ArrayList<>();
        for (String address : addresses) {
          try {
            Socket client = ctx.createSocket(socketType);
            // Set random identity to make tracing easier
            String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
            client.setIdentity(identity.getBytes(ZMQ.CHARSET));
            client.setTCPKeepAlive(1);
            client.setSendTimeOut(7);
            client.setLinger(0);
            client.connect(address);

            SocketHolder zmq = new SocketHolder(client, ctx, address, true);
            socketList.add(zmq);
          } catch (Exception ex) {
            // log error
          }
        }
        return socketList;
    }

    // this method will be called by multiple threads concurrently to get the next live socket
    // is there any concurrency or thread safety issue or race condition here?
    public Optional<SocketHolder> getNextSocket() {
      for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
        Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
        if (liveSocket.isPresent()) {
          return liveSocket;
        }
      }
      return Optional.absent();
    }

    private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
      if (!CollectionUtils.isEmpty(listOfEndPoints)) {
        // The list of live sockets
        List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
        for (SocketHolder obj : listOfEndPoints) {
          if (obj.isLive()) {
            liveOnly.add(obj);
          }
        }
        if (!liveOnly.isEmpty()) {
          // The list is not empty so we shuffle it an return the first element
          return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
        }
      }
      return Optional.absent();
    }

    // runs every 30 seconds to ping all the socket to make sure whether they are alive or not
    private void updateLiveSockets() {
      Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;

      for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
        List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
        List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
        for (SocketHolder liveSocket : liveSockets) { // LINE A
          Socket socket = liveSocket.getSocket();
          String endpoint = liveSocket.getEndpoint();
          Map<byte[], byte[]> holder = populateMap();
          Message message = new Message(holder, Partition.COMMAND);

          // pinging to see whether a socket is live or not
          boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
          boolean isLive = (status) ? true : false;

          SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
          liveUpdatedSockets.add(zmq);
        }
        liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
      }
    }
}

And here is how I am using getNextSocket() method of SocketManager class concurrently from multiple reader threads:

// this method will be called from multiple threads
public boolean sendAsync(final long addr, final byte[] reco) {
  Optional<SocketHolder> liveSockets = SocketManager.getInstance().getNextSocket();
  return sendAsync(addr, reco, liveSockets.get().getSocket(), false);
}

public boolean sendAsync(final long addr, final byte[] reco, final Socket socket,
    final boolean messageA) {
  ZMsg msg = new ZMsg();
  msg.add(reco);
  boolean sent = msg.send(socket);
  msg.destroy();
  retryHolder.put(addr, reco);
  return sent;
}

  public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
    boolean sent = sendAsync(address, encodedRecords, socket, true);
    // if the record was sent successfully, then only sleep for timeout period
    if (sent) {
      try {
        TimeUnit.MILLISECONDS.sleep(500);
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }
    }
    // ...
    return sent;
  } 

I don't think this is correct I believe. It seems getNextSocket() could return a 0MQ socket to thread A. Concurrently, the timer thread may access the same 0MQ socket to ping it. In this case thread A and the timer thread are mutating the same 0MQ socket, which will lead to problems. So what is the best and efficient way to fix this issue?

Note: SocketHolder is an immutable class

Update:

I just noticed same issue happened on my another box with same ArrayIndexOutOfBoundsException but this time its 71 line number in "YQueue" file. The only consistent thing is 256 always. So there should be something related to 256 for sure and I am not able to figure out what is this 256 here?

java.lang.ArrayIndexOutOfBoundsException: 256
    at zmq.YQueue.backPos(YQueue.java:71)
    at zmq.YPipe.write(YPipe.java:51)
    at zmq.Pipe.write(Pipe.java:232)
    at zmq.LB.send(LB.java:83)
    at zmq.Push.xsend(Push.java:48)
    at zmq.SocketBase.send(SocketBase.java:590)
    at org.zeromq.ZMQ$Socket.send(ZMQ.java:1271)
    at org.zeromq.ZFrame.send(ZFrame.java:131)
    at org.zeromq.ZFrame.sendAndKeep(ZFrame.java:146)
    at org.zeromq.ZMsg.send(ZMsg.java:191)
    at org.zeromq.ZMsg.send(ZMsg.java:163)
like image 535
john Avatar asked Nov 02 '17 21:11

john


1 Answers

Fact #0: ZeroMQ is not thread-safe -- by definition

While ZeroMQ documentation and Pieter HINTJENS' excellent book "Code Connected. Volume 1" do not forget to remind this fact wherever possible, the idea of returning or even sharing a ZeroMQ socket-instance among threads appear from time to time. Sure, class-instances' methods may deliver this almost "hidden" inside theirs internal methods and attributes, but proper design efforts ought prevent any such side-effects with no exceptions, no excuse.

Sharing, if reasonably supported by quantitative facts, may be a way for a common instance of the zmq.Context(), but a crystal-clear distributed system design may live on a truly multi-agent scheme, where each agent operates its own Context()-engine, fine-tuned to the respective mix of configuration and performance preferences.

So what is the best and efficient way to fix this issue?

Never share a ZeroMQ socket. Never, indeed. Even if the newest development started to promise some near future changes in this direction. It is a bad habit to pollute any high-performance, low-latency distributed system design with sharing. Share nothing is the best design principle for this domain.


Yeah I can see we should not share sockets between threads but in my code
what do you think is the best way to resolve this?

Yeah, the best and efficient way to fix this issue is to never share a ZeroMQ socket.

This means never return any object, attributes of which are ZeroMQ sockets ( which you actively build and return in a massive manner from the .connect(){...} class-method. In your case, all the class-methods seem to be kept private, which may fuse the problem of allowing "other threads" to touch the class-private socket instances, but the same principle must be endorsed also on all the attribute-level, so as to be effective. Finally, this "fusing" gets shortcut and violated by the
public static SocketManager getInstance(),
which promiscuitively offers any external asker to get a straight access to sharing the class-private instances of the ZeroMQ sockets.

If some documentation explicitly warns in almost every chapter not to share things, one rather should not share the things.

So, re-design the methods, so that the SocketManager gets more functionalities as it's class-methods, which will execute the must-have functionalities embedded, so as to explicitly prevent any external-world thread to touch a non-share-able instance, as documented in ZeroMQ publications.

Next comes the inventory of resources: your code seems to re-check every 30 seconds the state-of-the-world in all DataCenters-of-Interest. This actually creates new List objects twice a minute. While you may speculatively let java Garbage Collector to tidy up all the thrash, that is not further referenced from anywhere, this is not a good idea for ZeroMQ-related objects, embedded inside List-s from your previous re-check runs. ZeroMQ-objects are still referenced from inside the Zcontext() - the ZeroMQ Context()-core-factory instantiated I/O-thread(s), which could be also viewed as the ZeroMQ socket-inventory resources-manager. So, all the new-created socket-instances get not only the external-handle from the java-side, but also an internal-handle, from inside the (Z)Context(). So far so good. But what is not seen, anywhere in the code, is any method, that would de-commission any and all the ZeroMQ sockets in object-instances, that have got deassociated from java-side, but yet remain referenced from the (Z)Context()-side. Explicit resource-decommissioning of allocated resources is a fair design-side practice, the more for resources, that are limited or otherwise constrained. The way how to do this may differ for { "cheap" | "expensive" }-maintenance costs of such resources-management processing ( ZeroMQ socket-instances being remarkably expensive to get handled as some lightweight "consumable/disposable" ... but that is another story ).

So, add also a set of proper resources-re-use / resources-dismantling methods, that would get the total amount of new-created sockets back under your responsibility of control ( your code is responsible for how many socket-handlers inside the (Z)Context()-domain-of-resources-control may get created and must remain to have been managed -- be it knowingly or not ).

One may object there might be some "promises" from automated detection and ( potentially well deferred ) garbage collection, but still, your code is responsible for proper resources-management and even LMAX guys would never get such brave performance, if they were relying on "promises" from standard gc. Your problem is way worse than LMAX top-performance had to fight with. Your code ( so far published ) does nothing to .close() and .term() the ZeroMQ-associated resources at all. This is a straight impossible practice inside an ecosystem with uncontrolled-(distributed-demand-for)-consumption. You have to protect your boat from getting overloaded beyond a limit you know it can safely handle and dynamically unload each and every box, that has no recipient on the "opposite coast".

That is the Captain's ( your code designer's ) responsibility.

Not telling explicitly the sailor-in-charge of the inventory-management on the lowest level ( ZeroMQ Context()-floor ) that some boxes are to get un-loaded, the problem is still yours. The standard gc-chain-of-command will not do this "automatically", whatever "promises" might look like it would, it would not. So be explicit towards your ZeroMQ resources-management, evaluate return-codes from ordering these steps to be taken and handle appropriately any and all exceptions raised from doing these resources-management operations under your code explicit control.

Lower ( if not the lowest achievable at all ) resources utilisation-envelopes and higher ( if not the highest achievable at all ) performance is a bonus from doing this job right. LMAX guys are a good example in doing this remarkably well beyond the standard java "promises", so one can learn from the bests of the bests.


Call signatures declared, vs. used, do not seem to match:
while I may be wrong in this point, as most of my design efforts are not in java polymorphic call-interfaces, there seems to be a mis-match in a signature, published as:

private List<SocketHolder> connect( Datacenters  dc,                     // 1-st
                                    List<String> addresses,              // 2-nd
                                    int          socketType              // 3-rd
                                    ) {
        ... /* implementation */
}

and
the actual method invocation,
called inside connectToZMQSockets() method just by:

        List<SocketHolder> addedColoSockets = connect( entry.getValue(), // 1-st
                                                       ZMQ.PUSH          // 2-nd
                                                       );
like image 187
user3666197 Avatar answered Oct 07 '22 01:10

user3666197