
RxJS Subject#next: asynchronous or not?

Hi, I wonder whether the following code executes in sequence, i.e. whether the log line "Worker has finished task C" always appears after the log line "Finished 3 task(s)".

Long question: with the scan operator I can be sure the tasks are executed in sequence, so that does not worry me. What worries me is that I want the last subscribe to do something only after task C has completed, and I am not sure whether the position of o.complete() guarantees that. For example, could start() run do("A") -> do("B") -> do("C") without waiting for the scans to complete and then call o.complete() straight away, giving this output:

Worker has finished task C
Doing task A
Finished 1 task(s)
Doing task B
Finished 2 task(s)
Doing task C
Finished 3 task(s)

If that can happen, how do I fix the code so that it does what I have described?

https://stackblitz.com/edit/typescript-xhhwme

class Worker {
  private tasks: Subject<string>;
  public init(): Observable<number> {
    this.tasks = new Subject<string>();
    return this.tasks.scan((count, task) => {
      console.log("Doing task " + task);
      return ++count;
    }, 0).asObservable();
  }
  public do(task: string): void {
    this.tasks.next(task);
  }
}

function start(worker: Worker): Observable<void> {
  return Observable.create(o => {
    const monitor = worker.init();
    monitor.subscribe(c => console.log("Finished " + c + " task(s)"));
    worker.do("A");
    worker.do("B");
    worker.do("C");
    o.complete();
    worker.do("D");
  });
}

const worker = new Worker();
start(worker).subscribe({
  complete: () => console.log("Worker has finished task C")
});
asked Jun 28 '18 08:06 by user1589188


1 Answer

TLDR: Subject.next is synchronous.

Reactive streams are synchronous if the source is synchronous, unless you explicitly make them asynchronous or mix them with asynchronous streams. None of that happens in your code. Some examples:

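import { of, interval, asyncScheduler } from 'rxjs';
import { mergeMap, take } from 'rxjs/operators';

// Synchronous: of() emits its values during subscribe()
of(1,2)
  .subscribe(console.log);

// Asynchronous because the source (interval) is asynchronous
interval(1000)
  .subscribe(console.log);

// Asynchronous because one of the streams (interval) is asynchronous
of(1,2)
  .pipe(
    mergeMap(x => interval(1000).pipe(take(2)))
  )
  .subscribe(console.log);

// Asynchronous because we make it so with a scheduler
// (RxJS 7 deprecates this argument in favor of scheduled([1, 2], asyncScheduler))
of(1,2, asyncScheduler)
  .subscribe(console.log);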

What happens in your example? Everything inside the Observable.create callback runs immediately when you subscribe to start(worker). When you call worker.do("A"); then this.tasks.next(task); emits a new value and the whole tasks stream chain (scan, then the monitor subscriber) is executed synchronously before next returns. The same happens with B and C.
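The synchronous dispatch described above can be illustrated without RxJS at all. The following is only a minimal sketch: MiniSubject is a hypothetical, heavily simplified stand-in, not RxJS's actual Subject implementation. It assumes nothing more than "next calls every subscriber in turn", which is enough to show why next cannot return before the subscribers have run.

```typescript
// Hypothetical stand-in for a Subject (an assumption for illustration only,
// not the real RxJS class): it stores callbacks and calls them from next().
class MiniSubject<T> {
  private listeners: Array<(value: T) => void> = [];

  subscribe(listener: (value: T) => void): void {
    this.listeners.push(listener);
  }

  // An ordinary method call: it returns only after every listener has run.
  next(value: T): void {
    this.listeners.forEach(listener => listener(value));
  }
}

const trace: string[] = [];
const miniTasks = new MiniSubject<string>();
miniTasks.subscribe(task => trace.push("Doing task " + task));

trace.push("before next");
miniTasks.next("A");
trace.push("after next");

console.log(trace.join(" -> "));
// before next -> Doing task A -> after next
```

Nothing in this sketch defers work to a timer or a promise, so the subscriber's entry always lands between the two surrounding entries.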

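When you call o.complete(); the start(worker) stream completes and "Worker has finished task C" is printed, so that line always comes after "Finished 3 task(s)". The callback then keeps running, and because the tasks subject itself has not completed, D is still processed by the tasks stream afterwards.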
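To check the order yourself, here is a hedged sketch of the question's code that records each message in an array instead of only logging it. It assumes RxJS 6 or 7 (pipeable scan and the Observable constructor instead of the older chained scan and Observable.create). The class is renamed to TaskWorker and the log array is added purely for illustration; neither is part of the original code.

```typescript
import { Observable, Subject } from "rxjs";
import { scan } from "rxjs/operators";

// Illustrative helper (not in the original question): collects messages in order.
const log: string[] = [];

// Renamed from Worker to avoid clashing with the DOM Worker type.
class TaskWorker {
  private tasks = new Subject<string>();

  init(): Observable<number> {
    this.tasks = new Subject<string>();
    return this.tasks.pipe(
      scan((count: number, task: string) => {
        log.push("Doing task " + task);
        return count + 1;
      }, 0)
    );
  }

  do(task: string): void {
    // Synchronous: scan and the monitor subscriber run before next() returns.
    this.tasks.next(task);
  }
}

function start(worker: TaskWorker): Observable<void> {
  return new Observable<void>(o => {
    worker.init().subscribe(c => log.push("Finished " + c + " task(s)"));
    worker.do("A");
    worker.do("B");
    worker.do("C");
    o.complete();
    worker.do("D"); // still processed: only the outer stream has completed
  });
}

start(new TaskWorker()).subscribe({
  complete: () => log.push("Worker has finished task C")
});

console.log(log.join("\n"));
```

Under these assumptions the array holds the A, B and C messages first, then "Worker has finished task C", and only then the two messages for task D.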

You can find more extensive information on asynchronous / synchronous behavior in these articles:

  • Concurrency and Asynchronous Behavior with RxJS
  • Reactive Streams and Multithreading
answered Nov 03 '22 19:11 by a better oliver