
Lazy observable sequence that replays value or error

I am trying to create an observable pipeline with the following characteristics:

  • is lazy (does nothing until somebody subscribes)
  • executes at most once regardless of how many subscriptions are received
  • replays its resulting value, if any OR
  • replays its resulting error, if any

For the life of me, I can't figure out the correct semantics to accomplish this. I thought it would be a simple case of doing something like this:

Observable
    .Defer(() => Observable
        .Start(() => { /* do something */ })
        .PublishLast()
        .ConnectUntilCompleted());

Where ConnectUntilCompleted just does what it sounds like:

public static IObservable<T> ConnectUntilCompleted<T>(this IConnectableObservable<T> @this)
{
    @this.Connect();
    return @this;
}

This seems to work when the observable terminates successfully, but not when there's an error. Any subscribers do not receive the error:

[Fact]
public void test()
{
    var o = Observable
        .Defer(() => Observable
            .Start(() => { throw new InvalidOperationException(); })
            .PublishLast()
            .ConnectUntilCompleted());

    // this does not throw!
    o.Subscribe();
}

Can anyone tell me what I'm doing wrong? Why doesn't PublishLast replay any error it receives?

UPDATE: it gets even stranger:

[Fact]
public void test()
{
    var o = Observable
        .Defer(() => Observable
            .Start(() => { throw new InvalidOperationException(); })
            .PublishLast()
            .ConnectUntilCompleted())
        .Do(
            _ => { },
            ex => { /* this executes */ });

    // this does not throw!
    o.Subscribe();

    o.Subscribe(
        _ => { },
        ex => { /* even though this executes */ });
}
Kent Boogaart asked Feb 16 '26 07:02


1 Answer

Try this version of your ConnectUntilCompleted method:

public static IObservable<T> ConnectUntilCompleted<T>(this IConnectableObservable<T> @this)
{
    return Observable.Create<T>(o =>
    {
        var subscription = @this.Subscribe(o);
        var connection = @this.Connect();
        return new CompositeDisposable(subscription, connection);
    });
}

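This allows Rx to behave properly: each observer is subscribed before the source is connected, and the subscription and the connection are disposed together.

Here is the same method with some logging added to help show what's going on: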

public static IObservable<T> ConnectUntilCompleted<T>(this IConnectableObservable<T> @this)
{
    return Observable.Create<T>(o =>
    {
        var disposed = Disposable.Create(() => Console.WriteLine("Disposed!"));
        var subscription = Observable
            .Defer<T>(() => { Console.WriteLine("Subscribing!"); return @this; })
            .Subscribe(o);
        Console.WriteLine("Connecting!");
        var connection = @this.Connect();
        return new CompositeDisposable(disposed, subscription, connection);
    });
}

Now your observable looks like this (note that PublishLast and ConnectUntilCompleted are applied outside the Defer, not inside it):

var o =
    Observable
        .Defer(() =>
            Observable
                .Start(() =>
                {
                    Console.WriteLine("Started.");
                    throw new InvalidOperationException();
                }))
        .PublishLast()
        .ConnectUntilCompleted();

The final key thing is to actually handle the errors in the subscription - so it's not enough to simply do o.Subscribe().

So do this:

o.Subscribe(
    x => Console.WriteLine(x),
    e => Console.WriteLine(e.Message),
    () => Console.WriteLine("Done."));

o.Subscribe(
    x => Console.WriteLine(x),
    e => Console.WriteLine(e.Message),
    () => Console.WriteLine("Done."));

o.Subscribe(
    x => Console.WriteLine(x),
    e => Console.WriteLine(e.Message),
    () => Console.WriteLine("Done."));

When I run that I get this:

Subscribing!
Connecting!
Subscribing!
Connecting!
Subscribing!
Connecting!
Started.
Operation is not valid due to the current state of the object.
Disposed!
Operation is not valid due to the current state of the object.
Disposed!
Operation is not valid due to the current state of the object.
Disposed!

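Note that "Started." only appears once, but the error is reported three times.

(Sometimes "Started." appears higher up in the list, after the first subscription, because Observable.Start runs its action asynchronously and races with the remaining subscriptions.)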

I think this is what you wanted from your description.
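
One thing worth checking, as far as I can tell: in the version above the connection is disposed together with each subscription, so a subscriber that arrives after the sequence has already terminated may end up calling Connect again. If that matters, here is a minimal sketch of a variant that connects exactly once and never disconnects. ReplayLastLazily is a hypothetical name of mine, not part of Rx, and the Lazy<IDisposable> guard is my own choice rather than anything from the question:

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

public static class LazyReplayExtensions
{
    // Hypothetical helper (not an Rx operator): connects on the first
    // subscription only and keeps the connection for the sequence's lifetime.
    public static IObservable<T> ReplayLastLazily<T>(this IObservable<T> source)
    {
        var published = source.PublishLast();
        // Lazy<T> is thread-safe by default, so Connect is invoked at most once.
        var connection = new Lazy<IDisposable>(published.Connect);

        return Observable.Create<T>(observer =>
        {
            // Subscribe first so the observer cannot miss a notification.
            var subscription = published.Subscribe(observer);
            _ = connection.Value; // connects on first use, no-op afterwards
            return subscription;  // unsubscribing does not disconnect the source
        });
    }
}

public static class Program
{
    public static void Main()
    {
        var runs = 0;
        var ok = Observable
            .Defer(() => Observable.Start(() => { Interlocked.Increment(ref runs); return 42; }))
            .ReplayLastLazily();

        Console.WriteLine(runs);             // 0: nothing runs before the first subscription
        var first = ok.Wait();               // blocks until the work completes
        var second = ok.Wait();              // replayed, the work is not repeated
        Console.WriteLine($"{first} {second} {runs}"); // 42 42 1

        var failures = 0;
        var bad = Observable
            .Defer(() => Observable.Start<int>(() =>
            {
                Interlocked.Increment(ref failures);
                throw new InvalidOperationException("boom");
            }))
            .ReplayLastLazily();

        for (var i = 0; i < 2; i++)
        {
            try { bad.Wait(); }
            catch (InvalidOperationException ex) { Console.WriteLine(ex.Message); } // boom
        }
        Console.WriteLine(failures);         // 1: the error is replayed, not recomputed
    }
}
```

The trade-off is that unsubscribing no longer cancels the underlying work, since the connection is never disposed. For a run-once-and-cache pipeline that is probably what you want, but it is a design choice rather than a requirement.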

Enigmativity answered Feb 18 '26 20:02


