
How can I make an RxJS Observable emit at specific datetimes?

I have an RxJS Observable that needs to be recalculated at specific times, as described by an array of DateTime objects (although for the purpose of this question they could be JavaScript Date objects, epoch milliseconds or anything else representing a specific instant in time):

const changeTimes = [
    //            year, month, day, hour, minute
    DateTime.utc( 2018, 10, 31, 21, 45 ),
    DateTime.utc( 2018, 10, 31, 21, 50 ),
    DateTime.utc( 2018, 10, 31, 22,  0 ),
    DateTime.utc( 2018, 10, 31, 23,  0 ),
    DateTime.utc( 2018, 10, 31, 23, 30 ),
];

I'm struggling to understand how to create an Observable that would emit at the times specified in such an array.

Here's what I've thought about in an attempt to answer my own question:

  • I almost certainly need to use the delay operator where the specified delay is the time between “now” and the next future datetime.
  • I somehow need to ensure that “now” is current at the time of subscription, not at the time of Observable creation (possibly by using the defer function), although I don't want to create multiple Observable instances unnecessarily if there are multiple subscriptions.
  • I'm unsure how to iterate over the array as time passes. The expand operator might be what I need, but it calls something recursively, and I'm just trying to iterate over a list.
  • The timer function seems irrelevant, since the duration between each datetime is different.
  • I could map every datetime to its own delayed Observable and return them all via merge, but this becomes horribly inefficient as the number of datetimes in the array increases (there could be hundreds), so this is an absolute last resort.

How can I make an RxJS Observable that takes a list of datetimes and then emits as each one is reached in time, completing on the final one?

Alex Peters asked Oct 31 '18 11:10



1 Answer

I think what you summarized in the bullet points is all correct. Using delay seems obvious but it'll make the chain hard to understand.

The solution that comes to mind assumes that you know the changeTimes array before creating the Observable chain. You can write your own "observable creation method" that returns an Observable which emits based on setTimeout, for example (treat this as a rough sketch rather than a finished implementation):

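import { Observable } from 'rxjs';

const schedule = (dates: Date[]): Observable<Date> => new Observable<Date>(observer => {
  // Sort a copy from the earliest to the latest so the caller's array is left untouched.
  const sorted = [...dates].sort((a, b) => a.getTime() - b.getTime());

  let index = 0;
  // Named timeoutId so that it does not shadow the global clearTimeout function.
  let timeoutId: ReturnType<typeof setTimeout> | undefined;

  const loop = () => {
    if (index >= sorted.length) {
      // Every date has been emitted, so complete the stream.
      observer.complete();
      return;
    }

    // Measure the delay against the current time on every step so that drift
    // does not accumulate; dates already in the past get a delay of 0.
    const delay = Math.max(0, sorted[index].getTime() - Date.now());

    timeoutId = setTimeout(() => {
      observer.next(sorted[index++]);
      loop();
    }, delay);
  };

  loop();

  // Teardown: cancel the pending timer when the subscriber unsubscribes.
  return () => clearTimeout(timeoutId);
});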

...

schedule(changeTimes)
  .subscribe(...)

The last option you mention, using merge, isn't actually that bad. I understand you're concerned that it will create a lot of subscriptions, but if you sort the changeTimes array and use concat instead of merge, only one inner subscription is active at any time, even if you create hundreds of Observables.
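
The concat variant could look roughly like the sketch below. It assumes RxJS 6 or later, where timer accepts a Date as its due time and works out the remaining delay when it is subscribed to. The name scheduleWithConcat is hypothetical and chosen only for illustration.

```typescript
import { Observable, concat, timer } from 'rxjs';
import { map } from 'rxjs/operators';

// Hypothetical helper: emits each date once it is reached, then completes.
const scheduleWithConcat = (dates: Date[]): Observable<Date> => {
  // Sort a copy so the caller's array is not mutated.
  const sorted = [...dates].sort((a, b) => a.getTime() - b.getTime());

  // concat subscribes to the next timer only after the previous one has
  // completed, so at most one timer is pending at any moment.
  return concat(
    ...sorted.map(date => timer(date).pipe(map(() => date))),
  );
};
```

Because each timer is subscribed to only when its turn comes, its delay is measured against the clock at that moment rather than at creation time. Note that the sketch takes plain Date objects; if changeTimes holds Luxon DateTime objects (an assumption, since the question does not name the library), they would need converting first, for example with toJSDate().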

martin answered Oct 28 '22 14:10