Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Recursive observable

Tags:

rxjs

reactivex

I'm working with RxJs and I have to make a polling mechanism to retrieve updates from a server.

I need to make a request every second, parse the updates, emit it and remember its id, because I need it to request the next pack of updates like getUpdate(lastId + 1).

The first part is easy so I just use interval with mergeMap

let lastId = 0
const updates = Rx.Observable.interval(1000)
    .map(() => lastId)
    .mergeMap((offset) => getUpdates(offset + 1))

I'm collecting identifiers like this:

updates.pluck('update_id').scan(Math.max, 0).subscribe(val => lastId = val)

But this solution isn't pure reactive and I'm looking for the way to omit the usage of "global" variable.

How can I improve the code while still being able to return observable containing just updates to the caller?

UPD.

The server response for getUpdates(id) looks like this:

[
  { update_id: 1, payload: { ... } },
  { update_id: 3, payload: { ... } },
  { update_id: 2, payload: { ... } }
]

It may contain 0 to Infinity updates in any order

like image 590
Elessar.perm Avatar asked Dec 25 '17 13:12

Elessar.perm


1 Answers

Something like this? Note that this is an infinite stream since there is no condition to abort; you didn't give one.

// Just returns the ID as the update_id.
const fakeResponse = id => {
  return [{ update_id: id }];
};

// Fakes the actual HTTP call with a network delay.
const getUpdates = id => Rx.Observable.of(null).delay(250).map(() => fakeResponse(id));

// Start with update_id = 0, then recursively call with the last
// returned ID incremented by 1.
// The actual emissions on this stream will be the full server responses.
const updates$ = getUpdates(0)
  .expand(response => Rx.Observable.of(null)
    .delay(1000)
    .switchMap(() => {
      const highestId = Math.max(...response.map(update => update.update_id));
      return getUpdates(highestId + 1);
    })
  )

updates$.take(5).subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>

To define the termination of the stream, you probably want to hook into the switchMap at the end; use whatever property of response to conditionally return Observable.empty() instead of calling getUpdates again.

like image 96
Ingo Bürk Avatar answered Jan 04 '23 01:01

Ingo Bürk