
What RxJS operator is like concatMap but waits for each request to complete before firing the next?

For example, let's say I want to make some API calls. The API I'm dealing with is prone to race conditions, so if I make 3 API calls at the same time that update the same piece of data on the server, some of the data could be lost.

Therefore I want to queue up my requests, fire one off, and wait for its response to come back before firing the next request.

Basically I need something like concatMap, but the problem with concatMap is that it fires all the requests at the same time. I need it to wait for each response before firing off the next request. I'm using RxJS 5.

Here's a Plunker using Angular 2 where you can click buttons. When you click the 1 sec button, an observable is created that emits after 1 second. There are also 2 sec and 3 sec buttons.

https://plnkr.co/edit/6F4JrVueQX8PjPinZqIk?p=preview

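import { Component } from '@angular/core';
import { of } from 'rxjs/observable/of';
import 'rxjs/add/operator/delay';

@Component({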
  selector: 'my-app',
  template: `
    <div>
      <h2>Wait:</h2>

      <button (click)="start(1)">1 sec</button>
      <button (click)="start(2)">2 sec</button>
      <button (click)="start(3)">3 sec</button>
    </div>
  `,
})
export class App {


  constructor() {
  }

  start(wait) {

    const waitSecs = parseInt(wait) * 1000;

    of('clicked').delay(waitSecs).subscribe(
      val => console.log(wait)  
    )

    // Expected behavior:
    // I click 3 sec, 2 sec, and 1 sec one right after another. Then
    // the console log should output 3, 2, 1. Right now it's
    // outputting 1, 2, 3.

  }
}

My ideal behavior for this app: after I click 3 sec, 2 sec, and 1 sec one right after another, the console should output 3, 2, 1. Right now it outputs 1, 2, 3.

seescode asked Jan 31 '23 00:01


2 Answers

Since you want only one request executing at a time, you can use the mergeMap() operator with the concurrency set to 1. Note that items waiting to be sent will queue up inside the mergeMap operator, which can lead to memory issues if they arrive faster than requests complete.

Rx.Observable.from([3,5,2,1])
  .mergeMap(
    i => {
      return Rx.Observable.of(i)
        .do(i => console.log(`starting request for ${i}`))
        .delay(1000)
        .do(null,null, () => console.log(`finished request for ${i}`));
    },
    null,
    1 /* concurrency limited to 1 */
  )
  .subscribe(val => console.log(`received value: ${val}`));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.2/Rx.js"></script>
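
The queueing described above (items waiting while one request is in flight) can be sketched without RxJS. This is only an illustration of the idea, not how mergeMap is implemented: `createSerialQueue`, `push`, `size`, and `done` are hypothetical names, and the timers stand in for real API calls with the delays scaled down to milliseconds.

```javascript
// Hypothetical helper (not part of RxJS): runs queued tasks one at a time.
// Each task receives a `done` callback and must call it once its response arrives.
function createSerialQueue() {
  const pending = []; // tasks waiting for their turn; grows while one is in flight
  let busy = false;   // true while a task has started but not yet called done()

  function next() {
    if (busy || pending.length === 0) return;
    busy = true;
    const task = pending.shift();
    task(function done() {
      busy = false;
      next(); // only now may the following task start
    });
  }

  return {
    push(task) {
      pending.push(task);
      next();
    },
    size() {
      return pending.length; // number of tasks still waiting
    },
  };
}

// Usage: simulate clicking "3 sec", "2 sec", "1 sec" in quick succession.
const queue = createSerialQueue();
[3, 2, 1].forEach((secs) => {
  queue.push((done) => {
    console.log(`starting request for ${secs}`);
    setTimeout(() => {
      console.log(`finished request for ${secs}`);
      done();
    }, secs * 10); // scaled down so the demo finishes quickly
  });
});
```

Each request starts only after the previous one has finished, so the log reads 3, 2, 1 regardless of how long each request takes. The `pending` array is where the memory concern comes from: it grows without bound if tasks are pushed faster than they complete.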
Mark van Straten answered Feb 05 '23 17:02


You should post some code. concatMap should work for your use case. If you observe the requests firing before subscription, you need to use the defer operator so that they fire at subscription time, but without code samples it is hard to say more.
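
The difference between firing at creation time and firing at subscription time can be illustrated in plain JavaScript. This is an analogy for what defer achieves, not RxJS code: `sendRequest` is a hypothetical stand-in for an API call, and invoking a wrapped function plays the role of subscribing.

```javascript
// Hypothetical stand-in for an API call: records the moment it is actually fired.
const fired = [];
function sendRequest(id) {
  fired.push(id);
  return { id, status: 'sent' };
}

// Eager: calling sendRequest while building the list fires everything at once.
const eager = [3, 2, 1].map((id) => sendRequest(id));
console.log(fired); // [3, 2, 1]

// Deferred: wrap each call in a function so nothing fires until it is invoked.
fired.length = 0; // reset the record
const deferred = [3, 2, 1].map((id) => () => sendRequest(id));
console.log(fired); // []

// Invoking the first wrapper fires only that request.
deferred[0]();
console.log(fired); // [3]
```

If the request is started by the code that builds the observable rather than by the subscription, a sequencing operator has nothing left to delay, which is the situation the defer suggestion addresses.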

Useful links from past questions:

  • How to start second observable *only* after first is *completely* done in rxjs
  • Rx.js wait for callback to complete
  • Rxjs, understanding defer
user3743222 answered Feb 05 '23 17:02