Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Reconnecting to IBM MQ Queue on connection failure

Tags:

c#

ibm-mq

xms

The following code snippet has my connection and subscription logic for an IBM MQ queue. Whenever there is a connection failure, I use the IConnection.ExceptionListener delegate to establish a new connection to my queue and resubscribe for the messages. The problem is that I can see multiple queue handles. How can I make sure I close the previous connection handle and establish a new connection whenever there is a connection break due to network issues or MQ server restarts?

// State shared by the connection and subscription methods below.
// Not referenced anywhere in the code shown.
private CancellationToken _cancellationToken;
// Assigned in CreateWebsphereQueueConnection; started in Subscribe.
private IConnection _connection;
// Read by CreateWebsphereQueueConnection; no assignment appears in the code shown.
private IConnectionFactory _connectionfactory;
// Assigned in CreateWebsphereQueueConnection; Subscribe attaches the message listener to it.
private IMessageConsumer _consumer;
// Queue destination created from the session in CreateWebsphereQueueConnection.
private IDestination _destination;
// Not referenced anywhere in the code shown.
private MessageFormat _msgFormat;
// Not referenced anywhere in the code shown.
private IMessageProducer _producer;
// Assigned in CreateWebsphereQueueConnection; used to create the destination and consumer.
private ISession _session;

// Creates a connection, session, queue destination and consumer, storing each
// one in its field.
// NOTE(review): the fields are overwritten without closing whatever they held
// before; nothing in this method calls Close on the previous connection.
private void CreateWebsphereQueueConnection () {
    // NOTE(review): the factory returned here is discarded; _connectionfactory
    // is presumably assigned somewhere not shown — confirm.
    SetConnectionFactory ();

    //Connection
    _connection = _connectionfactory.CreateConnection (null, null);
    // OnConnectionException is not defined in the code shown. Subscribe later
    // replaces this listener with its own delegate.
    _connection.ExceptionListener = new ExceptionListener (OnConnectionException);

    //Session
    _session = _connection.CreateSession (false, AcknowledgeMode.AutoAcknowledge);

    //Destination
    _destination = _session.CreateQueue ("queue://My.Queue.Name");
    // NOTE(review): 2 presumably means persistent delivery — confirm against the XMSC constants.
    _destination.SetIntProperty (XMSC.DELIVERY_MODE, 2);
    // NOTE(review): 0 presumably selects the default (JMS-style) target client — confirm.
    _destination.SetIntProperty (XMSC.WMQ_TARGET_CLIENT, 0);

    //Consumer
    _consumer = _session.CreateConsumer (_destination);
}

/// <summary>
/// Creates an XMS connection factory for WebSphere MQ and configures it with the
/// channel, queue manager, connection list and reconnect values taken from
/// <c>ConnectionSettings</c>.
/// </summary>
/// <returns>The configured factory. No field is assigned by this method.</returns>
private IConnectionFactory SetConnectionFactory () {
    IConnectionFactory factory = XMSFactoryFactory
        .GetInstance (XMSC.CT_WMQ)
        .CreateConnectionFactory ();

    // Client-mode connection details
    factory.SetStringProperty (XMSC.WMQ_CHANNEL, ConnectionSettings.Channel);
    factory.SetIntProperty (XMSC.WMQ_CONNECTION_MODE, XMSC.WMQ_CM_CLIENT);
    factory.SetIntProperty (XMSC.WMQ_FAIL_IF_QUIESCE, XMSC.WMQ_FIQ_YES);
    factory.SetStringProperty (XMSC.WMQ_QUEUE_MANAGER, ConnectionSettings.QueueManager);
    factory.SetStringProperty (XMSC.WMQ_CONNECTION_NAME_LIST, ConnectionSettings.ConnectionList);

    // Automatic client reconnect settings
    factory.SetIntProperty (XMSC.WMQ_CLIENT_RECONNECT_TIMEOUT, ConnectionSettings.ReconnectTimeout);
    factory.SetIntProperty (XMSC.WMQ_CLIENT_RECONNECT_OPTIONS, ConnectionSettings.ReconnectOptions);

    // Provider version and syncpoint behaviour
    factory.SetStringProperty (XMSC.WMQ_PROVIDER_VERSION, XMSC.WMQ_PROVIDER_VERSION_DEFAULT);
    factory.SetBooleanProperty (XMSC.WMQ_SYNCPOINT_ALL_GETS, true);

    return factory;
}

