
RabbitMQ throwing TimeOut exception on .CreateModel()

I've created a Bus application that encapsulates all subscribing and publishing to a RabbitMQ broker. This bus is then used as an SDK by my other projects. After running these projects (which are basically applications communicating via RabbitMQ messages through my Bus SDK), I noticed that the Bus was throwing a TimeoutException from the native method .CreateModel(), as follows:

System.TimeoutException: The operation has timed out.
   at RabbitMQ.Util.BlockingCell.GetValue(TimeSpan timeout)
   at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
   at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method, ContentHeaderBase header, Byte[] body)
   at RabbitMQ.Client.Framing.Impl.Model._Private_ChannelOpen(String outOfBand)
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.CreateNonRecoveringModel()
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.CreateModel()

Here is some information about my RabbitMQ broker: Version: 3.7.2, Erlang: 20.1, OS: Linux Ubuntu (AWS)

Here is my RabbitMQConnection class:

internal class RabbitMQConnection : IDisposable
{
    private object __syncRoot__ { get; } = new object();

    #region Private
    private bool __disposed__ { get; set; }
    private IConnectionFactory __rmqConnFactory__ { get; }
    private IConnection __rmqConnection__ { get; set; }
    private string __connectionIdentifier__ { get; }
    private ILog __logger__ { get; }
    #endregion

    #region Public
    public bool IsConnected
    {
        get
        {
            DisposeCheck();
            return __rmqConnection__ != null && __rmqConnection__.IsOpen && !__disposed__;
        }
    }

    public int ThreadLimit { get; private set; }

    public IConnection GetConnection
    {
        get
        {
            DisposeCheck();
            if (IsConnected == true)
            {
                return __rmqConnection__;
            }
            else
            {
                return null;
            }
        }
    }

    public bool IsDisposed
    {
        get
        {
            return __disposed__;
        }
    }
    #endregion

    public RabbitMQConnection(RabbitMQConfiguration _rmqConfig)
    {
        __logger__ = LogProvider.GetCurrentClassLogger();

        ConnectionFactory rMQconnFactory = new ConnectionFactory()
        {
            HostName = _rmqConfig.Hostname,
            UserName = _rmqConfig.Username,
            Password = _rmqConfig.Password,
            VirtualHost = _rmqConfig.VirtualHost,
            AutomaticRecoveryEnabled = true,
            DispatchConsumersAsync = true,
        };

        __rmqConnFactory__ = rMQconnFactory;

        ThreadLimit = _rmqConfig.ThreadLimit;

        __connectionIdentifier__ = $"{_rmqConfig.CurrentMachineIPAddress} - {_rmqConfig.CurrentMachineHostname}";

        Connect();

        __disposed__ = false;
    }

    private void Connect()
    {
        DisposeCheck();

        if (__rmqConnection__ == null)
        {
            lock (__syncRoot__)
            {
                if (__rmqConnection__ == null)
                {
                    __rmqConnection__ = __rmqConnFactory__.CreateConnection(__connectionIdentifier__);
                }
            }
        }
    }

    public IModel CreateChannel()
    {

        DisposeCheck();

        if (!IsConnected)
        {
            throw new InvalidOperationException("No RabbitMQ connections are available to perform this action");
        }

        return __rmqConnection__.CreateModel(); // Here is where the exception occurs!!!
    }

    #region ShutdownEvents
    private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
    {
        if (__disposed__) return;

        // Log something
        __logger__.Warn("A RabbitMQ connection (OnConnectionBlocked) is shutdown. Trying to re-connect...");
    }

    void OnCallbackException(object sender, CallbackExceptionEventArgs e)
    {
        if (__disposed__) return;

        // Log something
        __logger__.Warn(e?.Exception, "A RabbitMQ connection (OnCallbackException) is shutdown. Trying to re-connect...");
    }

    void OnConnectionShutdown(object sender, ShutdownEventArgs reason)
    {
        if (__disposed__) return;

        // Log something
        __logger__.Warn("A RabbitMQ connection (OnConnectionShutdown) is on shutdown. Trying to re-connect...");
    }
    #endregion

    public void Dispose()
    {
        if (__disposed__ == true)
        {
            return;
        }

        try
        {
            __rmqConnection__?.Dispose();
        }
        catch (Exception ex)
        {
            //Log here
            __logger__.Fatal(ex, "RabbitMQ Connection: {0}", ex.Message);
        }
        finally
        {
            __disposed__ = true;
        }
    }

    private void DisposeCheck()
    {
        if (__disposed__ == true)
        {
            throw new ObjectDisposedException("RabbitMQConnection");
        }
    }
}

The exception occurs in the line:

return __rmqConnection__.CreateModel();

Does anyone have any idea why this happens? I know that the RabbitMQ connection may sometimes fluctuate, but I've heard that RabbitMQ has a native retry for when that happens.

Gui Oliveira asked Jan 20 '19 17:01



2 Answers

I don't know how relevant this still is, but I ran into the same exception: {System.TimeoutException} The operation has timed out. If we remove AutomaticRecoveryEnabled = true from the connection factory, the exception is no longer thrown and the channel is created.
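
A minimal sketch of that change, assuming the RabbitMQ .NET client API used in the question (ConnectionFactory, IConnection, IModel). The host name, credentials, and connection name are placeholders and not values from the question, and the code needs a reachable broker to run. Keep in mind that with automatic recovery turned off, the application has to re-create the connection itself after a network failure.

```csharp
using RabbitMQ.Client;

internal static class NoAutoRecoverySketch
{
    private static void Main()
    {
        // Placeholder settings; substitute your own broker details.
        var factory = new ConnectionFactory
        {
            HostName = "localhost",
            UserName = "guest",
            Password = "guest",
            VirtualHost = "/",
            // Automatic recovery is switched off, as suggested above.
            AutomaticRecoveryEnabled = false,
            DispatchConsumersAsync = true,
        };

        // "bus-sdk" is a hypothetical client-provided connection name.
        using (IConnection connection = factory.CreateConnection("bus-sdk"))
        using (IModel channel = connection.CreateModel())
        {
            // Declare queues, publish, or consume with the channel here.
        }
    }
}
```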

kishorebarik answered Nov 20 '22 08:11



For what it's worth, I've had this error when too many channels had already been created on the IConnection. Unfortunately the error didn't indicate this, but scaling down the number of channels stopped it from happening.

The maximum number of channels per connection can be configured on both the client and the server.
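
As a rough illustration of the client side, the sketch below assumes the RabbitMQ .NET client's ConnectionFactory.RequestedChannelMax property and reads back the limit that was negotiated with the broker through IConnection.ChannelMax. The host name and the value 64 are arbitrary placeholders, and a reachable broker is required. On the server, the corresponding setting is channel_max in rabbitmq.conf. Disposing each channel once it is no longer needed keeps the number of open channels from growing toward the limit.

```csharp
using System;
using RabbitMQ.Client;

internal static class ChannelLimitSketch
{
    private static void Main()
    {
        var factory = new ConnectionFactory
        {
            HostName = "localhost",   // placeholder broker address
            RequestedChannelMax = 64, // arbitrary example limit requested by the client
        };

        using (IConnection connection = factory.CreateConnection())
        {
            // The effective limit is negotiated between client and server.
            Console.WriteLine($"Negotiated channel limit: {connection.ChannelMax}");

            // Dispose channels when done so they do not accumulate on the connection.
            using (IModel channel = connection.CreateModel())
            {
                // Publish or consume here.
            }
        }
    }
}
```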

Aage answered Nov 20 '22 10:11
