I'm attempting to use Reactive Extensions (Rx) to process a stream of data. The processing of each element may take some time, though. To break the processing, I'm using a CancellationToken
, which effectively stops the subscription.
When cancel has been requested, how do I gracefully finish the current work and terminate properly without losing any data?
var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));
var observable = Observable
.Interval(TimeSpan.FromMilliseconds(250));
observable
.Subscribe(
value =>
{
Console.WriteLine(value);
Thread.Sleep(500); // Simulate processing
if (cts.Token.IsCancellationRequested)
{
Console.WriteLine("Cancellation detected on {0}.", value);
Thread.Sleep(500); // Simulate some time consuming shutdown
Console.WriteLine("Cleaning up done for {0}.", value);
}
},
() => Console.WriteLine("Completed"),
cts.Token);
Console.ReadLine();
cts.Cancel();
Console.WriteLine("Job terminated.");
0
1
2
Token cancelled.
Job terminated.
Cancellation detected on 2.
Cleaning up done for 2.
As can be seen from the output, the line "Job terminated" is not the last line, which means that the cleanup would not have had enough time to finish up before the application has terminated.
0
1
2
Token cancelled.
Cancellation detected on 2.
Cleaning up done for 2.
Job terminated.
The line "Job terminated" is the very last line to be printed. The "Cancellation" and "Cleaning" lines have been allowed to take their time.
(Edit: Added expected output)
If I understand the question correctly, this isn't an Rx problem, this is a 'Whatever you are you doing in the Subscribe' problem. Your subscribe action takes half a second, with the possibility of a cleanup taking another half a second, and your job termination takes micro-seconds. What is it that you're hoping to squeeze in between cancellation and termination?
The best advice I can give you is to have the subscribe action honor the cancellation token better than the Thread.Sleep
calls do.
Using the answer to a similar question together with the answer to a question about waiting before terminating, I figured out a solution that does what I want.
My original problem was that I found no way to wait for the subscription's thread. The answers linked above lead me to refactoring the code in three ways:
I moved the cancellation-logic away from the subscription into the observable.
The subscription is wrapped in its own Task
(so the execution can continue to the ReadLine
-statement).
A ManualResetEvent
was introduced to control the application exit strategy.
Solution:
var reset = new ManualResetEvent(false);
var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));
var observable = Observable
.Interval(TimeSpan.FromMilliseconds(250))
.TakeWhile(x => !cts.Token.IsCancellationRequested)
.Finally(
() =>
{
Console.WriteLine("Finally: Beginning finalization.");
Thread.Sleep(500);
Console.WriteLine("Finally: Done with finalization.");
reset.Set();
});
await Task.Factory.StartNew(
() => observable
.Subscribe(
value =>
{
Console.WriteLine("Begin: {0}", value);
Thread.Sleep(2000);
Console.WriteLine("End: {0}", value);
},
() => Console.WriteLine("Completed: Subscription completed.")),
TaskCreationOptions.LongRunning);
Console.ReadLine();
cts.Cancel();
reset.WaitOne();
Console.WriteLine("Job terminated.");
Output:
Begin: 0
End: 0
Begin: 1
Token cancelled.
End: 1
Completed: Subscription completed.
Finally: Beginning finalization.
Finally: Done with finalization.
Job terminated.
Being quite new to Reactive Extensions, I don't know if this is the best solution to my problem. But it is a great improvement to the example posted in the question, as it fulfills my requirements:
ManualResetEvent
). TakeWhile
-method.Finally
-method.This is a much nicer solution.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With