How to group an Observable, and from each GroupedObservable keep in memory only the last emitted item? So that each group would behave just like BehaviorSubject.
Something like this:
{user: 1, msg: "Anyone here?"}
{user: 2, msg: "Hi"}
{user: 2, msg: "How are you?"}
{user: 1, msg: "Hello"}
{user: 1, msg: "Good"}
So in memory we'd have only have the last item for each user
:
{user: 2, msg: "How are you?"}
{user: 1, msg: "Good"}
And when a subscriber subscribes, these two items were issued right away (each in it's own emission). Like we had BehaviorSubject for each user
.
onCompleted() is never expected to fire, as people may chat forever.
I don't know in advance what user
values there can be.
I assume your chatlog observable is hot. The groupObservables emitted by #groupBy will consequently also be hot and won't keep anything in memory by themselves.
To get the behavior you want (discard everything but the last value from before subscription and continue from there) you could use a ReplaySubject(1).
Please correct me if I'm wrong
see jsbin
var groups = chatlog
.groupBy(message => message.user)
.map(groupObservable => {
var subject = new Rx.ReplaySubject(1);
groupObservable.subscribe(value => subject.onNext(value));
return subject;
});
You can write the reducing function that turns out the latest emitted items of grouped observables, pass that to a scan
observable, and use shareReplay
to recall the last values emitted for new subscribers. It would be something like this :
var fn_scan = function ( aMessages, message ) {
// aMessages is the latest array of messages
// this function will update aMessages to reflect the arrival of the new message
var aUsers = aMessages.map(function ( x ) {return x.user;});
var index = aUsers.indexOf(message.user);
if (index > -1) {
// remove previous message from that user...
aMessages.splice(index, 1);
}
// ...and push the latest message
aMessages.push(message);
return aMessages;
};
var groupedLatestMessages$ = messages$
.scan(fn_scan, [])
.shareReplay(1);
So what you get anytime you subscribe is an array whose size at any moment will be the number of users who emitted messages, and whose content will be the messages emitted by the users ordered by time of emission.
Anytime there is a subscription the latest array is immediately passed on to the subscriber. That's an array though, I can't think of a way how to pass the values one by one, at the same time fulfilling your specifications. Hope that is enough for your use case.
UPDATE : jsbin here http://jsfiddle.net/zs7ydw6b/2
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With