
How to buffer stream events?

Tags: dart

I have a web component which subscribes to a stream.

Since the web component is re-created each time it's displayed, I have to cancel its subscription and subscribe again.

Right now I add all subscriptions to a list, and in the removed() life-cycle method I cancel them:

subscriptions.forEach((sub) => sub.cancel());

Now, to the problem: when the web component isn't displayed, there's no one listening to the stream. The issue is that the component is missing data/events when it's not displayed.

What I need is buffering. Events need to be buffered and sent at once when a listener is registered. According to the documentation, buffering happens until a listener is registered:

The controller will buffer all incoming events until the subscriber is registered.

This works, but the problem is that the listener will at some point be removed and later re-registered, and it appears this does not trigger buffering.

It appears that buffering happens only initially, not later on even if all listeners are gone.
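To make the documented behaviour concrete, here is a minimal sketch of the initial buffering only. The function name collectAfterLateListen and the event strings are made up for illustration; it assumes a default (single-subscription) StreamController from dart:async.

```dart
import 'dart:async';

/// Adds two events before anyone listens, then subscribes and returns
/// everything the listener received.
Future<List<String>> collectAfterLateListen() async {
  // A default StreamController is single-subscription and buffers events
  // that are added before the first listener is registered.
  final controller = StreamController<String>();
  controller.add('early-1');
  controller.add('early-2');

  final received = <String>[];
  controller.stream.listen(received.add);

  controller.add('live');
  // close() completes once the listener has been sent the done event.
  await controller.close();
  return received;
}
```

The two early events reach the listener ahead of the live one, in the order they were added. This only covers the period before the first subscription, which is exactly the limitation described above.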

So the question is: how do I buffer in this situation where listeners may be gone and back?

Kai Sellgren asked May 01 '13 18:05



1 Answer

Note: normally you shouldn't be able to resubscribe to a Stream that has already been closed. This seems to be a bug we forgot to fix.

I'm unfamiliar with web-components but I hope I'm addressing your problem with the following suggestion.

One way (and there are of course many) would be to create a new Stream for every subscriber (like html-events do) that pauses the original stream.
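The reason pausing helps is that a paused StreamSubscription keeps buffering events from its source, whereas a cancelled one is gone. A minimal sketch of that behaviour, with a hypothetical function name (pauseThenResume) and a plain StreamController standing in for the original stream:

```dart
import 'dart:async';

/// Returns two lists: what the listener had received while it was paused,
/// and what it had received after being resumed.
Future<List<List<int>>> pauseThenResume() async {
  final source = StreamController<int>();
  final received = <int>[];
  final sub = source.stream.listen(received.add);

  sub.pause(); // Nobody wants events right now.
  source.add(1);
  source.add(2);
  // Give the event loop a turn so any delivery that could happen would happen.
  await Future<void>.delayed(Duration.zero);
  final whilePaused = List<int>.of(received); // Snapshot taken during the pause.

  sub.resume(); // The buffered events are delivered from here on.
  await source.close();
  return [whilePaused, received];
}
```

Nothing arrives during the pause, and both events arrive after resume(). The suggestion here relies on the same mechanism: keep one long-lived subscription to the original stream and pause it whenever there are no listeners.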

Say origin is the original Stream. Then implement a stream getter that returns a new Stream that is linked to origin:

Untested code.

import 'dart:async';

late Stream origin;  // Assign the original stream before using `stream`.
StreamSubscription? _subscription;
final _listeners = <StreamController>{};

void _addListener(StreamController controller) {
  _listeners.add(controller);
  // Subscribe to origin lazily, when the first listener shows up.
  final subscription = _subscription ??= origin.listen((event) {
    // When we emit the event we want listeners to be able to unsubscribe
    // or add new listeners. In order to avoid ConcurrentModificationErrors
    // we need to make sure that the _listeners set is not modified while
    // we are iterating over it. Here we just iterate over a copy made with
    // toList().
    // Alternatively (more efficient) we could also queue subscription
    // modification requests and do them after the loop.
    for (final c in _listeners.toList()) {
      c.add(event);
    }
  });
  subscription.resume();  // Just in case it was paused.
}

void _removeListener(StreamController controller) {
  _listeners.remove(controller);
  // With no listeners left, pause so that origin's events are buffered.
  if (_listeners.isEmpty) _subscription?.pause();
}

// Every call returns a fresh stream that is linked to origin.
Stream get stream {
  late final StreamController controller;
  controller = StreamController(
      onListen: () => _addListener(controller),
      onCancel: () => _removeListener(controller));
  return controller.stream;
}

If you need to buffer events immediately, you need to start the subscription right away (and keep it paused until the first listener arrives), not lazily as in the sample code.
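A minimal sketch of that eager variant, under a few assumptions: the class name BufferingRelay is hypothetical, the origin is a single-subscription stream, and completion and error forwarding are left out for brevity.

```dart
import 'dart:async';

/// Hypothetical wrapper: subscribes to [origin] at construction time and
/// keeps that subscription paused whenever nobody is listening.
class BufferingRelay<T> {
  BufferingRelay(Stream<T> origin) {
    _subscription = origin.listen(_dispatch);
    // Pause at once: events are buffered until someone listens to [stream].
    _subscription.pause();
  }

  late final StreamSubscription<T> _subscription;
  final _listeners = <StreamController<T>>{};

  void _dispatch(T event) {
    // Iterate over a copy so listeners may subscribe or cancel while we emit.
    for (final c in _listeners.toList()) {
      c.add(event);
    }
  }

  /// Returns a fresh stream linked to the origin on every call.
  Stream<T> get stream {
    late final StreamController<T> controller;
    controller = StreamController<T>(
      onListen: () {
        _listeners.add(controller);
        if (_subscription.isPaused) _subscription.resume();
      },
      onCancel: () {
        _listeners.remove(controller);
        if (_listeners.isEmpty) _subscription.pause();
      },
    );
    return controller.stream;
  }
}
```

A component would call relay.stream.listen(...) when it is displayed and cancel that subscription when it is removed. Events that arrive in between should then be delivered to the next listener instead of being lost.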

Florian Loitsch answered Nov 05 '22 05:11
