Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

What are the practical differences between an AsyncIterable and an Observable?

I've been hung up about this topic lately. It seems AsyncIterables and Observables both have stream-like qualities, though they are consumed a bit differently.

You could consume an async iterable like this

const myAsyncIterable = async function*() { yield 1; yield 2; yield 3; }

const main = async () => {
  for await (const number of myAsyncIterable()) {
    console.log(number)
  }
}

main()

You can consume an observable like this

const Observable = rxjs
const { map } = rxjs.operators

Observable.of(1, 2, 3).subscribe(x => console.log(x))
<script src="https://unpkg.com/rxjs/bundles/rxjs.umd.min.js"></script>

My overarching question is based off of this RxJS pr

If the observable emits at a pace faster than the loop completes, there will be a memory build up as the buffer gets more full. We could provide other methods that use different strategies (e.g. just the most recent value, etc), but leave this as the default. Note that the loop itself may have several awaits in it, that exacerbate the problem.

It seems to me that async iterators inherently do not have a backpressure problem, so is it right to implement Symbol.asyncIterator (@@asyncIterator) on an Observable and default to a backpressure strategy? Is there even a need for Observables in light of AsyncIterables?

Ideally, you could show me practical differences between AsyncIterables and Observables with code examples.

like image 491
richytong Avatar asked Jun 10 '20 19:06

richytong


1 Answers

The main difference is which side decides when to iterate.

In the case of Async Iterators the client decides by calling await iterator.next(). The source decides when to resolve the promise, but the client has to ask for the next value first. Thus, the consumer "pulls" the data in from the source.

Observables register a callback function which is called by the observable immediately when a new value comes in. Thus, the source "pushes" to the consumer.

An Observable could easily be used to consume an Async Iterator by using a Subject and mapping it to the next value of the async iterator. You would then call next on the Subject whenever you're ready to consume the next item. Here is a code sample

const pull = new Subject();
const output = pull.pipe(
  concatMap(() => from(iter.next())),
  map(val => { 
    if(val.done) pull.complete();
    return val.value;
  })
);
//wherever you need this 
output.pipe(

).subscribe(() => {
  //we're ready for the next item
  if(!pull.closed) pull.next();
});

like image 200
ranger83753992 Avatar answered Oct 14 '22 01:10

ranger83753992