
RX - rethrow an error in containing method

I need to translate an error in an Rx stream (IObservable) into an exception in the method that contains the subscription to the stream.

This is because of this issue: https://github.com/aspnet/SignalR/pull/1331 (errors aren't serialised to clients). Once that issue is fixed, I will revert to handling errors properly.

For example, I have the following method:

public IObservable<StreamItem> LiveStream()
{
    _mySvc.Start();
    return _mySvc.ThingChanged();
}

So I have tried to subscribe to the stream and rethrow the error, but it still doesn't get transmitted to the client:

public IObservable<StreamItem> LiveStream()
{
    _mySvc.Start();
    _mySvc.ThingChanged().Subscribe(item => {}, OnError, () => {});
    return _mySvc.ThingChanged();
}

private void OnError(Exception exception)
{
    throw new Exception(exception.Message);
}

What I need is the equivalent of throwing in the LiveStream method.

For example, this error is propagated to the client:

public IObservable<StreamItem> LiveStream()
{
    _mySvc.Start();
    throw new Exception("some error message");
    return _mySvc.ThingChanged();
}

Any ideas how to achieve this?

ChrisCa asked Oct 16 '22 23:10



1 Answer

I have found this as well, especially with a "contained" reactive pipeline—that is, one with a well-defined beginning and end. In situations like those, it may suffice to simply allow underlying exceptions to bubble up to the containing scope. But as you have found, that concept is rather foreign to Rx generally: what happens in the pipeline stays in the pipeline.

The only way out of this that I have found in a contained scenario is to "slip" the error out of the stream using Catch(), and hand back an empty IObservable to allow the stream to halt naturally (otherwise, you'll hang if you're awaiting an IObservable for completion).

This will not work within your LiveStream() method, because that context/scope should have passed out of existence long before you're consuming your stream. So, this will have to happen in the context that contains the whole pipeline.

Exception error = null;
var source = LiveStream()
  .Catch<WhatYoureStreaming, Exception>(ex => {error = ex; return Observable.Empty<WhatYoureStreaming>(); })
  ...

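// if this is how you're awaiting completion; note that a bare "await source"
// throws InvalidOperationException when the sequence turns out to be empty
await source.LastOrDefaultAsync();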

// not a real exception type, use your own
if (error != null) throw new ContainingException("oops", error);

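Just don't write throw error; there at the end: rethrowing the captured exception object resets its stack trace, whereas wrapping it as an inner exception keeps the original trace.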
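If wrapping is not wanted, one option for rethrowing the very same exception object without resetting its trace is ExceptionDispatchInfo from System.Runtime.ExceptionServices. The following is only a minimal sketch of that idea: RethrowSketch, CaptureFailure and RethrowOriginal are hypothetical names, and CaptureFailure merely stands in for the error variable that the Catch() handler fills in above.

```csharp
using System;
using System.Runtime.ExceptionServices;

public static class RethrowSketch
{
    // Hypothetical stand-in for the pipeline failing: returns an exception
    // that has already been thrown once, like the one captured in Catch().
    public static Exception CaptureFailure()
    {
        try
        {
            throw new InvalidOperationException("stream failed");
        }
        catch (Exception ex)
        {
            return ex;
        }
    }

    // Rethrows the same exception instance. Unlike "throw error;", the
    // stack trace recorded at the original throw site is preserved.
    public static void RethrowOriginal(Exception error)
    {
        ExceptionDispatchInfo.Capture(error).Throw();
    }

    public static void Main()
    {
        Exception error = CaptureFailure();
        try
        {
            RethrowOriginal(error);
        }
        catch (InvalidOperationException ex)
        {
            // The trace still includes the frame where the exception was first thrown.
            Console.WriteLine(ex.StackTrace);
        }
    }
}
```

In the containing method, the last line would then become ExceptionDispatchInfo.Capture(error).Throw(); in place of the wrapping throw. Whether to wrap or to rethrow the original is a design choice: wrapping adds context about where the stream was consumed, while rethrowing keeps the exception type the caller may already be catching.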

Marc L. answered Oct 21 '22 02:10
