Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Cancel stream onData

Tags:

I have an event bus that handles all central events to my app. I have a special case where I have a sequence of asynchronous actions that I want to execute once only (when a special event happens), so I have to launch async function one, go through out of control other functions that will trigger an event for my second action, and so on.

So I need to launch action one, then listen to the event bus waiting for action one to trigger (undirectly) an event that will launch action two etc...

Naturally, once each element of the sequence is executed, I want to stop listening to the event that triggered it.

I imagined for that, a consumeOnce(event, action) function that will subscribe to the bus, wait for the expected event, execute the action when receiving the event and immediately cancel the subscription once the action launched (asynchronously)

  final StreamController<Map<PlaceParam, dynamic>> _controller =
  new StreamController<Map<PlaceParam, dynamic>>.broadcast();

  void consumeOnce(PlaceParam param, Function executeOnce) {
    StreamSubscription subscription = _controller.stream.listen((Map<PlaceParam, dynamic> params) {
      if(params.containsKey(param)) {
        executeOnce();
        subscription.cancel(); //can't access, too early: not created yet
      }
    });
  }

The issue is that I can't access the variable subscription in the body of my callback, since it is still not created at the time

Since nothing garanties that listeners will execute in their subscription order, I cannot register another subscriber who will remove my subscription (and even if the execution order was guaranteed, I will anyway find my self with a subscription that I can't remove : the one responsible for removing my original subscription)...

Any ideas please?

This pattern could solve my problem, but I don't find it elegant:

@Injectable()
class EventBus<K, V> {
  final StreamController<Map<PlaceParam, dynamic>> _controller =
  new StreamController<Map<PlaceParam, dynamic>>.broadcast();

  Future<Null> fire(Map<PlaceParam, dynamic> params) async {
    await _controller.add(params);
  }

  Stream<Map<PlaceParam, dynamic>> getBus() {
    return _controller.stream;
  }


  void consumeOnce(PlaceParam param, Function executeOnce) {
    SubscriptionRemover remover = new SubscriptionRemover(param, executeOnce);
    StreamSubscription subscription = _controller.stream.listen(remover.executeOnce);
    remover.subscription = subscription;
  }
}

class SubscriptionRemover {
  PlaceParam param;
  Function executeOnce;
  StreamSubscription subscription;

  SubscriptionRemover(this.param, this.executeOnce);

  void execute(Map<PlaceParam, dynamic> params) {
    if (params.containsKey(param)) {
      executeOnce();
      subscription.cancel();
    }
  }
}

But I don't like it much since theoretically, the event could happen between the two calls :

    StreamSubscription subscription = _controller.stream.listen(remover.executeOnce); //event may occur now!!!
    remover.subscription = subscription;

I think the existence of a method: _controller.stream.remove(Function fn) would have been a lot more direct and clear.

Am I right? or is there a way I didn't think of?

like image 254
Zied Hamdi Avatar asked Jun 09 '17 06:06

Zied Hamdi


People also ask

How do I cancel a stream on DART?

Many of you know that you can't cancel a Future in Dart, but you can cancel a subscription to a Stream. So one way you could handle this situation is to rewrite getData() to return a Stream instead.

What is StreamSubscription?

A subscription on events from a Stream. When you listen on a Stream using Stream. listen, a StreamSubscription object is returned. The subscription provides events to the listener, and holds the callbacks used to handle the events.


2 Answers

The issue is that I can't access the variable subscription in the body of my callback, since it is still not created at the time

That's correct - you cannot access is the subscription variable, even if you know that the subscription itself would exist. Dart doesn't allow variables declarations to refer to themselves. That's occasionally annoying when the variable it's only referenced inside a closure that won't be executed yet.

The solution is to either pre-declare the variable or to update the onData-listener after doing the listening:

    // Pre-declare variable (can't be final, and with null safety
    // it has to be late).
    late StreamSubscription<Map<PlaceParam, dynamic>> subscription;
    subscription = stream.listen((event) {
      .... subscription.cancel();
    });

or

    final subscription = stream.listen(null);
    subscription.onData((event) {  // Update onData after listening.
      .... subscription.cancel(); ....
    });

There can be cases where you can't access the subscription object yet, but that's only possible if the stream breaks the Stream contract and starts sending events immediately when it's listened to. Streams must not do that, they must wait until a later microtask before delivering the first event, so that the code that called listen has time to, say, receive the subscription and assign it to a variable. It's possible to violate the contract using a synchronous stream controller (which is one reason why synchronous stream controllers should be used judiciously).

(This answer provided was updated for null safety. Prior to Dart 2.12, the former proposal didn't need the late, and was generally the preferred approach. With Dart 2.12 and null safety, the latter approach will likely be better.)

like image 177
lrn Avatar answered Oct 20 '22 03:10

lrn


Just declare the variable beforehand, then you can access it from within the callback:

StreamSubscription subscription;
subscription = _controller.stream.listen((Map<PlaceParam, dynamic> params) {
  if(params.containsKey(param)) {
    executeOnce();
    subscription.cancel(); 
  }
});
like image 38
Günter Zöchbauer Avatar answered Oct 20 '22 04:10

Günter Zöchbauer