I'm trying to understand how to deal with threads within a Java client that connects to HornetQ. I'm not getting a specific error but fail to understand how I'm expected to deal with threads in the first place (with respect to the HornetQ client and specifically MessageHandler.onMessage()
-- threads in general are no problem to me).
In case this is relevant: I'm using 'org.hornetq:hornetq-server:2.4.7.Final'
to run the server embedded into my application. I don't intend this to make a difference. In my situation, that's just more convenient from an ops perspective than running a standalone server process.
What I did so far:
create an embedded server: new EmbeddedHornetQ(),
.setConfiguration()
create a server locator: HornetQClient.createServerLocator(false, new TransportConfiguration(InVMConnectorFactory.class.getName()))
serverLocator.createSessionFactory()
Now it seems obvious to me that I can create a session using hornetqClientSessionFactory.createSession()
, create a producer and consumer for that session, and deal with messages within a single thread using .send()
and .receive()
.
But I also discovered consumer.setMessageHandler()
, and this tells me that I didn't understand threading in the client at all. I tried to use it, but then the consumer calls messageHandler.onMessage()
in two threads that are distinct from the one that created the session. This seems to match my impression from looking at the code -- the HornetQ client uses a thread pool to dispatch messages.
This leaves me confused. The javadocs say that the session is a "single-thread object"
, and the code agrees -- no obvious synchronization going on there. But with onMessage()
being called in multiple threads, message.acknowledge()
is also called in multiple threads, and that one just delegates to the session.
How is this supposed to work? How would a scenario look in which MessageHandler does NOT access the session from multiple threads?
Going further, how would I send follow-up messages from within onMessage()? I'm using HornetQ for a persistent "to-do" work queue, so sending follow-up messages is a typical use case for me. But again, within onMessage()
, I'm in the wrong thread for accessing the session.
Note that I would be okay with staying away from MessageHandler and just using send() / receive()
in a way that allows me to control threading. But I'm convinced that I don't understand the whole situation at all, and that combined with multi-threading is just asking for trouble.
I can answer part of your question, although I hope you've already fixed the issue by now.
Form the HornetQ documentation on ClientConsumer (Emphasis mine):
A ClientConsumer receives messages from HornetQ queues.
Messages can be consumed synchronously by using the receive() methods which will block until a message is received (or a timeout expires) or asynchronously by setting a MessageHandler.
These 2 types of consumption are exclusive: a ClientConsumer with a MessageHandler set will throw HornetQException if its receive() methods are called.
So you have two choices on handling message reception:
.receive()
or .receive(long itmeout)
at your leisureClientMessage
object returned by the call
Session
you hopefully carry in the Consumer you can forward the message as you see fit.receive()
on a Consumer onMessage(ClientMessage)
Session
from this object, as it is not exposed by the interface.QueueSession jmsSession = jmsConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
on it to obtain a new Session and send your messages from it... This is probably alright as far as I can see because the Session object is not really re-created. I also did this because Sessions had a tendency to become stale.I don't think you should want so much to be in control of your Message execution threads, especially transient Threads that merely forward messages. HornetQ has built-in Thread pools as you guessed, and reuses these objects efficiently.
Also as you know you don't need to be in a single Thread to access an object (like a Queue) so it doesn't matter if the Queue is accessed through multiple Threads, or even through multiple Sessions. You need only make sure a Session is only accesed by one Thread, and this is by design with MessageListener.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With