// Attaches a message listener to the current consumer and starts the connection.
// Also installs a connection exception listener that rebuilds the connection and
// subscribes again with the same callback.
// onMessageReceived: callback invoked from the message listener.
public override void Subscribe<T> (Action<T> onMessageReceived) {
    try {

        // This replaces the listener installed in CreateWebsphereQueueConnection.
        // On any connection exception it builds new connection objects and
        // re-subscribes; the old connection is not closed first.
        _connection.ExceptionListener = delegate (Exception connectionException) {
            // Using either of these two statements terminates my code. The debugger
            // never reaches the CreateWebsphereQueueConnection() line at all.
            //_connection.Stop()
            //_connection.Close()
            CreateWebsphereQueueConnection ();
            Subscribe (onMessageReceived);
        };

        // NOTE(review): `message` is not declared in the code shown (the lambda
        // parameter is `msg`); presumably it is the payload converted to T — confirm.
        MessageListener messageListener = new MessageListener ((msg) => {
            onMessageReceived (message);
        });
        _consumer.MessageListener = messageListener;

        // Start the connection
        _connection.Start ();
    } catch (Exception ex) {
        // Placeholder: the exception is currently swallowed without being logged.
        //Log exception details
    }
}
like image 564
PushCode Avatar asked Oct 11 '25 09:10

PushCode


2 Answers

You have already set the reconnection options on the connection factory. The XMS library will reconnect automatically when the connection to the queue manager breaks, except when the queue manager is shut down without the -r or -s option. So your application does not need to reconnect explicitly. Having an exception listener will help in understanding what is going on with the reconnection process.

like image 142
Shashi Avatar answered Oct 13 '25 23:10

Shashi


IBM.XMS.dll will take care of MQ failover or restarts done with the -r switch. But if there was a restart without asking the connected clients to reconnect, the XMS library will not attempt to reconnect, and the customers will have to handle this situation manually, as pointed out by @Shashi and @JoshMc.

I had to handle this situation and changing my Connection ExceptionListener as follows helped me:

// State shared by the connection and subscription methods below.
// Not referenced anywhere in the code shown.
private CancellationToken _cancellationToken;
// Current connection; assigned in CreateWebsphereQueueConnection, started and closed in Subscribe.
private IConnection _connection;
// Factory that CreateWebsphereQueueConnection uses to open connections.
private IConnectionFactory _connectionfactory;
// Current consumer; Subscribe attaches the message listener to it.
private IMessageConsumer _consumer;
// Queue destination created from the session in CreateWebsphereQueueConnection.
private IDestination _destination;
// Not referenced anywhere in the code shown.
private MessageFormat _msgFormat;
// Not referenced anywhere in the code shown.
private IMessageProducer _producer;
// Current session; used to create the destination and consumer.
private ISession _session;
// Set to true by the exception listener in Subscribe while it rebuilds the
// connection; it is part of the retry-loop condition in CreateWebsphereQueueConnection.
private bool _reConnectOnConnectionBreak = false;
// True after CreateWebsphereQueueConnection has built connection, session and
// consumer; set to false when an attempt fails.
private bool _connected = false;
/// <summary>
/// Builds the connection, session, queue destination and consumer, retrying
/// until one attempt succeeds, and stores the results in the corresponding fields.
/// </summary>
/// <remarks>
/// The loop runs when no connection has been established yet or when a reconnect
/// has been requested through <c>_reConnectOnConnectionBreak</c>. A successful
/// attempt clears that request, so exactly one new connection is created per
/// reconnect. Exceptions from individual attempts are traced and retried; they
/// are not propagated to the caller.
/// </remarks>
private void CreateWebsphereQueueConnection () {
    // Pause between failed attempts so an unreachable queue manager is not
    // hammered in a tight loop.
    const int RetryDelayMilliseconds = 1000;

    // Keep the configured factory; it is the one used to open connections below.
    _connectionfactory = SetConnectionFactory ();

    while (!_connected || _reConnectOnConnectionBreak) {
        // Build into locals first so the fields never point at a half-built set.
        IConnection connection = null;
        try {
            //Connection
            connection = _connectionfactory.CreateConnection (null, null);
            connection.ExceptionListener = new ExceptionListener (OnConnectionException);

            //Session
            ISession session = connection.CreateSession (false, AcknowledgeMode.AutoAcknowledge);

            //Destination
            IDestination destination = session.CreateQueue ("queue://My.Queue.Name");
            // NOTE(review): 2 presumably means persistent delivery — confirm against the XMSC constants.
            destination.SetIntProperty (XMSC.DELIVERY_MODE, 2);
            // NOTE(review): 0 presumably selects the default (JMS-style) target client — confirm.
            destination.SetIntProperty (XMSC.WMQ_TARGET_CLIENT, 0);

            //Consumer
            IMessageConsumer consumer = session.CreateConsumer (destination);

            _connection = connection;
            _session = session;
            _destination = destination;
            _consumer = consumer;
            _connected = true;

            // One successful attempt satisfies a reconnect request. Without this
            // the loop would keep opening connections for as long as the caller
            // holds the flag set.
            _reConnectOnConnectionBreak = false;
        } catch (Exception ex) {
            _connected = false;
            System.Diagnostics.Trace.TraceWarning ("Creating the MQ connection failed; retrying: " + ex);

            // Release a connection that was opened before the failure so that
            // failed attempts do not accumulate open handles on the queue manager.
            if (connection != null) {
                try {
                    connection.Close ();
                } catch (Exception closeError) {
                    System.Diagnostics.Trace.TraceWarning ("Closing the partially built MQ connection failed: " + closeError);
                }
            }

            System.Threading.Thread.Sleep (RetryDelayMilliseconds);
        }
    }
}

