
Thread handling in Java HornetQ client

I'm trying to understand how to deal with threads within a Java client that connects to HornetQ. I'm not getting a specific error but fail to understand how I'm expected to deal with threads in the first place (with respect to the HornetQ client and specifically MessageHandler.onMessage() -- threads in general are no problem to me).

In case this is relevant: I'm using 'org.hornetq:hornetq-server:2.4.7.Final' to run the server embedded into my application. I don't intend this to make a difference. In my situation, that's just more convenient from an ops perspective than running a standalone server process.

What I did so far:

  1. create an embedded server: new EmbeddedHornetQ(), .setConfiguration()

  2. create a server locator: HornetQClient.createServerLocator(false, new TransportConfiguration(InVMConnectorFactory.class.getName()))

  3. create a session factory: serverLocator.createSessionFactory()

Now it seems obvious to me that I can create a session using hornetqClientSessionFactory.createSession(), create a producer and consumer for that session, and deal with messages within a single thread using .send() and .receive().

But I also discovered consumer.setMessageHandler(), and this tells me that I didn't understand threading in the client at all. I tried to use it, but then the consumer calls messageHandler.onMessage() in two threads that are distinct from the one that created the session. This seems to match my impression from looking at the code -- the HornetQ client uses a thread pool to dispatch messages.

This leaves me confused. The javadocs say that the session is a "single-thread object", and the code agrees -- no obvious synchronization going on there. But with onMessage() being called in multiple threads, message.acknowledge() is also called in multiple threads, and that one just delegates to the session. How is this supposed to work? How would a scenario look in which MessageHandler does NOT access the session from multiple threads?

Going further, how would I send follow-up messages from within onMessage()? I'm using HornetQ for a persistent "to-do" work queue, so sending follow-up messages is a typical use case for me. But again, within onMessage(), I'm in the wrong thread for accessing the session.

Note that I would be okay with staying away from MessageHandler and just using send() / receive() in a way that allows me to control threading. But I'm convinced that I don't understand the whole situation at all, and that combined with multi-threading is just asking for trouble.

Martin Geisse asked Jun 06 '16 07:06



1 Answer

I can answer part of your question, although I hope you've already fixed the issue by now.

From the HornetQ documentation on ClientConsumer (emphasis mine):

A ClientConsumer receives messages from HornetQ queues.
Messages can be consumed synchronously by using the receive() methods which will block until a message is received (or a timeout expires) or asynchronously by setting a MessageHandler.
These 2 types of consumption are exclusive: a ClientConsumer with a MessageHandler set will throw HornetQException if its receive() methods are called.

So you have two choices on handling message reception:

  1. Synchronize the reception yourself
    • Do not provide a MessageHandler to HornetQ
    • In your own consumer thread, invoke .receive() or .receive(long timeout) at your leisure
    • Retrieve the (optional) ClientMessage object returned by the call
      • Pro: With the Session that your consumer thread already holds, you can forward the message as you see fit
      • Con: All this message handling will be sequential
  2. Delegate Thread synchronization to HornetQ
    • Do not invoke .receive() on a Consumer
    • Provide a MessageHandler implementation of onMessage(ClientMessage)
      • Pro: All the message handling will be concurrent and fast, hassle-free
      • Con: I do not think it is possible to retrieve the Session from the ClientMessage, as the interface does not expose it.
    • Untested workaround: In my application (which is in-VM like yours, but uses the JMS API), I exposed the underlying, thread-safe QueueConnection as a static variable available application-wide. From your listener, you can invoke QueueSession jmsSession = jmsConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); on it to obtain a new Session and send your messages from it. As far as I can see this is probably all right, because the Session object is not really re-created. I also did this because Sessions had a tendency to become stale.
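
For the follow-up messages, one pattern that may help is to let the handler do nothing but hand the work to a single thread that owns the sending session. Below is a minimal sketch of that hand-off using only java.util.concurrent. RecordingSession and SessionOwnerDemo are hypothetical stand-ins, not HornetQ classes, and the dispatch pool merely simulates callbacks arriving on several threads. In real code the send would go through a producer created from a session that only the owner thread touches.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a single-threaded session; NOT a HornetQ class. */
final class RecordingSession {
    final List<String> sent = new CopyOnWriteArrayList<>();
    final Set<String> callerThreads = ConcurrentHashMap.newKeySet();

    void send(String body) {
        // Record which thread called us so the confinement can be checked.
        callerThreads.add(Thread.currentThread().getName());
        sent.add(body);
    }
}

final class SessionOwnerDemo {
    /** Simulates handler callbacks on several pool threads handing off to one owner thread. */
    static RecordingSession run(int callbackThreads, int messages) {
        RecordingSession session = new RecordingSession();
        // Only this executor's single thread ever touches the session.
        ExecutorService sessionOwner = Executors.newSingleThreadExecutor();
        // Assumption: stands in for the client's own dispatch threads.
        ExecutorService dispatchPool = Executors.newFixedThreadPool(callbackThreads);
        try {
            for (int i = 0; i < messages; i++) {
                String body = "follow-up-" + i;
                // The "handler" does no session work itself; it only hands off.
                dispatchPool.execute(() -> sessionOwner.execute(() -> session.send(body)));
            }
            // Wait for all hand-offs first, then let the owner drain its queue.
            dispatchPool.shutdown();
            dispatchPool.awaitTermination(10, TimeUnit.SECONDS);
            sessionOwner.shutdown();
            sessionOwner.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return session;
    }

    public static void main(String[] args) {
        RecordingSession s = run(4, 20);
        System.out.println(s.sent.size() + " sends from threads " + s.callerThreads);
    }
}
```

The trade-off is that all follow-up sends are serialized through one thread, which is usually acceptable when the handler's real work happens before the hand-off.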

I don't think you need that much control over your message-handling threads, especially transient threads that merely forward messages. HornetQ has built-in thread pools, as you guessed, and reuses their threads efficiently.

Also, as you know, an object like a Queue does not have to be confined to a single thread, so it doesn't matter if the Queue is accessed from multiple threads, or even through multiple Sessions. You only need to make sure that each Session is accessed by one thread, and this is by design with MessageHandler.
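
One way to honor that rule when callbacks arrive on pool threads is to give each thread its own session, created lazily on first use. The sketch below shows the idea with a ThreadLocal. ConfinedSession and PerThreadSessions are hypothetical names, not HornetQ classes; with the real client, the initializer would presumably create the session from the session factory, and the sessions would need to be closed on shutdown.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a session that must stay on one thread; NOT a HornetQ class. */
final class ConfinedSession {
    private final Thread owner = Thread.currentThread();

    void send(String body) {
        // Fail loudly if any thread other than the creator uses this session.
        if (Thread.currentThread() != owner) {
            throw new IllegalStateException("session used from a foreign thread");
        }
    }
}

final class PerThreadSessions {
    final AtomicInteger sessionsCreated = new AtomicInteger();
    final AtomicInteger sends = new AtomicInteger();
    final AtomicInteger violations = new AtomicInteger();

    // Each thread lazily gets its own session the first time it handles a message.
    private final ThreadLocal<ConfinedSession> sessions = ThreadLocal.withInitial(() -> {
        sessionsCreated.incrementAndGet();
        return new ConfinedSession();
    });

    /** What a handler callback would do: use the calling thread's own session. */
    void handle(String body) {
        try {
            sessions.get().send(body);
            sends.incrementAndGet();
        } catch (IllegalStateException e) {
            violations.incrementAndGet();
        }
    }

    /** Simulates callbacks on a pool of the given size (an assumption, not HornetQ's pool). */
    static PerThreadSessions run(int threads, int messages) {
        PerThreadSessions demo = new PerThreadSessions();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            for (int i = 0; i < messages; i++) {
                String body = "msg-" + i;
                pool.execute(() -> demo.handle(body));
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return demo;
    }

    public static void main(String[] args) {
        PerThreadSessions d = run(3, 30);
        System.out.println(d.sends.get() + " sends, " + d.violations.get() + " violations");
    }
}
```

The number of sessions is bounded by the number of pool threads, so this stays cheap as long as the pool is small.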

MrBrushy answered Nov 08 '22 12:11
