Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How is an observable subscription gracefully terminated?

I'm attempting to use Reactive Extensions (Rx) to process a stream of data. The processing of each element may take some time, though. To break the processing, I'm using a CancellationToken, which effectively stops the subscription.

When cancel has been requested, how do I gracefully finish the current work and terminate properly without losing any data?

Example

var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));

var observable = Observable
    .Interval(TimeSpan.FromMilliseconds(250));

observable
    .Subscribe(
        value =>
            {
                Console.WriteLine(value);
                Thread.Sleep(500); // Simulate processing
                
                if (cts.Token.IsCancellationRequested)
                {
                    Console.WriteLine("Cancellation detected on {0}.", value);
                    Thread.Sleep(500); // Simulate some time consuming shutdown
                    Console.WriteLine("Cleaning up done for {0}.", value);
                }
            },
        () => Console.WriteLine("Completed"),
        cts.Token);
        
Console.ReadLine();
cts.Cancel();
Console.WriteLine("Job terminated.");

Output

0
1
2
Token cancelled.
Job terminated.
Cancellation detected on 2.
Cleaning up done for 2.

As can be seen from the output, the line "Job terminated" is not the last line, which means that the cleanup would not have had enough time to finish up before the application has terminated.

Expected Output

0
1
2
Token cancelled.
Cancellation detected on 2.
Cleaning up done for 2.
Job terminated.

The line "Job terminated" is the very last line to be printed. The "Cancellation" and "Cleaning" lines have been allowed to take their time.

(Edit: Added expected output)

like image 325
Reyhn Avatar asked Jul 19 '17 14:07

Reyhn


2 Answers

If I understand the question correctly, this isn't an Rx problem, this is a 'Whatever you are you doing in the Subscribe' problem. Your subscribe action takes half a second, with the possibility of a cleanup taking another half a second, and your job termination takes micro-seconds. What is it that you're hoping to squeeze in between cancellation and termination?

The best advice I can give you is to have the subscribe action honor the cancellation token better than the Thread.Sleep calls do.

like image 195
Shlomo Avatar answered Nov 04 '22 08:11

Shlomo


Using the answer to a similar question together with the answer to a question about waiting before terminating, I figured out a solution that does what I want.

My original problem was that I found no way to wait for the subscription's thread. The answers linked above lead me to refactoring the code in three ways:

  1. I moved the cancellation-logic away from the subscription into the observable.

  2. The subscription is wrapped in its own Task (so the execution can continue to the ReadLine-statement).

  3. A ManualResetEvent was introduced to control the application exit strategy.

Solution:

var reset = new ManualResetEvent(false);

var cts = new CancellationTokenSource();
cts.Token.Register(() => Console.WriteLine("Token cancelled."));

var observable = Observable
    .Interval(TimeSpan.FromMilliseconds(250))
    .TakeWhile(x => !cts.Token.IsCancellationRequested)
    .Finally(
        () =>
            {
                Console.WriteLine("Finally: Beginning finalization.");
                Thread.Sleep(500);
                Console.WriteLine("Finally: Done with finalization.");
                reset.Set();
            });

await Task.Factory.StartNew(
    () => observable
        .Subscribe(
            value =>
                {
                    Console.WriteLine("Begin: {0}", value);
                    Thread.Sleep(2000);
                    Console.WriteLine("End: {0}", value);
                },
            () => Console.WriteLine("Completed: Subscription completed.")),
    TaskCreationOptions.LongRunning);

Console.ReadLine();
cts.Cancel();
reset.WaitOne();
Console.WriteLine("Job terminated.");

Output:

Begin: 0
End: 0
Begin: 1
Token cancelled.
End: 1
Completed: Subscription completed.
Finally: Beginning finalization.
Finally: Done with finalization.
Job terminated.

Being quite new to Reactive Extensions, I don't know if this is the best solution to my problem. But it is a great improvement to the example posted in the question, as it fulfills my requirements:

  • Each OnNext-action is allowed to run to completion.
  • The application waits for the stream-processing to complete (signalled by the ManualResetEvent).
  • The stream-cancellation logic is moved to the producer (instead of the consumer) in the TakeWhile-method.
  • The application termination logic is a reaction to the stream-cancellation in the producer's Finally-method.

This is a much nicer solution.

like image 2
Reyhn Avatar answered Nov 04 '22 08:11

Reyhn