Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

ReactiveX: Group and Buffer only last item in each group

How to group an Observable, and from each GroupedObservable keep in memory only the last emitted item? So that each group would behave just like BehaviorSubject.

Something like this:

{user: 1, msg: "Anyone here?"}
{user: 2, msg: "Hi"}
{user: 2, msg: "How are you?"}
{user: 1, msg: "Hello"}
{user: 1, msg: "Good"}

So in memory we'd have only have the last item for each user:

{user: 2, msg: "How are you?"}
{user: 1, msg: "Good"}

And when a subscriber subscribes, these two items were issued right away (each in it's own emission). Like we had BehaviorSubject for each user.

onCompleted() is never expected to fire, as people may chat forever.

I don't know in advance what user values there can be.

like image 565
Dzmitry Lazerka Avatar asked Dec 08 '15 00:12

Dzmitry Lazerka


2 Answers

I assume your chatlog observable is hot. The groupObservables emitted by #groupBy will consequently also be hot and won't keep anything in memory by themselves.

To get the behavior you want (discard everything but the last value from before subscription and continue from there) you could use a ReplaySubject(1).

Please correct me if I'm wrong

see jsbin

var groups = chatlog
      .groupBy(message => message.user)
      .map(groupObservable => {
        var subject = new Rx.ReplaySubject(1);
        groupObservable.subscribe(value => subject.onNext(value));
        return subject;
      });
like image 52
Niklas Fasching Avatar answered Nov 04 '22 09:11

Niklas Fasching


You can write the reducing function that turns out the latest emitted items of grouped observables, pass that to a scan observable, and use shareReplay to recall the last values emitted for new subscribers. It would be something like this :

var fn_scan = function ( aMessages, message ) {
  // aMessages is the latest array of messages
  // this function will update aMessages to reflect the arrival of the new message
  var aUsers = aMessages.map(function ( x ) {return x.user;});
  var index = aUsers.indexOf(message.user);
  if (index > -1) {
    // remove previous message from that user...
    aMessages.splice(index, 1);
  }
  // ...and push the latest message
  aMessages.push(message);
  return aMessages;
};
var groupedLatestMessages$ = messages$
    .scan(fn_scan, [])
    .shareReplay(1);

So what you get anytime you subscribe is an array whose size at any moment will be the number of users who emitted messages, and whose content will be the messages emitted by the users ordered by time of emission.

Anytime there is a subscription the latest array is immediately passed on to the subscriber. That's an array though, I can't think of a way how to pass the values one by one, at the same time fulfilling your specifications. Hope that is enough for your use case.

UPDATE : jsbin here http://jsfiddle.net/zs7ydw6b/2

like image 20
user3743222 Avatar answered Nov 04 '22 10:11

user3743222