
RxJs flatMapLatest/switchMap cancel callback. Where is onCancel()?

I've got 2 nested Observable Streams which do HTTP requests. Now I'd like to display a loading indicator, but can't get it working correctly.

var pageStream = Rx.createObservableFunction(_self, 'nextPage')
        .startWith(1)
        .do(function(pageNumber) {
            pendingRequests++;
        })
        .concatMap(function(pageNumber) {
            return MyHTTPService.getPage(pageNumber);
        })
        .do(function(response) {
            pendingRequests--;
        });

Rx.createObservableFunction(_self, 'search')
        .flatMapLatest(function(e) {
            return pageStream;
        })
        .subscribe();



search();
nextPage(2);
nextPage(3);
search();

This will trigger pendingRequests++ 4 times, but pendingRequests-- only once, because flatMapLatest will cancel the inner observable before the first 3 HTTP responses arrive.

I couldn't find anything like an onCancel callback. I also tried onCompleted and onError, but those too won't get triggered by flatMapLatest.

Is there any other way to get this working?

Thank you!

EDIT: Desired loading indicator behavior

  1. Example: Single search() call.

    • search() -> start loading indicator
    • when search() response comes back -> disable loading indicator
  2. Example: search() and nextPage() call. (nextPage() is called before search() response came back.)

    • search() -> start loading indicator
    • nextPage() -> indicator is already started, so nothing to do here
    • stop loading indicator after both responses arrived
  3. Example: search(), search(). (search() calls override each other, so the response of the first one can be dismissed)

    • search() -> start loading indicator
    • search() -> indicator is already started, so nothing to do here
    • stop loading indicator when the response for the second search() arrived
  4. Example: search(), nextPage(), search(). (Again: Because of the second search(), the responses from the previous search() and nextPage() can be ignored)

    • search() -> start loading indicator
    • nextPage() -> indicator is already started, so nothing to do here
    • search() -> indicator is already started, so nothing to do here
    • stop loading indicator when response for the second search() arrived
  5. Example: search(), nextPage(). But this time nextPage() is called after search() response came back.

    • search() -> start loading indicator
    • stop loading indicator because search() response arrived
    • nextPage() -> start loading indicator
    • stop loading indicator because nextPage() response arrived

I tried using a pendingRequests counter because I can have multiple relevant requests at the same time (for example: search(), nextPage(), nextPage()). Then of course I'd like to disable the loading indicator only after all those relevant requests have finished.

When calling search(), search(), the first search() is irrelevant. The same applies to search(), nextPage(), search(). In both cases there's only one active relevant request (the last search()).

Benjamin M asked Aug 30 '16 20:08


3 Answers

With switchMap (aka flatMapLatest) you want to cut off execution of the current inner stream as soon as a new outer item arrives. This is surely a good design decision, as anything else would cause a lot of confusion and allow some spooky actions. If you really want to do something on cancel, you can always create your own observable with a custom unsubscribe callback. Still, I would recommend not coupling unsubscribe with changing the state of the external context. Ideally, unsubscribe would only clean up internally used resources.
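As a minimal sketch of the "custom unsubscribe callback" idea, the snippet below uses a tiny hand-rolled subscription object in plain JavaScript rather than RxJS itself, so it runs without any library. The names `createRequest`, `start`, `onCancel` and `log` are hypothetical. In RxJS, a similar shape can be obtained by returning a teardown function from the function passed to `Rx.Observable.create`.

```javascript
// Hypothetical helper, NOT an RxJS API: wraps a request so that `onCancel`
// runs only when the subscriber unsubscribes before a response was delivered.
function createRequest(start, onCancel) {
  return {
    subscribe(onResponse) {
      let settled = false;
      // `start` kicks off the work and returns a function that aborts it.
      const abort = start(response => {
        if (settled) return;      // ignore responses that arrive after cancel
        settled = true;
        onResponse(response);
      });
      return {
        unsubscribe() {
          if (settled) return;    // already finished: nothing to cancel
          settled = true;
          abort();
          onCancel();
        }
      };
    }
  };
}

// Usage with a fake transport whose response is delivered manually.
const log = [];
let deliver = null;
const request = createRequest(
  respond => { deliver = respond; return () => log.push('aborted'); },
  () => log.push('cancelled')
);

const first = request.subscribe(r => log.push('response ' + r));
first.unsubscribe();   // cancelled before any response arrived
deliver('late');       // ignored, this subscription is already closed

const second = request.subscribe(r => log.push('response ' + r));
deliver('ok');         // delivered normally
second.unsubscribe();  // no-op, nothing left to cancel

console.log(log);
```

Here the cancel hook only records an event. That keeps it in line with the recommendation above that unsubscribe logic should stay close to resource cleanup.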

Nevertheless, your particular case can be solved without onCancel or anything similar. The key observation is, if I understood your use case correctly, that on search all previous or pending actions may be ignored. So instead of worrying about decrementing the counter, we can simply start counting from 1.

Some remarks about the snippet:

  • used BehaviorSubject for counting pending requests - as it is ready to be composed with other streams;
  • checked all cases you posted in your question and they work;
  • added some fuzzy tests to demonstrate correctness;
  • not sure if you wanted to allow nextPage when a search is still pending - but it seems to be just a matter of using concatMapTo vs merge;
  • used only standard Rx operators.

PLNKR

console.clear();

const searchSub = new Rx.Subject(); // trigger search 
const nextPageSub = new Rx.Subject(); // trigger nextPage
const pendingSub = new Rx.BehaviorSubject(); // counts number of pending requests

const randDurationFactory = min => max => () => Math.random() * (max - min) + min;
const randDuration = randDurationFactory(250)(750);
const addToPending = n => () => pendingSub.next(pendingSub.value + n);
const inc = addToPending(1);
const dec = addToPending(-1);

const fakeSearch = (x) => Rx.Observable.of(x)
  .do(() => console.log(`SEARCH-START: ${x}`))
  .flatMap(() => 
    Rx.Observable.timer(randDuration())
    .do(() => console.log(`SEARCH-SUCCESS: ${x}`)))

const fakeNextPage = (x) => Rx.Observable.of(x)
  .do(() => console.log(`NEXT-PAGE-START: ${x}`))
  .flatMap(() =>
    Rx.Observable.timer(randDuration())
    .do(() => console.log(`NEXT-PAGE-SUCCESS: ${x}`)))

// subscribes
searchSub
  .do(() => console.warn('NEW_SEARCH'))
  .do(() => pendingSub.next(1)) // new search -- ignore current state
  .switchMap(
    (x) => fakeSearch(x)
    .do(dec) // search ended
    .concatMapTo(nextPageSub // if you wanted to block nextPage when search still pending
      // .merge(nextPageSub // if you wanted to allow nextPage when search still pending
      .do(inc) // nextPage started
      .flatMap(fakeNextPage) // optionally switchMap
      .do(dec) // nextpage ended
    )
  ).subscribe();

pendingSub
  .filter(x => x !== undefined) // behavior-value initially not defined
  .subscribe(n => console.log('PENDING-REQUESTS', n))

// TEST
const test = () => {
    searchSub.next('s1');
    nextPageSub.next('p1');
    nextPageSub.next('p2');

    setTimeout(() => searchSub.next('s2'), 200)
  }
// test();

// FUZZY-TEST
const COUNTER_MAX = 50;
const randInterval = randDurationFactory(10)(350);
let counter = 0;
const fuzzyTest = () => {
  if (counter % 10 === 0) {
    searchSub.next('s' + counter++)
  }
  nextPageSub.next('p' + counter++);
  if (counter < COUNTER_MAX) setTimeout(fuzzyTest, randInterval());
}

fuzzyTest()
<script src="https://npmcdn.com/[email protected]/bundles/Rx.umd.js"></script>
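The "start counting from 1" idea can also be seen in isolation. The sketch below is plain JavaScript without RxJS, and `LoadingCounter`, `generation` and the method names are hypothetical, not part of the snippet above. Each search bumps a generation number and resets the count to 1, and completions that belong to an older generation are ignored.

```javascript
// Hypothetical illustration of "reset on search"; not taken from the answer's code.
class LoadingCounter {
  constructor() {
    this.pending = 0;
    this.generation = 0;
  }
  // A new search makes everything before it irrelevant: reset to exactly 1.
  startSearch() {
    this.generation += 1;
    this.pending = 1;
    return this.generation;
  }
  // A nextPage request belongs to the current search generation.
  startNextPage() {
    this.pending += 1;
    return this.generation;
  }
  // Only completions from the current generation change the count.
  finish(token) {
    if (token !== this.generation) return;
    this.pending -= 1;
  }
  get loading() {
    return this.pending > 0;
  }
}

// Case 4 from the question: search(), nextPage(), search().
const counter = new LoadingCounter();
const s1 = counter.startSearch();
const p1 = counter.startNextPage();
const s2 = counter.startSearch();
counter.finish(s1);   // stale response, ignored
counter.finish(p1);   // stale response, ignored
const stillLoading = counter.loading;
counter.finish(s2);   // response for the second search
console.log(stillLoading, counter.loading);
```

In the Rx version the generation check is not needed, because switchMap unsubscribes from the old inner stream and its `dec` side effects never run.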
artur grzesiak answered Nov 15 '22 23:11


One way: use the finally operator (rxjs4 docs, rxjs5 source). Finally triggers whenever the observable is unsubscribed or completes for any reason.

I'd also move the counter logic inside the concatMap function, since you are really counting the getPage requests, not the number of values that have gone through. It's a subtle difference.

var pageStream = Rx.createObservableFunction(_self, 'nextPage')
        .startWith(1)
        .concatMap(function(pageNumber) {
            ++pendingRequests;
            // assumes getPage returns an Observable and not a Promise
            return MyHTTPService.getPage(pageNumber)
               .finally(function () { --pendingRequests; })
        });
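To illustrate why a finally-style hook keeps the counter balanced, here is a small plain-JavaScript simulation. It does not use RxJS, and `trackedRequest` and `switchTo` are hypothetical names. It assumes the hook runs exactly once, whether the request completes or is unsubscribed first, which is the behavior this answer relies on.

```javascript
let pendingRequests = 0;

// Hypothetical stand-in for getPage(...).finally(...): increments on start and
// decrements exactly once, on completion or on unsubscribe, whichever is first.
function trackedRequest() {
  let finished = false;
  const finalize = () => {
    if (finished) return;
    finished = true;
    --pendingRequests;
  };
  ++pendingRequests;
  return { complete: finalize, unsubscribe: finalize };
}

// Mimics flatMapLatest/switchMap: starting a new request unsubscribes the old one.
let current = null;
function switchTo() {
  if (current) current.unsubscribe();
  current = trackedRequest();
  return current;
}

switchTo();                            // first search
const afterFirst = pendingRequests;
switchTo();                            // second search cancels the first
const afterSecond = pendingRequests;
current.complete();                    // response for the second search arrives
const afterResponse = pendingRequests;

console.log(afterFirst, afterSecond, afterResponse);
```

The cancelled request and the completed request each decrement once, so the count returns to zero without a separate cancel callback.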
Brandon answered Nov 15 '22 21:11


I wrote a solution for your problem from scratch.
It could surely be written in a more functional way, but it works.

This solution is based on reqStack, which contains all requests (keeping the call order); a request is an object with id, done and type properties.

When a request is done, the requestEnd method is called. There are two conditions, and either one of them is enough to hide the loader.

  1. When the last request on the stack was a search request then we can hide a loader.
  2. Otherwise, all other requests have to be already done.

    function getInstance() {
     return {
        loaderVisible: false,
        reqStack: [],
    
        requestStart: function (req){
            console.log('%s%s req start', req.type, req.id)
            if(_.filter(this.reqStack, r => r.done == false).length > 0 && !this.loaderVisible){
                this.loaderVisible = true
                console.log('loader visible')
            }
        },
    
        requestEnd: function (req, body, delay){
            console.log('%s%s req end (took %sms), body: %s', req.type, req.id, delay, body)
            if(req === this.reqStack[this.reqStack.length-1] && req.type == 'search'){
                this.hideLoader(req)
                return true
            } else if(_.filter(this.reqStack, r => r.done == true).length == this.reqStack.length && this.loaderVisible){
                this.hideLoader(req)
                return true
            } 
            return false
        },
    
        hideLoader: function(req){
            this.loaderVisible = false
            console.log('loader hidden (after %s%s request)', req.type, req.id)
        },
    
        getPage: function (req, delay) {
            this.requestStart(req)
            return Rx.Observable
                    .fromPromise(Promise.resolve("<body>" + Math.random() + "</body>"))
                    .delay(delay)
        },
    
        search: function (id, delay){
            var req = {id: id, done: false, type: 'search'}
            this.reqStack.push(req)
            return this.getPage(req, delay).map(body => {  
                        _.find(this.reqStack, r => r.id == id && r.type == 'search').done = true
                        return this.requestEnd(req, body, delay)
                    })
        },
    
        nextPage: function (id, delay){
            var req = {id: id, done: false, type: 'nextPage'}
            this.reqStack.push(req)
            return this.getPage(req, delay).map(body => {  
                        _.find(this.reqStack, r => r.id == id && r.type == 'nextPage').done = true
                        return this.requestEnd(req, body, delay)
                    })
        },
    }
    }
    

Unit tests in Mocha:

describe('animation loader test:', function() {

    var sut

    beforeEach(function() {
        sut = getInstance()
    })

    it('search', function (done) {
        sut.search('1', 10).subscribe(expectDidHideLoader)
        testDone(done)
    })

    it('search, nextPage', function (done) {
        sut.search('1', 50).subscribe(expectDidHideLoader)
        sut.nextPage('1', 20).subscribe(expectDidNOTHideLoader)
        testDone(done)
    })

    it('search, nextPage, nextPage', function(done) {
        sut.search('1', 50).subscribe(expectDidHideLoader)
        sut.nextPage('1', 40).subscribe(expectDidNOTHideLoader)
        sut.nextPage('2', 30).subscribe(expectDidNOTHideLoader)
        testDone(done)
    })

    it('search, nextPage, nextPage - reverse', function(done) {
        sut.search('1', 30).subscribe(expectDidNOTHideLoader)
        sut.nextPage('1', 40).subscribe(expectDidNOTHideLoader)
        sut.nextPage('2', 50).subscribe(expectDidHideLoader)
        testDone(done)
    })

    it('search, search', function (done) {
        sut.search('1', 60).subscribe(expectDidNOTHideLoader) //even if it takes more time than search2
        sut.search('2', 50).subscribe(expectDidHideLoader)
        testDone(done)
    })

    it('search, search - reverse', function (done) {
        sut.search('1', 40).subscribe(expectDidNOTHideLoader) 
        sut.search('2', 50).subscribe(expectDidHideLoader)
        testDone(done)
    })

    it('search, nextPage, search', function (done) {
        sut.search('1', 40).subscribe(expectDidNOTHideLoader) //even if it takes more time than search2
        sut.nextPage('1', 30).subscribe(expectDidNOTHideLoader) //even if it takes more time than search2
        sut.search('2', 10).subscribe(expectDidHideLoader)
        testDone(done)
    })

    it('search, nextPage (call after response from search)', function (done) {
        sut.search('1', 10).subscribe(result => {
            expectDidHideLoader(result)
            sut.nextPage('1', 10).subscribe(expectDidHideLoader)
        })
        testDone(done)   
    })

    function expectDidNOTHideLoader(result){
        expect(result).to.be.false
    }

    function expectDidHideLoader(result){
        expect(result).to.be.true
    }

    function testDone(done){
        setTimeout(function(){
            done()
        }, 200)
    }

})

JSFiddle is here.

klimat answered Nov 15 '22 22:11