I have a bunch of threads that generate events of type A
and type B
.
My program takes these events, wraps them in a message and sends them across the network. A message can hold either one A
event, one B
event, or one A
event and one B
event:
SendMessage(new Message(a: 1, b: null));
SendMessage(new Message(a: null, b: 2 ));
SendMessage(new Message(a: 3, b: 4 ));
Events of type A
happen quite frequently, while events of type B
occur much less often. So, when a thread generates a B
event, my program waits a bit to see if another thread generates an A
event and combines the A
event and the B
event if possible.
Here is my code:
object gate = new object();
int? pendingB;
Message WrapA(int a, int millisecondsTimeout)
{
int? b;
lock (gate)
{
b = pendingB;
pendingB = null;
Monitor.Pulse(gate);
}
return new Message(a, b);
}
Message WrapB(int b, int millisecondsTimeout)
{
lock (gate)
{
if (pendingB == null)
{
pendingB = b;
Monitor.Wait(gate, millisecondsTimeout);
if (pendingB != b) return null;
pendingB = null;
}
}
return new Message(null, b);
}
This works so far. However, there are two problems:
If there are lots of A
events and lots of B
events, the algorithm is not very efficient: Only a certain percentage of B
events is attached to A
events, even when there are enough A
events.
If there are no A
events generated for a while (uncommon, but not impossible), the algorithm is completely unfair: One thread generating B
events has to wait every time, while all other threads can send their B
events right away.
How can I improve efficiency and fairness of the algorithm?
Constraints:
• WrapA
and WrapB
must terminate within a short, deterministic amount of time.
• SendMessage
must be called outside any locks.
• There is no synchronization mechanism available other than gate
.
• There are not additional threads, tasks, timers, etc. available.
• Since events of type A
happen so frequently in the normal case, busy-waiting in WrapB
is fine.
Here is a test program that can be used as a benchmark:
public static class Program
{
static int counter0 = 0;
static int counterA = 0;
static int counterB = 0;
static int counterAB = 0;
static void SendMessage(Message m)
{
if (m != null)
if (m.a != null)
if (m.b != null)
Interlocked.Increment(ref counterAB);
else
Interlocked.Increment(ref counterA);
else
if (m.b != null)
Interlocked.Increment(ref counterB);
else
Interlocked.Increment(ref counter0);
}
static Thread[] Start(int threadCount, int eventCount,
int eventInterval, int wrapTimeout, Func<int, int, Message> wrap)
{
Thread[] threads = new Thread[threadCount * eventCount];
for (int i = 0; i < threadCount; i++)
{
for (int j = 0; j < eventCount; j++)
{
int k = i * 1000 + j;
int l = j * eventInterval + i;
threads[i * eventCount + j] = new Thread(() =>
{
Thread.Sleep(l);
SendMessage(wrap(k, wrapTimeout));
});
threads[i * eventCount + j].Start();
}
}
return threads;
}
static void Join(params Thread[] threads)
{
for (int i = 0; i < threads.Length; i++)
{
threads[i].Join();
}
}
public static void Main(string[] args)
{
var wrapper = new MessageWrapper();
var sw = Stopwatch.StartNew();
// Only A events
var t0 = Start(10, 40, 7, 1000, wrapper.WrapA);
Join(t0);
// A and B events
var t1 = Start(10, 40, 7, 1000, wrapper.WrapA);
var t2 = Start(10, 10, 19, 1000, wrapper.WrapB);
Join(t1);
Join(t2);
// Only B events
var t3 = Start(10, 20, 7, 1000, wrapper.WrapB);
Join(t3);
Console.WriteLine(sw.Elapsed);
Console.WriteLine("0: {0}", counter0);
Console.WriteLine("A: {0}", counterA);
Console.WriteLine("B: {0}", counterB);
Console.WriteLine("AB: {0}", counterAB);
Console.WriteLine("Generated A: {0}, Sent A: {1}",
10 * 40 + 10 * 40, counterA + counterAB);
Console.WriteLine("Generated B: {0}, Sent B: {1}",
10 * 10 + 10 * 20, counterB + counterAB);
}
}
For the fun of it, here is a lock-free implementation:
public sealed class MessageWrapper
{
private int pendingB;
public Message WrapA(int a, int millisecondsTimeout)
{
int b = Interlocked.Exchange(ref pendingB, -1);
return new Message(a, b == -1 ? null : b);
}
public Message WrapB(int b, int millisecondsTimeout)
{
var sw = new SpinWait();
while (Interlocked.CompareExchange(ref pendingB, b, -1) != -1)
{
// Spin
sw.SpinOnce();
if (sw.NextSpinWillYield)
{
// Let us make progress instead of yielding the processor
// (avoid context switch)
return new Message(null, b);
}
}
return null;
}
}
Results
Original implementation:
00:00:02.0433298
0: 0
A: 733
B: 233
AB: 67
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300
Lock-free implementation:
00:00:01.2546310
0: 0
A: 717
B: 217
AB: 83
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300
Update
Unfortunately, above implementation has a bug plus some shortcoming. Here is an improved version:
public class MessageWrapper
{
private int pendingB = EMPTY;
private const int EMPTY = -1;
public Message WrapA(int a, int millisecondsTimeout)
{
int? b;
int count = 0;
while ((b = Interlocked.Exchange(ref pendingB, EMPTY)) == EMPTY)
{
if (count % 7 == 0)
{
Thread.Sleep(0);
}
else if (count % 23 == 0)
{
Thread.Sleep(1);
}
else
{
Thread.Yield();
}
if (++count == 480)
{
return new Message(a, null);
}
}
return new Message(a, b);
}
public Message WrapB(int b, int millisecondsTimeout)
{
int count = 0;
while (Interlocked.CompareExchange(ref pendingB, b, EMPTY) != EMPTY)
{
// Spin
Thread.SpinWait((4 << count++));
if (count > 10)
{
// We didn't manage to place our payload.
// Let's send it ourselves:
return new Message(null, b);
}
}
// We placed our payload.
// Wait some more to see if some WrapA snatches it.
while (Interlocked.CompareExchange(ref pendingB, EMPTY, EMPTY) == b)
{
Thread.SpinWait((4 << count++));
if (count > 20)
{
// No WrapA came along. Pity, we will have to send it ourselves
int payload = Interlocked.CompareExchange(ref pendingB, EMPTY, b);
return payload == b ? new Message(null, b) : null;
}
}
return null;
}
}
Results:
OP's implementation
00:00:02.1389474
0: 0
A: 722
B: 222
AB: 78
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300
Second implementation:
00:00:01.2752425
0: 0
A: 700
B: 200
AB: 100
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300
For diversity, I tried an approach based on the concurrent collections. To me it's not clear from the posted constraints whether that is okay but I'll shoot my answer anyway:
This is the typical output from your original code on my machine:
00:00:01.7835426
0: 0
A: 723
B: 223
AB: 77
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300
This is the typical output from my suggestion, about 20% slower than the original code but it captures more 'AB' messages:
00:00:02.1322512
0: 0
A: 701
B: 201
AB: 99
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300
MessageWrapper implementation:
public class MessageWrapper
{
private BlockingCollection<int?> messageA = new BlockingCollection<int?>();
private BlockingCollection<int?> messageB = new BlockingCollection<int?>();
public Message WrapA(int a, int millisecondsTimeout)
{
messageA.Add(a);
return CreateMessage(0);
}
public Message WrapB(int b, int millisecondsTimeout)
{
messageB.Add(b);
return CreateMessage(millisecondsTimeout);
}
private Message CreateMessage(int timeout)
{
int? a, b;
if (messageB.TryTake(out b) | messageA.TryTake(out a, timeout))
{
return new Message(a, b);
}
else
{
return null;
}
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With