My server side sends me batches of messages. Both the number of messages in a batch and the frequency of the batches are arbitrary: at times I get messages at one-minute intervals, at other times no messages for an hour, and anywhere from 1 to 10 messages per batch.
My current implementation uses Observable.Buffer(TimeSpan.FromSeconds(5)) to group the messages and send them to the subscriber.
Instead of having to check every 5 seconds, is there a way to configure the Observable to send its buffered messages to the subscriber when there is a delay of x seconds between two messages?
How can I avoid an unnecessary timer ticking every 5 seconds? (I'm open to other suggestions for optimizing the batch processing.)
decPL suggested using the overload of Buffer that accepts a bufferClosingSelector - a factory function that is called at the opening of each new buffer. It produces a stream whose first OnNext() or OnCompleted() signals flushing the current buffer. decPL's code looked like this:
observable.Buffer(() => observable.Throttle(TimeSpan.FromSeconds(5)))
This makes considerable progress towards a solution, but it has a couple of problems:
- The bufferClosingSelector factory is called after each buffer closes, so if the source were cold it would throttle from the initial events rather than the most recent ones.
- We need an additional mechanism to limit the buffer length and prevent indefinite throttling. Buffer has an overload that lets you specify a maximum length, but unfortunately you can't combine it with a closing selector.
Let's call the desired buffer length limit n. Recall that the first OnNext of the closing selector is enough to close the buffer, so all we need to do is Merge the throttle with a counting stream that sends an OnNext after n events from the source. We can use .Take(n).LastAsync() to do this: take the first n events but ignore all but the last of them. This is a very useful pattern in Rx.
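In isolation, that counting stream can be sketched as follows (a minimal illustration; the choice of n = 3 and the variable names are arbitrary):

```csharp
// 'source' is any IObservable<T>.
// Takes the first 3 events, discards all but the last of them,
// and so emits a single value the moment the 3rd event arrives -
// exactly the "close the buffer now" signal we need.
var bufferFull = source.Take(3).LastAsync();
```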
To address the issue of the bufferClosingSelector factory resubscribing to the source, we need the common pattern of .Publish().RefCount() on the source; this gives us a stream that sends only the most recent events to subscribers. This is also a very useful pattern to remember.
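The effect of the pattern can be sketched in isolation (variable names are illustrative):

```csharp
// Without Publish().RefCount(), every call to the closing selector
// would re-subscribe to a cold 'source' and replay it from the start,
// so the throttle would measure gaps from the initial events.
var shared = source.Publish().RefCount();
// All subscriptions to 'shared' observe one underlying subscription
// to 'source', so the throttle sees only live, most-recent events.
```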
Here is the reworked code, where the throttle duration is merged with a counter:
var throttleDuration = TimeSpan.FromSeconds(5);
var bufferSize = 3;
// single subscription to source
var sourcePub = source.Publish().RefCount();
var output = sourcePub.Buffer(
    () => sourcePub.Throttle(throttleDuration)
        .Merge(sourcePub.Take(bufferSize).LastAsync()));
Here is a production-ready implementation with tests (it uses the NuGet packages rx-testing and nunit). Note the parameterization of the scheduler to support testing.
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static partial class ObservableExtensions
{
    public static IObservable<IList<TSource>> BufferNearEvents<TSource>(
        this IObservable<TSource> source,
        TimeSpan maxInterval,
        int maxBufferSize,
        IScheduler scheduler)
    {
        if (scheduler == null) scheduler = ThreadPoolScheduler.Instance;
        if (maxBufferSize <= 0)
            throw new ArgumentOutOfRangeException(
                "maxBufferSize", "maxBufferSize must be positive");

        var publishedSource = source.Publish().RefCount();

        return publishedSource.Buffer(
            () => publishedSource
                .Throttle(maxInterval, scheduler)
                .Merge(publishedSource.Take(maxBufferSize).LastAsync()));
    }
}
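A minimal usage sketch of the extension method (the serverMessages stream and the SendToSubscriber handler are hypothetical names, not part of the implementation above):

```csharp
// Hypothetical usage: batch messages that arrive within 5 seconds
// of each other, flushing early once 10 messages have accumulated.
IDisposable subscription = serverMessages
    .BufferNearEvents(
        maxInterval: TimeSpan.FromSeconds(5),
        maxBufferSize: 10,
        scheduler: null) // null falls back to ThreadPoolScheduler.Instance
    .Subscribe(batch => SendToSubscriber(batch));
```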
public class BufferNearEventsTests : ReactiveTest
{
    [Test]
    public void CloseEventsAreBuffered()
    {
        TimeSpan maxInterval = TimeSpan.FromTicks(200);
        const int maxBufferSize = 1000;

        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(300, 3));

        IList<int> expectedBuffer = new[] { 1, 2, 3 };
        var expectedTime = maxInterval.Ticks + 300;

        var results = scheduler.CreateObserver<IList<int>>();
        source.BufferNearEvents(maxInterval, maxBufferSize, scheduler)
            .Subscribe(results);

        scheduler.AdvanceTo(1000);

        results.Messages.AssertEqual(
            OnNext<IList<int>>(expectedTime, buffer => CheckBuffer(expectedBuffer, buffer)));
    }
    [Test]
    public void FarEventsAreUnbuffered()
    {
        TimeSpan maxInterval = TimeSpan.FromTicks(200);
        const int maxBufferSize = 1000;

        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(1000, 1),
            OnNext(2000, 2),
            OnNext(3000, 3));

        IList<int>[] expectedBuffers =
        {
            new[] { 1 },
            new[] { 2 },
            new[] { 3 }
        };

        var expectedTimes = new[]
        {
            maxInterval.Ticks + 1000,
            maxInterval.Ticks + 2000,
            maxInterval.Ticks + 3000
        };

        var results = scheduler.CreateObserver<IList<int>>();
        source.BufferNearEvents(maxInterval, maxBufferSize, scheduler)
            .Subscribe(results);

        scheduler.AdvanceTo(10000);

        results.Messages.AssertEqual(
            OnNext<IList<int>>(expectedTimes[0], buffer => CheckBuffer(expectedBuffers[0], buffer)),
            OnNext<IList<int>>(expectedTimes[1], buffer => CheckBuffer(expectedBuffers[1], buffer)),
            OnNext<IList<int>>(expectedTimes[2], buffer => CheckBuffer(expectedBuffers[2], buffer)));
    }
    [Test]
    public void UpToMaxEventsAreBuffered()
    {
        TimeSpan maxInterval = TimeSpan.FromTicks(200);
        const int maxBufferSize = 2;

        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(300, 3));

        IList<int>[] expectedBuffers =
        {
            new[] { 1, 2 },
            new[] { 3 }
        };

        var expectedTimes = new[]
        {
            200, /* buffer cap reached */
            maxInterval.Ticks + 300
        };

        var results = scheduler.CreateObserver<IList<int>>();
        source.BufferNearEvents(maxInterval, maxBufferSize, scheduler)
            .Subscribe(results);

        scheduler.AdvanceTo(10000);

        results.Messages.AssertEqual(
            OnNext<IList<int>>(expectedTimes[0], buffer => CheckBuffer(expectedBuffers[0], buffer)),
            OnNext<IList<int>>(expectedTimes[1], buffer => CheckBuffer(expectedBuffers[1], buffer)));
    }
    private static bool CheckBuffer<T>(IEnumerable<T> expected, IEnumerable<T> actual)
    {
        CollectionAssert.AreEquivalent(expected, actual);
        return true;
    }
}
If I understood your description correctly, Observable.Buffer is still your friend, just using the overload that causes an observable event to dictate when buffered items should be sent. Something like the following:
observable.Buffer(() => observable.Throttle(TimeSpan.FromSeconds(5)))