I'm trying to block the main stream until the config is initialized, using the withLatestFrom operator and a secondary stream (which carries the config state).
Here is my code:
@Effect()
public navigationToActiveRedemptionOffers = this.actions$.ofType(ROUTER_NAVIGATION)
  .filter((action: RouterNavigationAction<RouterStateProjection>) => {
    return action.payload.event.url.includes('/redemption-offers/list/active');
  })
  .do(() => console.log('before withLatestFrom'))
  .withLatestFrom(
    this.store.select(selectConfigInitialized)
      .do(initialized => console.log('before config filter', initialized))
      .filter(initialized => initialized)
      .do(initialized => console.log('after config filter', initialized)),
  )
  .do(() => console.log('after withLatestFrom'))
  .switchMap(data => {
    const action = data[0] as RouterNavigationAction<RouterStateProjection>;
    return [
      new MapListingOptionsAction(action.payload.routerState.queryParams),
      new LoadActiveOffersAction(),
    ];
  });
The problem is that the do placed after withLatestFrom (the one logging 'after withLatestFrom') never gets called.
Log:
Thank you for your suggestions.
I think what you want is .combineLatest.
withLatestFrom only treats what comes before it as the trigger.
combineLatest treats both the source and the observables provided to it as the trigger.
So your issue is that the source (route change) fires while the observable passed to withLatestFrom (state initialized) has not emitted yet, because its filter drops the initial false. It only emits a value after the source has fired, so that source emission is ignored.
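To make the difference concrete, here is a small dependency-free model of the two emission rules. It is only a sketch and not RxJS itself: TimelineEvent, modelWithLatestFrom and modelCombineLatest are hypothetical names made up for the illustration, and the timeline assumes the false value has already been removed by the filter, so the only thing the second stream delivers is true at 3000 ms.

```typescript
// Dependency-free model of the emission rules; this is not RxJS itself.
type TimelineEvent =
  | { at: number; from: 'source'; value: string }
  | { at: number; from: 'other'; value: boolean };

type Pair = [string, boolean];

// Process events in chronological order.
function byTime(events: TimelineEvent[]): TimelineEvent[] {
  return events.slice().sort((a, b) => a.at - b.at);
}

// withLatestFrom: only a source event can trigger output, and only if the
// other stream has already produced a value.
function modelWithLatestFrom(events: TimelineEvent[]): Pair[] {
  const out: Pair[] = [];
  let latestOther: boolean | undefined;
  for (const e of byTime(events)) {
    if (e.from === 'other') {
      latestOther = e.value;
    } else if (latestOther !== undefined) {
      out.push([e.value, latestOther]);
    }
    // A source event that arrives while latestOther is undefined is dropped.
  }
  return out;
}

// combineLatest: an event on either side triggers output once both sides
// have produced at least one value.
function modelCombineLatest(events: TimelineEvent[]): Pair[] {
  const out: Pair[] = [];
  let latestSource: string | undefined;
  let latestOther: boolean | undefined;
  for (const e of byTime(events)) {
    if (e.from === 'source') {
      latestSource = e.value;
    } else {
      latestOther = e.value;
    }
    if (latestSource !== undefined && latestOther !== undefined) {
      out.push([latestSource, latestOther]);
    }
  }
  return out;
}

// Navigation fires at 2000 ms; the filtered config stream emits true at 3000 ms.
const timeline: TimelineEvent[] = [
  { at: 2000, from: 'source', value: 'navigation' },
  { at: 3000, from: 'other', value: true },
];

console.log(JSON.stringify(modelWithLatestFrom(timeline))); // []
console.log(JSON.stringify(modelCombineLatest(timeline))); // [["navigation",true]]
```

In the model, the navigation is lost under the withLatestFrom rule and delivered once under the combineLatest rule. Keep in mind that under the combineLatest rule every later emission of the second stream would trigger output again as well.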
Here is a running example of what you are doing:
const first = Rx.Observable.create((o) => {
  setTimeout(() => {
    o.next();
  }, 2000);
});
const second = Rx.Observable.create((o) => {
  setTimeout(() => {
    o.next(false);
  }, 1000);
  setTimeout(() => {
    o.next(true);
  }, 3000);
});
first
  .do(() => console.log('before withLatestFrom'))
  .withLatestFrom(
    second
      .do(initialized => console.log('before config filter', initialized))
      .filter(initialized => initialized)
      .do(initialized => console.log('after config filter', initialized)),
  )
  .subscribe(() => console.log('after withLatestFrom'));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.2/Rx.js"></script>
Here is with combineLatest:
const first = Rx.Observable.create((o) => {
  setTimeout(() => {
    o.next();
  }, 2000);
});
const second = Rx.Observable.create((o) => {
  setTimeout(() => {
    o.next(false);
  }, 1000);
  setTimeout(() => {
    o.next(true);
  }, 3000);
});
first
  .do(() => console.log('before combineLatest'))
  .combineLatest(
    second
      .do(initialized => console.log('before config filter', initialized))
      .filter(initialized => initialized)
      .do(initialized => console.log('after config filter', initialized)),
  )
  .subscribe(() => console.log('after combineLatest'));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.2/Rx.js"></script>
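withLatestFrom emits only when its source Observable emits. What happens here is that this.store.select emits before the source of withLatestFrom does, but that value is false and is removed by the filter, so withLatestFrom has nothing to pair the navigation with. You can see it from the order of emissions: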
before config filter false
before withLatestFrom
...
Now it depends on what you want to achieve. You can trigger this.store.select(selectConfigInitialized) only when the source emits by using concatMap:
navigationToActiveRedemptionOffers = this.actions$.ofType(ROUTER_NAVIGATION)
  .filter(...)
  .concatMap(() => this.store.select(selectConfigInitialized)...)
  .switchMap(data => { ... })
...
Or you can pass along results from both Observables:
navigationToActiveRedemptionOffers = this.actions$.ofType(ROUTER_NAVIGATION)
  .filter(...)
  .concatMap(nav => this.store.select(selectConfigInitialized)
    .map(result => [nav, result])
  )
  .switchMap(data => { ... })
...
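What the concatMap variants change can also be modelled without RxJS. The sketch below rests on two assumptions that the snippets above do not spell out: that the elided part of the inner pipeline keeps only true values and takes the first one (something like filter followed by first), and that the store replays its current state to every new subscriber. ConfigChange, stateAt and modelReleaseTimes are hypothetical names used only for this illustration.

```typescript
// Dependency-free model of "concatMap + wait for the first true"; not RxJS itself.
interface ConfigChange {
  at: number;
  initialized: boolean;
}

// State a new subscriber would see at `time` (assumes `changes` is sorted by time).
function stateAt(time: number, changes: ConfigChange[], initial: boolean): boolean {
  let state = initial;
  for (const c of changes) {
    if (c.at <= time) {
      state = c.initialized;
    }
  }
  return state;
}

// For each navigation time, return when it would be released downstream
// (Infinity if the config never becomes initialized).
function modelReleaseTimes(
  navTimes: number[],
  changes: ConfigChange[],
  initial: boolean = false,
): number[] {
  const sorted = changes.slice().sort((a, b) => a.at - b.at);
  const released: number[] = [];
  let previousDone = 0;
  for (const nav of navTimes.slice().sort((a, b) => a - b)) {
    // concatMap starts the next inner stream only after the previous one completed.
    const start = Math.max(nav, previousDone);
    let release = Infinity;
    if (stateAt(start, sorted, initial)) {
      release = start; // already initialized: released immediately
    } else {
      for (const c of sorted) {
        if (c.at > start && c.initialized) {
          release = c.at; // held until the config flips to true
          break;
        }
      }
    }
    released.push(release);
    previousDone = release;
  }
  return released;
}

const releaseTimes = modelReleaseTimes(
  [2000, 2500, 4000],
  [{ at: 1000, initialized: false }, { at: 3000, initialized: true }],
);
console.log(JSON.stringify(releaseTimes)); // [3000,3000,4000]
```

In this model the navigations at 2000 ms and 2500 ms are held until the config becomes initialized at 3000 ms instead of being dropped, and the one at 4000 ms passes straight through. Without something that completes the inner stream (the assumed first), a store selector never completes and concatMap would never move on to the next navigation.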