Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Efficient C++11 design for event listener with a regular wake-up?

The Problem

I've a system (using C++11) with a producer of irregular events: P (e.g. it could be UI events, or it could be receiving financial trade data over a TCP/IP socket, etc.) Each event comes with a small packet of data.

Then I have a number of "worker bees": B1, B2, B3, ... Each of these does their own processing of the events that P feeds them. Their processing might be quick, but might take a long time, which is why the plan is to run each worker bee in its own thread. In addition each of the worker bees need to run a different function every N seconds (e.g. N=60, but again it can vary by worker bee). This regular processing should always be done in serial with the event processing, never on a different thread.

Finally, some of the worker bees might also get events from other producers (P2, P3, etc.) However if that complicates things I can always have P1, P2, etc feed to a central P whose job is to send all events to the worker bees.

The Question

What is the best design for such a system? Low latency and efficiency are the main criteria for best. Reliability is also important though: every B must receive every event (even if they come through as a batch because it was busy at the time), and if one B crashes it should not affect the others.

If it matters: assume 1-64 worker bees, 4-8 hardware threads, average time between events is 10 seconds, minimum time between events is 0.2 seconds, and the typical regular function is every N=60 seconds. But if the ideal design is sensitive to any of those criteria I'd like to understand how.

NOTE: If the worker bees can guarantee never to throw an exception, does that change the choice of best design? (It feels like that will be unrelated, but I thought I'd bring it up.)

NOTE: there may be more bees than hardware threads; assume that is a problem for another time. (E.g. latency may matter for some worker bees and they may be given their own threads, whereas others may be told to share a thread.)

Idea One: wait until event or timeout

Each P has a mutex and a condition. When it gets new data it signals the condition.

Each worker bee uses theCondition.wait_until(lock,timeout) where timeout is the time it next needs to wake up to do its regular processing. It checks the return value to see if it was signalled, or timed-out.

The downside here seems to be that it is just a signal, no data. So I'd need each B to get another lock for read access to the data queue. And typically they'll all want to do that at the same time, so this gets ugly.

I'm also not clear what happens if one B takes a long time processing something and misses a couple of events before it gets around to calling wait_until again.

Idea Two: per-worker data queue

Here each B has a queue, with a lock. P gets a write lock, and adds a data item. B gets a read lock to pop each item off when it is ready. I still need some way for B to know to wake-up because there is new data.

The downside here seems to be the P thread needs to loop through each B to give them data. That introduces latency, and also feels fragile (e.g. if one of the worker bees is behaving badly).

Idea Three: futures

This problem feels like a good fit for futures. P creates a std::promise, then each B gets a std::future (a std::shared_future, I assume). As P receives a new event it calls set_value() on the promise. Each B is calling wait_until on its future.

This appeals as the signal and the data come together. Also there is no locking, so it should be resilient.

The bit I'm stuck with is that the promise/future is a one-bullet gun. I need to create a new set of promise/shared_future pairs immediately after each new event. How could that even work? (Could I pass the next shared_future as part of the data being sent by the set_value call?) Is there any chance of an event being missed by any of the workers if two events come through in quick succession?

like image 754
Darren Cook Avatar asked Aug 05 '14 13:08

Darren Cook


People also ask

What method do we use to create an event listener?

The addEventListener() method allows you to add event listeners on any HTML DOM object such as HTML elements, the HTML document, the window object, or other objects that support events, like the xmlHttpRequest object.

How do Eventlisteners work?

An event listener is a function that initiates a predefined process if a specific event occurs. So, an event listener “listens” for an action, then calls a function that performs a related task. This event can take one of many forms. Common examples include mouse events, keyboard events, and window events.

What is HTML event delegation?

Event delegation refers to the process of using event propagation (bubbling) to handle events at a higher level in the DOM than the element on which the event originated. It allows us to attach a single event listener for elements that exist now or in the future.


1 Answers

Sounds like you can benefit form the producer-consumer pattern. Here is an example of this using boost libraries and a lockfree queue (from boost), just change the type that it is operating on:

boost::atomic_int producer_count(0);
boost::atomic_int consumer_count(0);

boost::lockfree::queue<int> queue(128);

const int iterations = 10000000;
const int producer_thread_count = 4;
const int consumer_thread_count = 4;

void producer(void)
{
    for (int i = 0; i != iterations; ++i) {
       int value = ++producer_count;
        while (!queue.push(value))
            ;
    }
}

boost::atomic<bool> done (false);
void consumer(void)
{
    int value;
    while (!done) {
        while (queue.pop(value))
            ++consumer_count;
    }

    while (queue.pop(value))
        ++consumer_count;
}

int main(int argc, char* argv[])
{
    using namespace std;
    cout << "boost::lockfree::queue is ";
    if (!queue.is_lock_free())
        cout << "not ";
    cout << "lockfree" << endl;

    boost::thread_group producer_threads, consumer_threads;

    for (int i = 0; i != producer_thread_count; ++i)
        producer_threads.create_thread(producer);

    for (int i = 0; i != consumer_thread_count; ++i)
        consumer_threads.create_thread(consumer);

    producer_threads.join_all();
    done = true;

    consumer_threads.join_all();

    cout << "produced " << producer_count << " objects." << endl;
    cout << "consumed " << consumer_count << " objects." << endl;
}
like image 200
wbennett Avatar answered Nov 07 '22 13:11

wbennett