Let's say I have a stream of actions. They're either Prompts, Responses (to prompts) or Effects. They come at irregular intervals, but assume 1 second delay between each one.
On every PROMPT action I want to emit that action and a BEGIN action (let's say we want to show the message to user for N seconds). All other items should be delayed by N seconds, after which the END action fires (hiding the message) and everything continues.
This is my code for it (for https://rxviz.com/):
const { interval, from, zip, timer } = Rx;
const { concatMap, delayWhen } = RxOperators;
const PROMPT = 'P';
const RESPONSE = 'R';
const EFFECT = 'E';
const BEGIN = '^';
const END = '&';
const convertAction = action => (action === PROMPT) ? [PROMPT, BEGIN, END] : [action];
// Just actions coming at regular intervals
const action$ = zip(
from([PROMPT, RESPONSE, EFFECT, PROMPT, RESPONSE, EFFECT, EFFECT, EFFECT]),
interval(1000),
(a, b) => a,
);
action$.pipe(
concatMap(action =>
from(convertAction(action)).pipe(
delayWhen(action => (action == END) ? timer(5000) : timer(0)),
),
),
);
What I really want to do is for first RESPONSE action after PROMPT to not be affected by the delay. If it comes before END action, it should be shown right away. So, instead of
P^ &REP^ &REEE
I want to receive
P^ R &EP^R &EEE
How can I achieve it while keeping each RESPONSE after their corresponding PROMPT? Assume no events can come between PROMPT and RESPONSE.
If I understand it right, this is a very interesting problem to address with Observables streams. This is the way I would attack it.
First I would store in a constant actionDelayed$ the result of your original logic, i.e. a stream where we have introduced, after each PROMPT, BEGIN and END actions divided by a delay.
const actionDelayed$ = action$.pipe(
concatMap(action =>
from(convertAction(action)).pipe(
delayWhen(action => (action == END) ? timer(5000) : timer(0)),
),
),
);
Then I would create 2 separate streams, response$ and promptDelayed$, containing only the RESPONSE actions before the delay was introduced and the PROMPT actions after the delayed was introduced, like this
const response$ = action$.pipe(
filter(a => a == RESPONSE)
)
const promptDelayed$ = actionDelayed$.pipe(
filter(a => a == PROMPT)
)
With these 2 streams, I can create another stream of RESPONSE actions emitted just after the PROMPT delayed actions are emitted, like this
const responseN1AfterPromptN$ = zip(response$, promptDelayed$).pipe(
map(([r, p]) => r)
)
At this point I have just to remove all RESPONSE actions from actionDelayed$ like this
const actionNoResponseDelayed$ = actionDelayed$.pipe(
filter(a => a != RESPONSE)
)
and merge actionNoResponseDelayed$ with responseN1AfterPromptN$ to get the final stream.
The entirety of the code, to be tried with rxviz is this
const { interval, from, zip, timer, merge } = Rx;
const { concatMap, delayWhen, share, filter, map } = RxOperators;
const PROMPT = 'P';
const RESPONSE = 'R';
const EFFECT = 'E';
const BEGIN = '^';
const END = '&';
const convertAction = action => (action === PROMPT) ? [PROMPT, BEGIN, END] : [action];
// Just actions coming at regular intervals
const action$ = zip(
from([PROMPT, RESPONSE, EFFECT, PROMPT, RESPONSE, EFFECT, EFFECT, EFFECT]),
interval(1000),
(a, b) => a,
).pipe(share());
const actionDelayed$ = action$.pipe(
concatMap(action =>
from(convertAction(action)).pipe(
delayWhen(action => (action == END) ? timer(5000) : timer(0)),
),
),
share()
);
const response$ = action$.pipe(
filter(a => a == RESPONSE)
)
const promptDelayed$ = actionDelayed$.pipe(
filter(a => a == PROMPT)
)
const responseN1AfterPromptN$ = zip(response$, promptDelayed$).pipe(
map(([r, p]) => r)
)
const actionNoResponseDelayed$ = actionDelayed$.pipe(
filter(a => a != RESPONSE)
)
merge(actionNoResponseDelayed$, responseN1AfterPromptN$)
The use of the share operator while creating action$ and actionDelayed$ streams allows to avoid repeated subscriptions to these streams when creating the subsequent streams used in the solution.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With