
Delaying all items except specific one

Let's say I have a stream of actions. Each one is either a Prompt, a Response (to a prompt) or an Effect. They arrive at irregular intervals, but for this example assume a 1-second delay between consecutive actions.

On every PROMPT action I want to emit that action and a BEGIN action (let's say we want to show the message to the user for N seconds). All other items should be delayed by N seconds, after which the END action fires (hiding the message) and everything continues.

This is my code for it (for https://rxviz.com/):

const { interval, from, zip, timer } = Rx;
const { concatMap, delayWhen } = RxOperators;

const PROMPT = 'P';
const RESPONSE = 'R';
const EFFECT = 'E';

const BEGIN = '^';
const END = '&';

const convertAction = action => (action === PROMPT) ? [PROMPT, BEGIN, END] : [action];

// Just actions coming at regular intervals
const action$ = zip(
  from([PROMPT, RESPONSE, EFFECT, PROMPT, RESPONSE, EFFECT, EFFECT, EFFECT]),
  interval(1000),
  (a, b) => a,
);

action$.pipe(
  concatMap(action =>
    from(convertAction(action)).pipe(
      delayWhen(action => (action == END) ? timer(5000) : timer(0)),
    ),
  ),
);

What I really want is for the first RESPONSE action after a PROMPT not to be affected by the delay. If it arrives before the END action, it should be shown right away. So, instead of

P^ &REP^ &REEE

I want to receive

P^ R &EP^R &EEE

How can I achieve this while keeping each RESPONSE after its corresponding PROMPT? Assume no events can come between a PROMPT and its RESPONSE.

asked Apr 11 '26 03:04 by Kuroki Kaze


1 Answer

If I understand it correctly, this is a very interesting problem to solve with Observable streams. This is how I would approach it.

First I would store in a constant actionDelayed$ the result of your original logic, i.e. a stream in which each PROMPT is followed by a BEGIN action and, after the delay, an END action.

const actionDelayed$ = action$.pipe(
  concatMap(action =>
    from(convertAction(action)).pipe(
      delayWhen(action => (action == END) ? timer(5000) : timer(0)),
    ),
  ),
);

Then I would create two separate streams, response$ and promptDelayed$. The first contains only the RESPONSE actions as they arrive, before any delay is applied; the second contains only the PROMPT actions after the delay has been applied, like this:

const response$ = action$.pipe(
  filter(a => a == RESPONSE)
)
const promptDelayed$ = actionDelayed$.pipe(
  filter(a => a == PROMPT)
)

With these two streams, I can create another stream that emits each RESPONSE as soon as both it and its corresponding delayed PROMPT have been emitted, like this:

const responseN1AfterPromptN$ = zip(response$, promptDelayed$).pipe(
  map(([r, p]) => r)
)
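
To make the timing of that zip concrete, here is a minimal sketch in plain JavaScript (no RxJS) that only models when each pair would be emitted. The helper name zipEmissionTimes and the millisecond timestamps are assumptions for illustration: with the sample input and a 5-second delay, the RESPONSE actions arrive at 2 s and 5 s, and the delayed PROMPT actions are emitted at 1 s and 6 s.

```javascript
// zip pairs the n-th value of each input and emits once both have arrived,
// so the n-th pair is emitted at the later of the two arrival times.
// (Hypothetical helper; assumes each list of times is in ascending order.)
function zipEmissionTimes(timesA, timesB) {
  const pairs = Math.min(timesA.length, timesB.length);
  const out = [];
  for (let i = 0; i < pairs; i++) {
    out.push(Math.max(timesA[i], timesB[i]));
  }
  return out;
}

const responseTimes = [2000, 5000];      // RESPONSE arrivals in action$
const delayedPromptTimes = [1000, 6000]; // PROMPT emissions in actionDelayed$
console.log(zipEmissionTimes(responseTimes, delayedPromptTimes)); // [ 2000, 6000 ]
```

The first RESPONSE goes out at 2 s, well before the END at 6 s, while the second one has to wait for its own PROMPT, which is itself held back until 6 s.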

At this point I just have to remove all RESPONSE actions from actionDelayed$, like this:

const actionNoResponseDelayed$ = actionDelayed$.pipe(
  filter(a => a != RESPONSE)
)

and merge actionNoResponseDelayed$ with responseN1AfterPromptN$ to get the final stream.

The complete code, which can be tried on rxviz, is this:

const { interval, from, zip, timer, merge } = Rx;
const { concatMap, delayWhen, share, filter, map } = RxOperators;

const PROMPT = 'P';
const RESPONSE = 'R';
const EFFECT = 'E';

const BEGIN = '^';
const END = '&';

const convertAction = action => (action === PROMPT) ? [PROMPT, BEGIN, END] : [action];

// Just actions coming at regular intervals
const action$ = zip(
  from([PROMPT, RESPONSE, EFFECT, PROMPT, RESPONSE, EFFECT, EFFECT, EFFECT]),
  interval(1000),
  (a, b) => a,
).pipe(share());

const actionDelayed$ = action$.pipe(
  concatMap(action =>
    from(convertAction(action)).pipe(
      delayWhen(action => (action == END) ? timer(5000) : timer(0)),
    ),
  ),
  share()
);

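// RESPONSE actions as they arrive, before any delay is applied
const response$ = action$.pipe(
  filter(a => a == RESPONSE)
)
// PROMPT actions after the delay has been applied
const promptDelayed$ = actionDelayed$.pipe(
  filter(a => a == PROMPT)
)
// Pair the n-th RESPONSE with the n-th delayed PROMPT and emit the RESPONSE
const responseN1AfterPromptN$ = zip(response$, promptDelayed$).pipe(
  map(([r, p]) => r)
)
// Delayed stream with its (late) RESPONSE actions removed
const actionNoResponseDelayed$ = actionDelayed$.pipe(
  filter(a => a != RESPONSE)
)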

merge(actionNoResponseDelayed$, responseN1AfterPromptN$)

The share operator on action$ and actionDelayed$ lets all the derived streams reuse a single subscription to each of them. Without it, every derived stream would subscribe separately and start its own copy of the interval and delay timers.
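
As a sanity check, the expected timeline can be worked out by hand or with a small plain-JavaScript model (no RxJS). The sketch below is an idealized model, not the RxJS implementation: the function names, the { t, action } event shape and the millisecond timestamps are all assumptions for illustration, it ignores the extra scheduler ticks that timer(0) introduces, and it simply lists a RESPONSE last among events that share a timestamp.

```javascript
const PROMPT = 'P', RESPONSE = 'R', EFFECT = 'E', BEGIN = '^', END = '&';

// Models concatMap + delayWhen: an action starts once it has arrived and the
// previous action's expansion has finished; END fires holdMs after its PROMPT.
function delayedTimeline(arrivals, holdMs) {
  const out = [];
  let freeAt = 0; // time at which the previous expansion completed
  for (const { t, action } of arrivals) {
    const start = Math.max(t, freeAt);
    out.push({ t: start, action });
    freeAt = start;
    if (action === PROMPT) {
      out.push({ t: start, action: BEGIN });
      freeAt = start + holdMs;
      out.push({ t: freeAt, action: END });
    }
  }
  return out;
}

// Models the merge: delayed stream without RESPONSE, plus each RESPONSE
// emitted at the later of its own arrival and its delayed PROMPT (zip).
function mergedTimeline(arrivals, holdMs) {
  const delayed = delayedTimeline(arrivals, holdMs);
  const promptTimes = delayed.filter(e => e.action === PROMPT).map(e => e.t);
  const responses = arrivals
    .filter(e => e.action === RESPONSE)
    .slice(0, promptTimes.length)
    .map((e, i) => ({ t: Math.max(e.t, promptTimes[i]), action: RESPONSE }));
  const rest = delayed.filter(e => e.action !== RESPONSE);
  return [...rest, ...responses].sort((a, b) => a.t - b.t); // stable sort
}

// Renders events as text, with a space wherever the timestamp changes.
function render(events) {
  let text = '';
  let lastT = null;
  for (const e of events) {
    if (lastT !== null && e.t !== lastT) text += ' ';
    text += e.action;
    lastT = e.t;
  }
  return text;
}

// Same sample input as above: one action per second, starting at 1 s.
const sample = [PROMPT, RESPONSE, EFFECT, PROMPT, RESPONSE, EFFECT, EFFECT, EFFECT]
  .map((action, i) => ({ t: (i + 1) * 1000, action }));

console.log(render(delayedTimeline(sample, 5000))); // P^ &REP^ &REEE
console.log(render(mergedTimeline(sample, 5000)));  // P^ R &EP^R &EEE
```

Under these assumptions the model reproduces both strings from the question: the original pipeline gives P^ &REP^ &REEE and the merged one gives P^ R &EP^R &EEE. In the real RxJS pipeline the relative order of events that fall in the same instant may differ from this model.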

answered Apr 12 '26 17:04 by Picci


