ZeroMQ PUB/SUB Pattern with Multi-Threaded Poller Cancellation

I have two applications: a C++ server and a C# WPF UI. The C++ code takes requests (from anywhere/anyone) via a ZeroMQ messaging [PUB/SUB] service. I use my C# code to create back tests and execute them. Each back test can be made up of many "unit tests", each of which sends and receives thousands of messages to and from the C++ server.

Currently, individual back tests work well: each can send off N unit tests, each with thousands of requests and captures. My problem is architecture. When I dispatch another back test (following the first), the event subscription is done a second time because the polling thread has not been cancelled and disposed. This results in erroneous output. This may seem like a trivial problem (perhaps it is for some of you), but cancelling this polling Task under my current configuration is proving troublesome. Some code...

My message broker class is simple and looks like this:

public class MessageBroker : IMessageBroker<Taurus.FeedMux>, IDisposable
{
    private Task pollingTask;
    private NetMQContext context;
    private PublisherSocket pubSocket;

    private CancellationTokenSource source;
    private CancellationToken token;
    private ManualResetEvent pollerCancelled;

    public MessageBroker()
    {
        this.source = new CancellationTokenSource();
        this.token = source.Token;

        StartPolling();
        context = NetMQContext.Create();
        pubSocket = context.CreatePublisherSocket();
        pubSocket.Connect(PublisherAddress);
    }

    public void Dispatch(Taurus.FeedMux message)
    {
        pubSocket.Send(message.ToByteArray<Taurus.FeedMux>());
    }

    private void StartPolling()
    {
        pollerCancelled = new ManualResetEvent(false);
        pollingTask = Task.Run(() =>
        {
            try
            {
                using (var context = NetMQContext.Create())
                using (var subSocket = context.CreateSubscriberSocket())
                {
                    byte[] buffer = null;
                    subSocket.Options.ReceiveHighWatermark = 1000;
                    subSocket.Connect(SubscriberAddress);
                    subSocket.Subscribe(String.Empty);
                    while (true)
                    {
                        buffer = subSocket.Receive();
                        MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
                        if (this.token.IsCancellationRequested)
                            this.token.ThrowIfCancellationRequested();
                    }
                }
            }
            catch (OperationCanceledException)
            {
                pollerCancelled.Set();
            }
        }, this.token);
    }

    private void CancelPolling()
    {
        source.Cancel();
        pollerCancelled.WaitOne();
        pollerCancelled.Close();
    }

    public IProgress<Taurus.FeedMux> MessageRecieved { get; set; }
    public string PublisherAddress { get { return "tcp://127.X.X.X:6500"; } }
    public string SubscriberAddress { get { return "tcp://127.X.X.X:6501"; } }

    private bool disposed = false;

    protected virtual void Dispose(bool disposing)
    {
        if (!disposed)
        {
            if (disposing)
            {
                if (this.pollingTask != null)
                {
                    CancelPolling();
                    if (this.pollingTask.Status == TaskStatus.RanToCompletion ||
                         this.pollingTask.Status == TaskStatus.Faulted ||
                         this.pollingTask.Status == TaskStatus.Canceled)
                    {
                        this.pollingTask.Dispose();
                        this.pollingTask = null;
                    }
                }
                if (this.context != null)
                {
                    this.context.Dispose();
                    this.context = null;
                }
                if (this.pubSocket != null)
                {
                    this.pubSocket.Dispose();
                    this.pubSocket = null;
                }
                if (this.source != null)
                {
                    this.source.Dispose();
                    this.source = null;
                }
            }
            disposed = true;
        }
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    ~MessageBroker()
    {
        Dispose(false);
    }
}

The back-testing "engine" used to execute each back test first constructs a dictionary containing each Test (unit test) and the messages to dispatch to the C++ application for that test.

Here is the DispatchTests method:

private void DispatchTests(ConcurrentDictionary<Test, List<Taurus.FeedMux>> feedMuxCollection)
{
    broker = new MessageBroker();
    broker.MessageRecieved = new Progress<Taurus.FeedMux>(OnMessageRecieved);
    testCompleted = new ManualResetEvent(false);

    try
    {
        // Loop through the tests. 
        foreach (var kvp in feedMuxCollection)
        {
            testCompleted.Reset();
            Test t = kvp.Key;
            t.Bets = new List<Taurus.Bet>();
            foreach (Taurus.FeedMux mux in kvp.Value)
            {
                token.ThrowIfCancellationRequested();
                broker.Dispatch(mux);
            }
            broker.Dispatch(new Taurus.FeedMux()
            {
                type = Taurus.FeedMux.Type.PING,
                ping = new Taurus.Ping() { event_id = t.EventID }
            });
            testCompleted.WaitOne(); // Wait until all messages are received for this test. 
        }
        testCompleted.Close();
    }
    finally
    {
        broker.Dispose(); // Dispose the broker.
    }
}

The PING message at the end tells the C++ code that we are finished. We then force a wait, so that the next [unit] test is not dispatched before all of the returns are received from the C++ code. We do this using a ManualResetEvent.

When the C++ code receives the PING message, it sends the message straight back. We handle the received messages via OnMessageRecieved, and the PING tells us to call Set() on the ManualResetEvent so that we can continue the unit testing; "Next Please"...

private async void OnMessageRecieved(Taurus.FeedMux mux)
{
    string errorMsg = String.Empty;
    if (mux.type == Taurus.FeedMux.Type.MSG)
    {
        // Do stuff.
    }
    else if (mux.type == Taurus.FeedMux.Type.PING)
    {
        // Do stuff.

        // We are finished receiving messages for this "unit test".
        testCompleted.Set(); 
    }
}
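
A PUB/SUB socket may drop messages once a high-water mark is reached, so the echoed PING is not guaranteed to arrive, and an unbounded WaitOne() would then hang the whole back test. Below is a minimal sketch of a bounded wait. The TestCompletionGate class and its method names are hypothetical (they are not part of the code above), and the timeout value is left to the caller.

```csharp
using System;
using System.Threading;

// Hypothetical helper: wraps the "wait for the echoed PING" handshake
// so that the wait is bounded instead of blocking forever.
public sealed class TestCompletionGate : IDisposable
{
    private readonly ManualResetEventSlim completed = new ManualResetEventSlim(false);

    // Call before dispatching the messages of a unit test.
    public void Arm()
    {
        completed.Reset();
    }

    // Call from the message handler when the echoed PING arrives.
    public void Signal()
    {
        completed.Set();
    }

    // Returns false if the PING did not come back within the timeout.
    public bool WaitForPing(TimeSpan timeout)
    {
        return completed.Wait(timeout);
    }

    public void Dispose()
    {
        completed.Dispose();
    }
}
```

In DispatchTests, Arm() would take the place of testCompleted.Reset(), Signal() would be called where the PING is handled, and a false return from WaitForPing(...) could be reported as a timed-out unit test.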

Originally I thought that broker.Dispose() in the finally block above was never hit. That turned out to be caused by me messing about with the code; I was stopping a parent thread before the child had completed. However, there are still problems...

Now broker.Dispose() is called correctly. In this method I attempt to cancel the poller thread and dispose of the Task correctly, to avoid any multiple subscriptions.

To cancel the thread I use the CancelPolling() method:

private void CancelPolling()
{
    source.Cancel();
    pollerCancelled.WaitOne(); // <- Blocks here waiting for cancellation.
    pollerCancelled.Close();
}

but in the StartPolling() method

while (true)
{
    buffer = subSocket.Receive();
    MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
    if (this.token.IsCancellationRequested)
        this.token.ThrowIfCancellationRequested();
}

ThrowIfCancellationRequested() is never called and the thread is never cancelled, thus never properly disposed. The poller thread is being blocked by the subSocket.Receive() method.
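
One general way around a receive call that blocks indefinitely is to receive with a bounded wait, so the loop wakes up regularly and re-checks the token. NetMQ versions differ in how they expose a timed or non-blocking receive, so the sketch below deliberately avoids NetMQ and uses a BlockingCollection<byte[]> as a stand-in for the subscriber socket. TimedReceiveLoop, the inbox parameter and the 100 ms interval are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Stand-in for the polling task: BlockingCollection.TryTake with a timeout
// plays the role of a receive call that gives up after a bounded wait.
public static class TimedReceiveLoop
{
    public static Task Start(BlockingCollection<byte[]> inbox,
                             Action<byte[]> onMessage,
                             CancellationToken token)
    {
        return Task.Run(() =>
        {
            while (!token.IsCancellationRequested)
            {
                byte[] buffer;
                // Wake up at least every 100 ms so that the token is
                // re-checked even when no message arrives.
                if (inbox.TryTake(out buffer, TimeSpan.FromMilliseconds(100)))
                {
                    onMessage(buffer);
                }
            }
        });
    }
}
```

With this shape, the caller cancels the token source and then waits on the returned Task. The loop exits normally within one wait interval, so no extra ManualResetEvent is needed to learn that polling has stopped.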

It is not clear to me how to achieve what I want. I need to invoke broker.Dispose()/CancelPolling() on a thread other than the one used to poll for messages, and somehow force the cancellation. Thread abort is something I do not want to get into at any cost.

Essentially, I want to properly dispose of the broker before executing the next back test. How do I handle this correctly? Should I split out the polling and run it in a separate Application Domain?

I have tried disposing inside the OnMessageRecieved handler, but this is clearly executed on the same thread as the poller and, without invoking additional threads, it blocks. It is not the way to do this.

What is the best way to achieve what I want and is there a pattern for this sort of case that I can follow?

Thanks for your time.

MoonKnight asked Apr 30 '15 20:04


2 Answers

This is how I eventually got around this [although I am open to a better solution!]

public class FeedMuxMessageBroker : IMessageBroker<Taurus.FeedMux>, IDisposable
{
    // Vars.
    private NetMQContext context;
    private PublisherSocket pubSocket;
    private Poller poller;

    private CancellationTokenSource source;
    private CancellationToken token;
    private ManualResetEvent pollerCancelled;

    /// <summary>
    /// Default ctor.
    /// </summary>
    public FeedMuxMessageBroker()
    {
        context = NetMQContext.Create();

        pubSocket = context.CreatePublisherSocket();
        pubSocket.Connect(PublisherAddress);

        pollerCancelled = new ManualResetEvent(false);
        source = new CancellationTokenSource();
        token = source.Token;
        StartPolling();
    }

    #region Methods.
    /// <summary>
    /// Send the mux message to listeners.
    /// </summary>
    /// <param name="message">The message to dispatch.</param>
    public void Dispatch(Taurus.FeedMux message)
    {
        pubSocket.Send(message.ToByteArray<Taurus.FeedMux>());
    }

    /// <summary>
    /// Start polling for messages.
    /// </summary>
    private void StartPolling()
    {
        Task.Run(() =>
            {
                using (var subSocket = context.CreateSubscriberSocket())
                {
                    byte[] buffer = null;
                    subSocket.Options.ReceiveHighWatermark = 1000;
                    subSocket.Connect(SubscriberAddress);
                    subSocket.Subscribe(String.Empty);
                    subSocket.ReceiveReady += (s, a) =>
                    {
                        buffer = subSocket.Receive();
                        if (MessageRecieved != null)
                            MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
                    };

                    // Poll.
                    poller = new Poller();
                    poller.AddSocket(subSocket);
                    poller.PollTillCancelled();
                    token.ThrowIfCancellationRequested();
                }
            }, token).ContinueWith(ant => 
                {
                    pollerCancelled.Set();
                }, TaskContinuationOptions.OnlyOnCanceled);
    }

    /// <summary>
    /// Cancel polling to allow the broker to be disposed.
    /// </summary>
    private void CancelPolling()
    {
        source.Cancel();
        poller.Cancel();

        pollerCancelled.WaitOne();
        pollerCancelled.Close();
    }
    #endregion // Methods.

    #region Properties.
    /// <summary>
    /// Event that is raised when a message is received.
    /// </summary>
    public IProgress<Taurus.FeedMux> MessageRecieved { get; set; }

    /// <summary>
    /// The address to use for the publisher socket.
    /// </summary>
    public string PublisherAddress { get { return "tcp://127.0.0.1:6500"; } }

    /// <summary>
    /// The address to use for the subscriber socket.
    /// </summary>
    public string SubscriberAddress { get { return "tcp://127.0.0.1:6501"; } }
    #endregion // Properties.

    #region IDisposable Members.
    private bool disposed = false;

    /// <summary>
    /// Dispose managed resources.
    /// </summary>
    /// <param name="disposing">Is disposing.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (!disposed)
        {
            if (disposing)
            {
                CancelPolling();
                if (pubSocket != null)
                {
                    pubSocket.Disconnect(PublisherAddress);
                    pubSocket.Dispose();
                    pubSocket = null;
                }
                if (poller != null)
                {
                    poller.Dispose();
                    poller = null;
                }
                if (context != null)
                {
                    context.Terminate();
                    context.Dispose();
                    context = null;
                }
                if (source != null)
                {
                    source.Dispose();
                    source = null;
                }
            }

            // Shared cleanup logic.
            disposed = true;
        }
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Finalizer.
    /// </summary>
    ~FeedMuxMessageBroker()
    {
        Dispose(false);
    }
    #endregion // IDisposable Members.
}

So we poll in the same way, but using the Poller class from NetMQ. In the Task continuation we set pollerCancelled, so we are sure that both the Poller and the Task are cancelled. We are then safe to dispose...
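
One caveat: a continuation registered with TaskContinuationOptions.OnlyOnCanceled runs only if the task ends in the Canceled state. If the polling delegate faults instead (for example, on a socket error), the event is never set and CancelPolling() blocks. A possible alternative, sketched here with BCL types only, is to keep the Task and wait on it directly, which covers every terminal state. PollingWorker and its pollLoop parameter are hypothetical names; in the broker above, the delegate would be the code that runs the Poller.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical wrapper: owns the polling task and its cancellation source,
// and waits on the task itself instead of a separate ManualResetEvent.
public sealed class PollingWorker : IDisposable
{
    private readonly CancellationTokenSource source = new CancellationTokenSource();
    private readonly Task worker;

    public PollingWorker(Action<CancellationToken> pollLoop)
    {
        // The token is handed to the loop but not to Task.Run, so the
        // delegate always starts and always reaches its own cleanup.
        CancellationToken token = source.Token;
        worker = Task.Run(() => pollLoop(token));
    }

    public void Dispose()
    {
        source.Cancel();
        try
        {
            // Returns once the task has finished, whatever its final state.
            worker.Wait();
        }
        catch (AggregateException ex)
        {
            // Swallow cancellation only; any other failure is rethrown.
            ex.Handle(inner => inner is OperationCanceledException);
        }
        source.Dispose();
    }
}
```

Whether rethrowing from Dispose() is acceptable is a design choice; logging the failure instead would be equally reasonable.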

MoonKnight answered Oct 23 '22 08:10


A higher-level view on the subject

Your focus and effort, dedicated to creating a testing framework, signal that you aim for a rigorous, professional-grade approach, which first made me raise my hat in admiration of such a brave undertaking.

While testing is an important activity for providing reasonable quantitative evidence that a System Under Test meets defined expectations, its success depends on how closely the testing environment matches real-deployment conditions.

One may agree that testing on a different basis does not prove that the real deployment will run as expected in an environment that is principally different from the tested one(s).


Element-wise control or just state-wise control, that's the question.

Your efforts ( at least at the time the OP was posted ) concentrate on a code architecture that tries to keep instances in place and tries to reset the internal state of a Poller instance before the next test battery starts.

In my view, testing has a few principles to follow, should you strive for professional testing:

  • Principle of Test Repeatability ( tests' re-runs shall yield the same results, thus avoiding a quasi-testing that provides just a result-"lottery" )

  • Principle of Non-Intervening Testing ( tests' re-runs shall not be subject to "external" interference that is not controlled by the test scenario )

Having said this, let me bring a few notes inspired by Harry Markowitz, the Nobel laureate recognised for his remarkable quantitative portfolio optimisation studies.

Rather move one step back to get control over elements' full life-cycle

CACI Simulations, Inc. ( one of Harry Markowitz's companies ) developed in the early 90s its flagship software framework COMET III, an exceptionally powerful simulation engine for large, complex design prototyping and performance simulations of processes operated in large-scale computing / networking / telco networks.

The greatest impression from COMET III was its capability to generate testing scenarios including configurable pre-test "warm-up" pre-load(s), which bring the tested elements into a state similar to what "fatigue" means in mechanical torture-test experiments or what hydrogen-diffusion fragility means to nuclear power-plant metallurgists.

Yes, once you go into low-level details on how algorithms, node-buffers, memory-allocations, pipe-lined / load-balanced / grid-processing architecture selections, fault-resilience overheads, garbage collection policies and limited resource-sharing algorithms work and impact ( under real-use work-load patterns "pressure" ) end-to-end performance / latencies, this feature is simply indispensable.

This means that a simple, instance-related state-wise control is not sufficient, as it provides the means for neither test repeatability nor test isolation / non-intervening behaviour. Simply put, even if you find a way to "reset" a Poller instance, this will not get you to realistic testing with guaranteed test repeatability and with pre-test warm-up(s) possible.

A step back, to a higher layer of abstraction and test-scenario controls, is needed.

How does this apply to the OP problem?

  • Instead of just state-control
  • Create a multi-layered architecture / control-plane(s) / separate signalling

A ZeroMQ way of supporting this goal

  • Create super-structures as non-trivial patterns
  • Use full life-cycle controls of instances used inside testing scenarios
  • Keep ZeroMQ-maxims: Zero-sharing, Zero-blocking, ...
  • Benefit from Multi-Context()

user3666197 answered Oct 23 '22 08:10