How can I clear the buffer on a ReplaySubject
?
Periodically I need to clear the buffer (as an end of day event in my case) to prevent the ReplaySubject
continually growing and eventually eating all the memory.
Ideally I want to keep the same ReplaySubject
as the client subscriptions are still good.
ReplaySubject is a variant of a Subject which keeps a cache of previous values emitted by a source observable and sends them to all new observers immediately on subscription. This behavior of replaying a sequence of old values to new subscribes is where the name for this type of a subject comes from.
ReplaySubject replays old values to new subscribers when they first subscribe. The ReplaySubject will store every value it emits in a buffer. It will emit them to the new subscribers in the order it received them. You can configure the buffer using the arguments bufferSize and windowTime.
ReplaySubject
doesn't offer a means to clear the buffer, but there are several overloads to constrain its buffers in different ways:
TimeSpan
that items are retained forThis was quite an interesting problem - I decided to see how easy it would be to implement a variation of ReplaySubject
you can clear - using existing subjects and operators (as these are quite robust). Turns out it was reasonably straightforward.
I've run this through a memory profiler to check it does the right thing. Call Clear()
to flush the buffer, otherwise it works just like a regular unbounded ReplaySubject
:
public class RollingReplaySubject<T> : ISubject<T>
{
private readonly ReplaySubject<IObservable<T>> _subjects;
private readonly IObservable<T> _concatenatedSubjects;
private ISubject<T> _currentSubject;
public RollingReplaySubject()
{
_subjects = new ReplaySubject<IObservable<T>>(1);
_concatenatedSubjects = _subjects.Concat();
_currentSubject = new ReplaySubject<T>();
_subjects.OnNext(_currentSubject);
}
public void Clear()
{
_currentSubject.OnCompleted();
_currentSubject = new ReplaySubject<T>();
_subjects.OnNext(_currentSubject);
}
public void OnNext(T value)
{
_currentSubject.OnNext(value);
}
public void OnError(Exception error)
{
_currentSubject.OnError(error);
}
public void OnCompleted()
{
_currentSubject.OnCompleted();
_subjects.OnCompleted();
// a quick way to make the current ReplaySubject unreachable
// except to in-flight observers, and not hold up collection
_currentSubject = new Subject<T>();
}
public IDisposable Subscribe(IObserver<T> observer)
{
return _concatenatedSubjects.Subscribe(observer);
}
}
Respect usual rules (as with any Subject
) and don't call methods on this class concurrently - including Clear()
. You could add synchronization locks trivially if needed.
It works by nesting a sequence of ReplaySubjects inside a master ReplaySubject. The outer ReplaySubject (_subjects
) holds a buffer of exactly one inner ReplaySubject (_currentSubject
), and it is populated on construction.
The OnXXX
methods call through to the _currentSubject
ReplaySubject.
Observers are subscribed to a concatenated projection of the nested ReplaySubjects (held in _concatenatedSubjects
). Because the buffer size of _subjects
is just 1, new subscribers acquire the events of only the most recent ReplaySubject
onwards.
Whenever we need to "clear the buffer", the existing _currentSubject
is OnCompleted
and a new ReplaySubject is added to _subjects
and becomes the new _currentSubject
.
Following @Brandon's suggestion, I created a version of RollingReplaySubject
that uses either a TimeSpan
or an input stream to signal buffer clearing. I created a Gist for this here: https://gist.github.com/james-world/c46f09f32e2d4f338b07
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With