/// <summary>
/// Builds a WebSphere MQ connection factory through XMS and applies the client
/// connection, reconnect and provider settings read from <c>ConnectionSettings</c>.
/// </summary>
/// <returns>The configured factory. No field is assigned by this method.</returns>
private IConnectionFactory SetConnectionFactory () {
    XMSFactoryFactory xms = XMSFactoryFactory.GetInstance (XMSC.CT_WMQ);
    IConnectionFactory connectionFactory = xms.CreateConnectionFactory ();

    // How and where to connect
    connectionFactory.SetStringProperty (XMSC.WMQ_CHANNEL, ConnectionSettings.Channel);
    connectionFactory.SetIntProperty (XMSC.WMQ_CONNECTION_MODE, XMSC.WMQ_CM_CLIENT);
    connectionFactory.SetIntProperty (XMSC.WMQ_FAIL_IF_QUIESCE, XMSC.WMQ_FIQ_YES);
    connectionFactory.SetStringProperty (XMSC.WMQ_QUEUE_MANAGER, ConnectionSettings.QueueManager);
    connectionFactory.SetStringProperty (XMSC.WMQ_CONNECTION_NAME_LIST, ConnectionSettings.ConnectionList);

    // Reconnect behaviour handled by the XMS client
    connectionFactory.SetIntProperty (XMSC.WMQ_CLIENT_RECONNECT_TIMEOUT, ConnectionSettings.ReconnectTimeout);
    connectionFactory.SetIntProperty (XMSC.WMQ_CLIENT_RECONNECT_OPTIONS, ConnectionSettings.ReconnectOptions);

    // Provider version and syncpoint handling for gets
    connectionFactory.SetStringProperty (XMSC.WMQ_PROVIDER_VERSION, XMSC.WMQ_PROVIDER_VERSION_DEFAULT);
    connectionFactory.SetBooleanProperty (XMSC.WMQ_SYNCPOINT_ALL_GETS, true);

    return connectionFactory;
}

/// <summary>
/// Attaches a message listener to the current consumer and starts the connection.
/// Also installs a connection exception listener that, when the queue manager is
/// quiescing or the connection is broken, closes the old connection, builds a new
/// one and subscribes again with the same callback.
/// </summary>
/// <param name="onMessageReceived">Callback invoked from the message listener.</param>
/// <remarks>
/// Exceptions raised while subscribing are traced and not rethrown.
/// </remarks>
public override void Subscribe<T> (Action<T> onMessageReceived) {
    try {

        _connection.ExceptionListener = delegate (Exception connectionException) {
            // Only an XMS exception that wraps an MQ exception carries a reason
            // code. Anything else is traced and ignored instead of failing the
            // listener with an invalid cast or a null reference.
            XMSException xmsError = connectionException as XMSException;
            IBM.WMQ.MQException mqError = xmsError == null
                ? null
                : xmsError.LinkedException as IBM.WMQ.MQException;
            if (mqError == null) {
                System.Diagnostics.Trace.TraceWarning ("Connection exception without an MQ reason code: " + connectionException);
                return;
            }

            int reasonCode = mqError.ReasonCode;
            if (reasonCode != MQC.MQRC_Q_MGR_QUIESCING && reasonCode != MQC.MQRC_CONNECTION_BROKEN) {
                return;
            }

            _reConnectOnConnectionBreak = true;
            try {
                // The connection is already unusable, so a failure to close it
                // must not prevent the replacement from being created.
                try {
                    _connection.Close ();
                } catch (Exception closeError) {
                    System.Diagnostics.Trace.TraceWarning ("Closing the broken MQ connection failed: " + closeError);
                }

                CreateWebsphereQueueConnection ();
                Subscribe (onMessageReceived);
            } finally {
                // Always clear the request, even if rebuilding threw.
                _reConnectOnConnectionBreak = false;
            }
        };

        // NOTE(review): `message` is not declared in the code shown (the lambda
        // parameter is `msg`); presumably it is the payload converted to T — confirm.
        MessageListener messageListener = new MessageListener ((msg) => {
            onMessageReceived (message);
        });
        _consumer.MessageListener = messageListener;

        // Start the connection
        _connection.Start ();
    } catch (Exception ex) {
        //Log exception details
        System.Diagnostics.Trace.TraceError ("Subscribing to the MQ queue failed: " + ex);
    }
}

There is no better way to check the state of the IConnection in IBM MQ version 8, so I had to use the reason codes. In IBM MQ version 9, we can use the REST API exposed by the server to check the connection state.

like image 36
PushCode Avatar answered Oct 13 '25 23:10

PushCode