
Why is my published deferred Observable factory being called multiple times?

I have a task stream that queues tasks until a signal Subject is triggered, using the .zip() operator. The signal Subject subscribes to the currently running task. I am also trying to observe the progress emissions of the task.

What I have tried to do is use .publish() to multicast the task Observable, so that the signal Subject can subscribe to the .last() emission of the task (to cause a dequeue) while I also subscribe to the task's general progress emissions.

This seems to be working. However, whenever I look at what is printed out, it looks like my Observable factory is getting called by each .subscribe() call even though I used .publish(). Am I misunderstanding how multicasting works? I believed the .publish()ed Observable would be created with the factory and that singular instance would be shared, but cold until .connect() was called.

My Task Runner

Notice the .defer() that calls tasker.

"use strict";

const {
  Observable,
  Subject,
  BehaviorSubject
} = Rx;

// How often to increase progress in a task
const INTERVAL_TIME = 200;

// Keep track of how many tasks we have
let TASK_ID = 0;

// Easy way to print out observers
function easyObserver(prefix = "Observer") {
  return {
    next: data => console.log(`[${prefix}][next]: ${data}`),
    error: err => console.error(`[${prefix}][error] ${err}`),
    complete: () => console.log(`[${prefix}][complete] Complete`)
  };
}

// Simulate async task
function tasker(name = "", id = TASK_ID++) {
  console.log(`tasker called for ${id}`);

  let progress = 0;
  const progress$ = new BehaviorSubject(`Task[${name||id}][${progress}%]`);
  console.log(`Task[${name||id}][started]`);
  let interval = setInterval(() => {
    progress = (progress + (Math.random() * 50));
    if (progress >= 100) {
      progress = 100;
      clearInterval(interval);
      progress$.next(`Task[${name||id}][${progress}%]`);
      progress$.complete();
      return;
    }
    progress$.next(`Task[${name||id}][${progress}%]`);
  }, INTERVAL_TIME);

  return progress$.asObservable();
}

// Create a signal subject that will tell the queue when to next
const dequeueSignal = new BehaviorSubject();

// Make some tasks
const tasks$ = Observable
  .range(0, 3);

// Queue tasks until signal tells us to emit the next task
const queuedTasks$ = Observable
  .zip(tasks$, dequeueSignal, (i, s) => i);

// Create task observables
const mcQueuedTasks$ = queuedTasks$
  .map(task => Observable.defer(() => tasker(`MyTask${task}`)))
  .publish();

// Print out the task progress
const progressSubscription = mcQueuedTasks$
  .switchMap(task => task)
  .subscribe(easyObserver("queuedTasks$"));

// Cause the signal subject to trigger the next task
const taskCompleteSubscription = mcQueuedTasks$
  .switchMap(task => task.last())
  .delay(500)
  .subscribe(dequeueSignal);

// Kick everything off
mcQueuedTasks$.connect();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>

My Output

Notice how you see multiple calls to tasker with the line tasker called for N and that the body of the factory is called. However, before any progress emissions happen, tasker() is called again with the next TASK_ID. The output seems correct because Task[MyTask0] doesn't skip any indices, only TASK_IDs.

tasker called for 0
Task[MyTask0][started]
[queuedTasks$][next]: Task[MyTask0][0%]
tasker called for 1
Task[MyTask0][started]
[queuedTasks$][next]: Task[MyTask0][20.688413934455674%]
[queuedTasks$][next]: Task[MyTask0][32.928520335195564%]
[queuedTasks$][next]: Task[MyTask0][42.58361384849108%]
[queuedTasks$][next]: Task[MyTask0][73.1297043008671%]
[queuedTasks$][next]: Task[MyTask0][100%]
tasker called for 2
Task[MyTask1][started]
[queuedTasks$][next]: Task[MyTask1][0%]
tasker called for 3
Task[MyTask1][started]
[queuedTasks$][next]: Task[MyTask1][37.16513927245511%]
[queuedTasks$][next]: Task[MyTask1][47.27771448102375%]
[queuedTasks$][next]: Task[MyTask1][60.45983311604027%]
[queuedTasks$][next]: Task[MyTask1][100%]
tasker called for 4
Task[MyTask2][started]
[queuedTasks$][next]: Task[MyTask2][0%]
tasker called for 5
Task[MyTask2][started]
[queuedTasks$][next]: Task[MyTask2][32.421275902708544%]
[queuedTasks$][next]: Task[MyTask2][41.30332084025583%]
[queuedTasks$][next]: Task[MyTask2][77.44113197852694%]
[queuedTasks$][next]: Task[MyTask2][100%]
[queuedTasks$][complete] Complete
zero298 asked Jan 09 '18 19:01


1 Answer

Looks like Observable.defer is unnecessary in this function:

// Create task observables
const mcQueuedTasks$ = queuedTasks$
  .map(task => Observable.defer(() => tasker(`MyTask${task}`)))
  .publish();

The Defer operator waits until an observer subscribes to it, and then it generates an Observable, typically with an Observable factory function. It does this afresh for each subscriber, so although each subscriber may think it is subscribing to the same Observable, in fact each subscriber gets its own individual sequence.
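As a rough illustration of that per-subscriber behaviour, the sketch below models defer in plain JavaScript. The defer and of helpers are hypothetical, simplified stand-ins written for this example, not the RxJS implementations; the only assumption is that an Observable is an object with a subscribe method.

```javascript
// Simplified stand-in for Observable.defer (hypothetical, not the RxJS source):
// the factory is invoked once for every subscribe() call.
function defer(factory) {
  return {
    subscribe(observer) {
      const source = factory();
      return source.subscribe(observer);
    }
  };
}

// Trivial source that synchronously emits a single value, then completes.
function of(value) {
  return {
    subscribe(observer) {
      observer.next(value);
      observer.complete();
    }
  };
}

let factoryCalls = 0;
const deferred = defer(() => {
  factoryCalls += 1;
  return of(`instance ${factoryCalls}`);
});

// Collect everything the two subscriptions receive.
const received = [];
const collect = { next: value => received.push(value), complete: () => {} };

deferred.subscribe(collect);
deferred.subscribe(collect);

console.log(factoryCalls); // 2
console.log(received);     // [ 'instance 1', 'instance 2' ]
```

Both subscriptions go through the same deferred object, yet each one triggers its own factory call and receives its own sequence.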

You've already created an Observable here:

// Make some tasks
const tasks$ = Observable
  .range(0, 3);

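Inside the .map() callback you then wrap each task in another, deferred Observable. .publish() only shares the outer stream that emits these task Observables; each .switchMap() still subscribes to the inner deferred Observable separately, so the tasker factory runs once per subscriber.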

Remove Observable.defer so that the code looks like this:

// Create task observables
const mcQueuedTasks$ = queuedTasks$
  .map(task => tasker(`MyTask${task}`))
  .publish();
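To see why this change affects how often tasker runs, here is a minimal model in plain JavaScript. Everything in it (defer, createSubject, run, and the simplified tasker) is a hypothetical stand-in written for this sketch, not RxJS code. It assumes that the published outer stream hands the same task object to two consumers, and that each consumer subscribes to that task on its own, as each .switchMap() does.

```javascript
// Hypothetical, simplified stand-ins; not the RxJS implementation.

// Runs the factory once per subscribe() call, like defer.
function defer(factory) {
  return {
    subscribe(observer) {
      return factory().subscribe(observer);
    }
  };
}

// Multicasts each value to every registered observer,
// like the subject that sits behind publish().
function createSubject() {
  const observers = [];
  return {
    subscribe(observer) {
      observers.push(observer);
    },
    next(value) {
      observers.forEach(observer => observer.next(value));
    }
  };
}

// Counts how many times a task is actually started.
let taskerCalls = 0;
function tasker(name) {
  taskerCalls += 1;
  return {
    subscribe(observer) {
      observer.next(`${name} finished`);
    }
  };
}

// Emits one task on a shared outer stream with two consumers and
// returns how many times tasker() ran.
function run(makeTask) {
  taskerCalls = 0;
  const outer = createSubject();
  const ignore = { next: () => {} };
  // Each consumer subscribes to the inner task separately.
  outer.subscribe({ next: task => task.subscribe(ignore) });
  outer.subscribe({ next: task => task.subscribe(ignore) });
  outer.next(makeTask());
  return taskerCalls;
}

const callsWithDefer = run(() => defer(() => tasker("MyTask0")));
const callsWithoutDefer = run(() => tasker("MyTask0"));

console.log(callsWithDefer);    // 2
console.log(callsWithoutDefer); // 1
```

In this model the deferred variant starts the task once per consumer, while the plain variant starts it once when the task is created and lets both consumers share that single instance.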

Snippet:

"use strict";

const {
  Observable,
  Subject,
  BehaviorSubject
} = Rx;

// How often to increase progress in a task
const INTERVAL_TIME = 200;

// Keep track of how many tasks we have
let TASK_ID = 0;

// Easy way to print out observers
function easyObserver(prefix = "Observer") {
  return {
    next: data => console.log(`[${prefix}][next]: ${data}`),
    error: err => console.error(`[${prefix}][error] ${err}`),
    complete: () => console.log(`[${prefix}][complete] Complete`)
  };
}

// Simulate async task
function tasker(name = "", id = TASK_ID++) {
  console.log(`tasker called for ${id}`);

  let progress = 0;
  const progress$ = new BehaviorSubject(`Task[${name||id}][${progress}%]`);
  console.log(`Task[${name||id}][started]`);
  let interval = setInterval(() => {
    progress = (progress + (Math.random() * 50));
    if (progress >= 100) {
      progress = 100;
      clearInterval(interval);
      progress$.next(`Task[${name||id}][${progress}%]`);
      progress$.complete();
      return;
    }
    progress$.next(`Task[${name||id}][${progress}%]`);
  }, INTERVAL_TIME);

  return progress$.asObservable();
}

// Create a signal subject that will tell the queue when to next
const dequeueSignal = new BehaviorSubject();

// Make some tasks
const tasks$ = Observable
  .range(0, 3);

// Queue tasks until signal tells us to emit the next task
const queuedTasks$ = Observable
  .zip(tasks$, dequeueSignal, (i, s) => i);

// Create task observables
const mcQueuedTasks$ = queuedTasks$
  .map(task => tasker(`MyTask${task}`))
  .publish();

// Print out the task progress
const progressSubscription = mcQueuedTasks$
  .switchMap(task => task)
  .subscribe(easyObserver("queuedTasks$"));

// Cause the signal subject to trigger the next task
const taskCompleteSubscription = mcQueuedTasks$
  .switchMap(task => task.last())
  .delay(500)
  .subscribe(dequeueSignal);

// Kick everything off
mcQueuedTasks$.connect();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>

Hope it helps.

Kosh answered Oct 13 '22 02:10