
Observable determine if subscriber function has finished

What is the best way to determine if the subscriber has finished executing or better yet return something and catch it up-stream? For example:

    this._subscriptions.push(this._client
        .getCommandStream(this._command) // Returns an IObservable from a Subject stream
        .subscribe(msg => {
            // Do some processing, maybe some promise stuff
            http.request(url).then(() => {
                // some more stuff
            });
        }));

What's the best way to determine that the subscriber has finished? I've implemented it as follows:

    this._subscriptions.push(this._client
        .getCommandStream(this._command)
        .subscribe(msg => {
            // Do some processing, maybe some promise stuff
            http.request(url).then(re => {
                // some more stuff
                msg.done();
            }).catch(err => msg.done(err));
        }));

That is, I added a done method to the object being passed in, so the handler can signal that it has finished. The issue is that I have to call done in every promise or catch block, which I find too tedious. Is there a cleaner and more automated way of doing this?

I think the examples I've given are not good enough. This implementation uses Rx to build an internal messaging bus. getCommandStream actually returns a read-only channel (as an Observable) from which commands are received and processed. The processing could be an HTTP request followed by many other things, or just an if statement.

    this._client
        .getCommandStream(this._command) // Returns an IObservable from a Subject stream
        .subscribe(msg => {
            // Do some processing, maybe some promise stuff
            http.request(url).then(() => {
                // some more stuff
            }).then(() => {
                // Here I want to do some file I/O
                if (x) {
                    file.read('path', (content) => {
                        msg.reply(content);
                        msg.done();
                    });
                } else {
                    // Or maybe no file I/O, or maybe even some image processing
                    msg.reply("pong");
                    msg.done();
                }
            });
        });

I feel like this is a fine use of the Observable pattern, since this is exactly a sequence of commands coming in with logic that acts on them. The problem is that msg.done() is called all over the place. I want to know the best way to limit those calls and still know when the entire thing is done. Another option is to wrap it all in a Promise, but then what's the difference between calling resolve and calling msg.done()?

asked Apr 16 '26 23:04 by Arijoon


1 Answer

Making another asynchronous request inside subscribe() isn't recommended: it makes things more complicated, and using Rx this way doesn't make your code any easier to understand.

Since you need to make a request to a remote service that returns a Promise, you can merge it into the chain:

    this._subscriptions.push(this._client
        .getCommandStream(this._command)
        .concatMap(msg => http.request(url))
        .subscribe(...));
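To make the idea concrete, here is a minimal sketch of the kind of Promise-returning handler that could be handed to concatMap, so that msg.done() is called in exactly one place. It is not taken from the question's code base: fakeRequest, fakeReadFile, processCommand, handleCommand and the msg.url field are hypothetical stand-ins, and it assumes that calling msg.done(undefined) is treated the same as msg.done().

```javascript
// Hypothetical stand-ins for the question's http.request and file.read,
// written as Promise-returning functions so they can be awaited.
const fakeRequest = (url) =>
    Promise.resolve({ url, needsFile: url.endsWith('/file') });
const fakeReadFile = (path) => Promise.resolve(`contents of ${path}`);

// All branching lives in one async function that simply returns the reply.
// It never touches msg.done(), no matter which branch runs.
async function processCommand(msg) {
    const res = await fakeRequest(msg.url);
    if (res.needsFile) {
        return fakeReadFile('path');
    }
    return 'pong';
}

// Wrapper that replies and then signals completion exactly once,
// passing along the error if any step failed.
async function handleCommand(msg) {
    let error;
    try {
        msg.reply(await processCommand(msg));
    } catch (err) {
        error = err;
    }
    msg.done(error);
}
```

With a handler shaped like this, the chain could use .concatMap(msg => handleCommand(msg)), and concatMap would wait for each returned Promise to settle before starting on the next command.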

Also, the third parameter to subscribe() is a callback that is called when the source Observable completes.

You can also add your own teardown logic that runs when the chain is disposed. It is called after the complete callback passed to subscribe(...):

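    // Keep the Subscription itself: Array.prototype.push returns the new array length.
    const subscription = this._client
        ...
        .subscribe(...);
    this._subscriptions.push(subscription);

    subscription.add(() => doWhatever());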

By the way, this is equivalent to using the finally() operator (named finalize() in its pipeable form in newer RxJS versions).

answered Apr 19 '26 12:04 by martin


