Using Rx, I desire pause and resume functionality in the following code:
static IDisposable _subscription;
static void Main(string[] args)
{
Subscribe();
Thread.Sleep(500);
// Second value should not be shown after two seconds:
Pause();
Thread.Sleep(5000);
// Continue and show second value and beyond now:
Resume();
}
static void Subscribe()
{
var list = new List<int> { 1, 2, 3, 4, 5 };
var obs = list.ToObservable();
_subscription = obs.SubscribeOn(Scheduler.NewThread).Subscribe(p =>
{
Console.WriteLine(p.ToString());
Thread.Sleep(2000);
},
err => Console.WriteLine("Error"),
() => Console.WriteLine("Sequence Completed")
);
}
static void Pause()
{
// Pseudocode:
//_subscription.Pause();
}
static void Resume()
{
// Pseudocode:
//_subscription.Resume();
}
I believe I could make it work with some kind of Boolean field gating combined with thread locking (Monitor.Wait
and Monitor.Pulse
)
But is there an Rx operator or some other reactive shorthand to achieve the same aim?
Here's a reasonably simple Rx way to do what you want. I've created an extension method called Pausable
that takes a source observable and a second observable of boolean that pauses or resumes the observable.
public static IObservable<T> Pausable<T>(
this IObservable<T> source,
IObservable<bool> pauser)
{
return Observable.Create<T>(o =>
{
var paused = new SerialDisposable();
var subscription = Observable.Publish(source, ps =>
{
var values = new ReplaySubject<T>();
Func<bool, IObservable<T>> switcher = b =>
{
if (b)
{
values.Dispose();
values = new ReplaySubject<T>();
paused.Disposable = ps.Subscribe(values);
return Observable.Empty<T>();
}
else
{
return values.Concat(ps);
}
};
return pauser.StartWith(false).DistinctUntilChanged()
.Select(p => switcher(p))
.Switch();
}).Subscribe(o);
return new CompositeDisposable(subscription, paused);
});
}
It can be used like this:
var xs = Observable.Generate(
0,
x => x < 100,
x => x + 1,
x => x,
x => TimeSpan.FromSeconds(0.1));
var bs = new Subject<bool>();
var pxs = xs.Pausable(bs);
pxs.Subscribe(x => { /* Do stuff */ });
Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
It should be fairly easy for you to put this in your code with the Pause
& Resume
methods.
Here it is as an application of IConnectableObservable that I corrected slightly for the newer api (original here):
public static class ObservableHelper {
public static IConnectableObservable<TSource> WhileResumable<TSource>(Func<bool> condition, IObservable<TSource> source) {
var buffer = new Queue<TSource>();
var subscriptionsCount = 0;
var isRunning = System.Reactive.Disposables.Disposable.Create(() => {
lock (buffer)
{
subscriptionsCount--;
}
});
var raw = Observable.Create<TSource>(subscriber => {
lock (buffer)
{
subscriptionsCount++;
if (subscriptionsCount == 1)
{
while (buffer.Count > 0) {
subscriber.OnNext(buffer.Dequeue());
}
Observable.While(() => subscriptionsCount > 0 && condition(), source)
.Subscribe(
v => { if (subscriptionsCount == 0) buffer.Enqueue(v); else subscriber.OnNext(v); },
e => subscriber.OnError(e),
() => { if (subscriptionsCount > 0) subscriber.OnCompleted(); }
);
}
}
return isRunning;
});
return raw.Publish();
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With