Improve efficiency and fairness when combining temporally close events

Tags: c#, concurrency

I have a bunch of threads that generate events of type A and type B.

My program takes these events, wraps them in a message and sends them across the network. A message can hold either one A event, one B event, or one A event and one B event:

SendMessage(new Message(a: 1,    b: null));
SendMessage(new Message(a: null, b: 2   ));
SendMessage(new Message(a: 3,    b: 4   ));
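The question never shows the `Message` type. Judging only from how the code uses it (`new Message(a, b)` with `int?` arguments, `null` returned from `WrapB`, and the `m.a` / `m.b` checks in the benchmark), a minimal definition might look like the sketch below. The field names come from the benchmark; everything else, including `ToString`, is an assumption for illustration.

```csharp
// Hypothetical Message type, inferred from how the question's code uses it:
// either payload may be absent (null), and the benchmark reads m.a and m.b.
public sealed class Message
{
    public readonly int? a;  // the A event, or null if there is none
    public readonly int? b;  // the B event, or null if there is none

    public Message(int? a, int? b)
    {
        this.a = a;
        this.b = b;
    }

    // Convenience for printing; not required by any of the code in this thread.
    public override string ToString() =>
        $"Message(a: {a?.ToString() ?? "null"}, b: {b?.ToString() ?? "null"})";
}
```

Because it is a class, `WrapB` can return `null` to signal "my B event left inside someone else's message", which is exactly how `SendMessage` treats it.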

Events of type A happen quite frequently, while events of type B occur much less often. So, when a thread generates a B event, my program waits a bit to see if another thread generates an A event and combines the A event and the B event if possible.

Here is my code:

object gate = new object();
int? pendingB;

Message WrapA(int a, int millisecondsTimeout)
{
    int? b;

    lock (gate)
    {
        b = pendingB;
        pendingB = null;
        Monitor.Pulse(gate);
    }

    return new Message(a, b);
}

Message WrapB(int b, int millisecondsTimeout)
{
    lock (gate)
    {
        if (pendingB == null)
        {
            pendingB = b;
            Monitor.Wait(gate, millisecondsTimeout);
            if (pendingB != b) return null;
            pendingB = null;
        }
    }

    return new Message(null, b);
}

This works so far. However, there are two problems:

  • If there are lots of A events and lots of B events, the algorithm is not very efficient: Only a certain percentage of B events is attached to A events, even when there are enough A events.

  • If there are no A events generated for a while (uncommon, but not impossible), the algorithm is completely unfair: One thread generating B events has to wait every time, while all other threads can send their B events right away.

How can I improve efficiency and fairness of the algorithm?

Constraints:
•  WrapA and WrapB must terminate within a short, deterministic amount of time.
•  SendMessage must be called outside any locks.
•  There is no synchronization mechanism available other than gate.
•  There are no additional threads, tasks, timers, etc. available.
•  Since events of type A happen so frequently in the normal case, busy-waiting in WrapB is fine.


Here is a test program that can be used as a benchmark:

using System;
using System.Diagnostics;
using System.Threading;

public static class Program
{
    static int counter0 = 0;
    static int counterA = 0;
    static int counterB = 0;
    static int counterAB = 0;

    static void SendMessage(Message m)
    {
        if (m != null)
            if (m.a != null)
                if (m.b != null)
                    Interlocked.Increment(ref counterAB);
                else
                    Interlocked.Increment(ref counterA);
            else
                if (m.b != null)
                    Interlocked.Increment(ref counterB);
                else
                    Interlocked.Increment(ref counter0);
    }

    static Thread[] Start(int threadCount, int eventCount,
        int eventInterval, int wrapTimeout, Func<int, int, Message> wrap)
    {
        Thread[] threads = new Thread[threadCount * eventCount];
        for (int i = 0; i < threadCount; i++)
        {
            for (int j = 0; j < eventCount; j++)
            {
                int k = i * 1000 + j;
                int l = j * eventInterval + i;
                threads[i * eventCount + j] = new Thread(() =>
                {
                    Thread.Sleep(l);
                    SendMessage(wrap(k, wrapTimeout));
                });
                threads[i * eventCount + j].Start();
            }
        }
        return threads;
    }

    static void Join(params Thread[] threads)
    {
        for (int i = 0; i < threads.Length; i++)
        {
            threads[i].Join();
        }
    }

    public static void Main(string[] args)
    {
        var wrapper = new MessageWrapper();
        var sw = Stopwatch.StartNew();

        // Only A events
        var t0 = Start(10, 40, 7, 1000, wrapper.WrapA);
        Join(t0);

        // A and B events
        var t1 = Start(10, 40, 7, 1000, wrapper.WrapA);
        var t2 = Start(10, 10, 19, 1000, wrapper.WrapB);
        Join(t1);
        Join(t2);

        // Only B events
        var t3 = Start(10, 20, 7, 1000, wrapper.WrapB);
        Join(t3);

        Console.WriteLine(sw.Elapsed);

        Console.WriteLine("0:  {0}", counter0);
        Console.WriteLine("A:  {0}", counterA);
        Console.WriteLine("B:  {0}", counterB);
        Console.WriteLine("AB: {0}", counterAB);

        Console.WriteLine("Generated A: {0}, Sent A: {1}",
            10 * 40 + 10 * 40, counterA + counterAB);
        Console.WriteLine("Generated B: {0}, Sent B: {1}",
            10 * 10 + 10 * 20, counterB + counterAB);
    }
}
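It helps to know the best achievable result before comparing the numbers reported in the answers. From the `Start` calls in `Main`: the only-A phase produces 10 × 40 A events, the mixed phase 10 × 40 A events and 10 × 10 B events, and the only-B phase 10 × 20 B events. A message carries at most one B, and no A events exist during the only-B phase, so at most min(400, 100) = 100 messages can be AB, and at least 200 B events must go out alone. The hypothetical helper below just encodes that arithmetic.

```csharp
using System;

// Hypothetical helper: best-case counters for the benchmark in Main,
// derived only from the threadCount * eventCount products passed to Start.
public static class BenchmarkBounds
{
    public static (int ab, int aOnly, int bOnly) BestCase(
        int aOnlyPhaseA, int mixedA, int mixedB, int bOnlyPhaseB)
    {
        // Only the mixed phase has both kinds of event, and a message
        // carries at most one A and one B, so pairs are capped there.
        int ab = Math.Min(mixedA, mixedB);
        int aOnly = aOnlyPhaseA + (mixedA - ab);
        int bOnly = (mixedB - ab) + bOnlyPhaseB;
        return (ab, aOnly, bOnly);
    }
}
```

`BenchmarkBounds.BestCase(10 * 40, 10 * 40, 10 * 10, 10 * 20)` gives (100, 700, 200). The AB counter is therefore a direct pairing rate for the mixed phase: the runs of the original code reported in this thread pair 67 to 78 of the 100 B events, and a result of AB: 100, A: 700, B: 200 is optimal.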
dtb asked Sep 10 '25 21:09


2 Answers

For the fun of it, here is a lock-free implementation:

using System.Threading;

public sealed class MessageWrapper
{
    private int pendingB;

    public Message WrapA(int a, int millisecondsTimeout)
    {
        int b = Interlocked.Exchange(ref pendingB, -1);
        return new Message(a, b == -1 ? (int?)null : b);
    }

    public Message WrapB(int b, int millisecondsTimeout)
    {
        var sw = new SpinWait();
        while (Interlocked.CompareExchange(ref pendingB, b, -1) != -1)
        {
            // Spin
            sw.SpinOnce();

            if (sw.NextSpinWillYield)
            {
                // Let us make progress instead of yielding the processor
                // (avoid context switch)
                return new Message(null, b);
            }
        }

        return null;
    }
}
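The version above relies on two `Interlocked` primitives: `Exchange` empties the slot and returns whatever was in it, and `CompareExchange` stores a value only if the slot currently holds the expected value, returning the value it found either way. The single-threaded sketch below walks through that handoff with -1 as the "empty" sentinel. It also shows why the sentinel must be set explicitly: an `int` field starts at 0, and in the version above `pendingB` is never initialized to -1, so the very first `WrapA` picks up a phantom B event with value 0 (and every `WrapB` before that fails to place its payload). The `Slot` class is hypothetical, for illustration only.

```csharp
using System.Threading;

// Hypothetical single-slot mailbox illustrating the handoff used above.
public sealed class Slot
{
    public const int Empty = -1;
    public int Value = Empty;   // must start at the sentinel, not at default(int) == 0

    // B side: try to park a payload; succeeds only if the slot was empty.
    public bool TryPlace(int payload) =>
        Interlocked.CompareExchange(ref Value, payload, Empty) == Empty;

    // A side: take whatever is parked (or nothing) and leave the slot empty.
    public int? Take()
    {
        int taken = Interlocked.Exchange(ref Value, Empty);
        return taken == Empty ? (int?)null : taken;
    }
}
```

On a fresh `Slot`, `TryPlace(5)` succeeds, a second `TryPlace(6)` fails because the slot is occupied, `Take()` returns 5, and a further `Take()` returns `null`.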

Results

Original implementation:

00:00:02.0433298
0:  0
A:  733
B:  233
AB: 67
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300

Lock-free implementation:

00:00:01.2546310
0:  0
A:  717
B:  217
AB: 83
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300

Update

Unfortunately, the above implementation has a bug and some shortcomings. Here is an improved version:

using System.Threading;

public class MessageWrapper
{
    private int pendingB = EMPTY;
    private const int EMPTY = -1;

    public Message WrapA(int a, int millisecondsTimeout)
    {
        int? b;
        int count = 0;
        while ((b = Interlocked.Exchange(ref pendingB, EMPTY)) == EMPTY)
        {
            if (count % 7 == 0)
            {
                Thread.Sleep(0);
            }
            else if (count % 23 == 0)
            {
                Thread.Sleep(1);
            }
            else
            {
                Thread.Yield();
            }
            if (++count == 480)
            {
                return new Message(a, null);
            }
        }
        return new Message(a, b);
    }

    public Message WrapB(int b, int millisecondsTimeout)
    {
        int count = 0;
        while (Interlocked.CompareExchange(ref pendingB, b, EMPTY) != EMPTY)
        {
            // Spin
            Thread.SpinWait((4 << count++));
            if (count > 10)
            {
                // We didn't manage to place our payload.
                // Let's send it ourselves:
                return new Message(null, b);
            }
        }

        // We placed our payload. 
        // Wait some more to see if some WrapA snatches it.
        // (CompareExchange with EMPTY/EMPTY is used here as an atomic read.)
        while (Interlocked.CompareExchange(ref pendingB, EMPTY, EMPTY) == b)
        {
            Thread.SpinWait((4 << count++));
            if (count > 20)
            {
                // No WrapA came along. Pity, we will have to send it ourselves
                int payload = Interlocked.CompareExchange(ref pendingB, EMPTY, b);
                return payload == b ? new Message(null, b) : null;
            }
        }
        return null;
    }
}
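Given the constraint that `WrapA` and `WrapB` must finish in a short, deterministic time, it is worth working out the worst case of the backoff schedules above. The sketch below only replays the loop counters, without spinning or sleeping; its names are hypothetical. Because `count` carries over from the first loop of `WrapB` into the second, a `WrapB` call makes at most 21 `Thread.SpinWait` calls with arguments 4 << 0 through 4 << 20, i.e. at most 8,388,604 spin iterations, half of them in the last call; giving up on placing the payload costs at most 8,188. For `WrapA`, the 480 rounds without a pending B break down into 69 `Sleep(0)`, 18 `Sleep(1)` and 393 `Yield` calls, and each `Sleep(1)` typically blocks for at least a millisecond. How long a spin iteration or a `Sleep(1)` actually takes depends on the hardware, OS and runtime.

```csharp
// Hypothetical helpers that replay the backoff counters of the improved
// MessageWrapper without actually spinning or sleeping.
public static class BackoffBudget
{
    // Total iterations of Thread.SpinWait(4 << k) for k = 0..lastCount.
    public static long SpinIterations(int lastCount)
    {
        long total = 0;
        for (int k = 0; k <= lastCount; k++)
            total += 4L << k;
        return total;
    }

    // Mirrors WrapA's loop: which call runs for each count value 0..rounds-1.
    public static (int sleep0, int sleep1, int yields) WrapASchedule(int rounds = 480)
    {
        int sleep0 = 0, sleep1 = 0, yields = 0;
        for (int count = 0; count < rounds; count++)
        {
            if (count % 7 == 0) sleep0++;        // Thread.Sleep(0)
            else if (count % 23 == 0) sleep1++;  // Thread.Sleep(1)
            else yields++;                       // Thread.Yield()
        }
        return (sleep0, sleep1, yields);
    }
}
```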

Results:

OP's implementation

00:00:02.1389474
0:  0
A:  722
B:  222
AB: 78
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300

Second implementation:

00:00:01.2752425
0:  0
A:  700
B:  200
AB: 100
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300
afrischke answered Sep 12 '25 11:09


For variety, I tried an approach based on the concurrent collections. It isn't clear to me whether the posted constraints allow that (no synchronization mechanism other than gate), but here is my answer anyway:

This is the typical output from your original code on my machine:

00:00:01.7835426
0:  0
A:  723
B:  223
AB: 77
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300

This is the typical output from my suggestion. It is about 20% slower than the original code, but it captures more 'AB' messages:

00:00:02.1322512
0:  0
A:  701
B:  201
AB: 99
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300

MessageWrapper implementation:

using System.Collections.Concurrent;

public class MessageWrapper
{
    private BlockingCollection<int?> messageA = new BlockingCollection<int?>();
    private BlockingCollection<int?> messageB = new BlockingCollection<int?>();

    public Message WrapA(int a, int millisecondsTimeout)
    {
        messageA.Add(a);
        return CreateMessage(0);
    }

    public Message WrapB(int b, int millisecondsTimeout)
    {
        messageB.Add(b);
        return CreateMessage(millisecondsTimeout);
    }

    private Message CreateMessage(int timeout)
    {
        int? a, b;

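        // Non-short-circuiting | so that both TryTake calls always run:
        // one call can pick up a B and an A for the same message.
        if (messageB.TryTake(out b) | messageA.TryTake(out a, timeout))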
        {
            return new Message(a, b);
        }
        else
        {
            return null;
        }
    }
}
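`CreateMessage` combines the two `TryTake` calls with the non-short-circuiting `|` operator on purpose. With `||`, a successful take from `messageB` would skip the take from `messageA`, and the message would carry only a B even when an A event is waiting. The single-threaded sketch below (class and method names are hypothetical) shows the difference on two freshly filled collections by counting the A events left behind.

```csharp
using System.Collections.Concurrent;

// Hypothetical demo: why CreateMessage uses | rather than ||.
public static class OrOperatorDemo
{
    // Returns how many A events are still queued after one combined take.
    public static int ALeftOver(bool shortCircuit)
    {
        using var messageA = new BlockingCollection<int?>();
        using var messageB = new BlockingCollection<int?>();
        messageA.Add(1);
        messageB.Add(2);

        int? a, b;
        _ = shortCircuit
            ? messageB.TryTake(out b) || messageA.TryTake(out a, 0)  // A side skipped
            : messageB.TryTake(out b) | messageA.TryTake(out a, 0);  // both sides run
        return messageA.Count;
    }
}
```

`ALeftOver(shortCircuit: true)` leaves the A event queued (1), while `ALeftOver(shortCircuit: false)` takes it (0).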
Christoffer answered Sep 12 '25 11:09