
Create an observable on a method

In my app, I have a method that gets invoked every time an update happens on some queue on a server. The app is initialized to behave this way.

Each time the method is invoked with the latest data, I want to treat that call as an event in a stream, and so make it part of a never-ending Observable with a subscriber.

The challenge I am facing is: how do I create an observable on a method that is being invoked? Below is my sample code.

//This method is invoked every time an update happens on the server
public virtual void MessageHandler(MyObject1 object1, MyObject2 object2)
{
    Observable.Create<MyObject3>(observer =>
        {
            var object3 = new MyObject3(object1, object2);
            observer.OnNext(object3 );

            return Disposable.Empty;
        })
        .Subscribe(x => WriteLine($"Message acknowledged"));
}

But this creates a new observable every time the method is invoked, which is not what I want, and it doesn't look like the right approach either. I have also read that using "Subject" or "AsyncSubject" is not the right way to solve this kind of problem.

Gautam T Goudar asked Oct 17 '22 17:10


1 Answer

The rule on not using Subject is more a guideline that hasn't been very well expressed.

In general, if you're using subjects within an observable pipeline then you're likely to be doing something wrong - and that should be avoided.

If you're using a Subject as the source of an observable, and you properly encapsulate and obfuscate it, then you're fine. This usually means keeping it in a private field that only your code can access (so no-one else can call .OnCompleted() on it) and calling .AsObservable() before exposing it, so that no-one can cast your observable back to the underlying Subject.

In your case you're subscribing directly, so no .AsObservable() is needed, but I suspect that's just demo code. In real-world code, make sure you obfuscate.

Here's what your code should look like:

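// Private and readonly: only this class can push values or complete the stream.
private readonly Subject<MyObject3> _subject = new Subject<MyObject3>();

private void SetUpObservable()
{
    // Subscribe once, up front, rather than on every MessageHandler call.
    _subject.Subscribe(x => Console.WriteLine("Message acknowledged"));
}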

public virtual void MessageHandler(MyObject1 object1, MyObject2 object2)
{
    _subject.OnNext(new MyObject3(object1, object2));
}
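
If the stream has to be visible to other classes, the encapsulation described above might look roughly like the sketch below. The class name MessageSource and the Messages property are hypothetical, and the three MyObject types are empty placeholders for the ones in the question; it assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Empty placeholders standing in for the question's types (assumption).
public class MyObject1 { }
public class MyObject2 { }
public class MyObject3
{
    public MyObject3(MyObject1 object1, MyObject2 object2) { }
}

// Hypothetical wrapper class; the name is for illustration only.
public class MessageSource
{
    // Private field: outside code cannot call OnNext, OnError or OnCompleted.
    private readonly Subject<MyObject3> _subject = new Subject<MyObject3>();

    // AsObservable() hides the subject, so a consumer cannot cast
    // the returned IObservable<MyObject3> back to Subject<MyObject3>.
    public IObservable<MyObject3> Messages => _subject.AsObservable();

    // Invoked every time an update happens on the server.
    public virtual void MessageHandler(MyObject1 object1, MyObject2 object2)
    {
        _subject.OnNext(new MyObject3(object1, object2));
    }
}
```

Consumers would then subscribe to Messages and never see the Subject itself.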

Now, if you still want to avoid the Subject then you can do this as an alternative:

private Action<MyObject3> _delegate;

private void SetUpObservable()
{
    Observable
        .FromEvent<MyObject3>(h => _delegate += h, h => _delegate -= h)
        .Subscribe(x => Console.WriteLine($"Message acknowledged"));
}

public virtual void MessageHandler(MyObject1 object1, MyObject2 object2)
{
    _delegate?.Invoke(new MyObject3(object1, object2));
}

In my opinion the Subject gives you better control and it is easier to set up.

In any case, you probably should retain the subscription IDisposable so that you can properly clean up.
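
One way to retain the subscription is sketched below. MessageListener is a hypothetical class, and a plain string payload is used in place of MyObject3 to keep the example short; it assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Reactive.Subjects;

// Hypothetical class; string stands in for MyObject3 (assumption).
public sealed class MessageListener : IDisposable
{
    private readonly Subject<string> _subject = new Subject<string>();
    private readonly IDisposable _subscription;

    public MessageListener()
    {
        // Keep the IDisposable returned by Subscribe so it can be released later.
        _subscription = _subject.Subscribe(
            x => Console.WriteLine("Message acknowledged"));
    }

    // Invoked every time an update happens on the server.
    public void MessageHandler(string message) => _subject.OnNext(message);

    public void Dispose()
    {
        _subscription.Dispose(); // stop receiving notifications
        _subject.Dispose();      // release the subject itself
    }
}
```

Once Dispose has run, the subject has been disposed too, so MessageHandler should no longer be called on that instance.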

Enigmativity answered Nov 16 '22 08:11