
What is the proper way to remove a stream from stdout so that another stream may be added?

I am starting a process in Dart that attaches its stdout stream to stdout so that the results can be printed to the terminal like so:

Process.start(executable, ['list','of','args']).then((proc) {
  stdout.addStream(proc.stdout);
  stderr.addStream(proc.stderr);
  return proc.exitCode;
});

However, once this completes I would like to start a new process and begin this again (the function this is in will be called several times). Sometimes, I am getting an error:

Uncaught Error: Bad State: StreamSink is already bound to a stream

Looking at the Dart docs, it seems I may need to call something like stdout.close() or stdout.flush(), but neither solves the problem. What is the proper way to bind multiple streams, one after another, to a StreamSink?

zcleghern asked May 22 '15 17:05


2 Answers

addStream returns a Future that completes when the stream has been fully added. Only one stream may be added to a StreamSink at a time.

Depending on what you want/need to do, you have 2 options now:

  • multiplex the output of the process(es) into stdout.
  • await the Future returned by addStream before adding the next stream.

The latter is easier:

Process.start(executable, ['list','of','args']).then((proc) async {
  await stdout.addStream(proc.stdout);  // Edit: don't do this.
  await stdout.addStream(proc.stderr);
  return proc.exitCode;
});

Note the async modifier on the body, and the two awaits in the body.

Edit: It is an error not to listen to stderr immediately. If nothing reads stderr while stdout is still being drained, the child process could block once its stderr pipe fills up.
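One way to respect that constraint, sketched here under the assumption that stdout and stderr should each go to their own sink as in the question, is to start both addStream calls first and await them together. The helper name runForwarding is hypothetical and not part of the original answer:

```dart
import 'dart:async';
import 'dart:io';

/// Hypothetical helper: runs one process and forwards its output to this
/// program's stdout and stderr.
Future<int> runForwarding(String executable, List<String> args) async {
  final proc = await Process.start(executable, args);
  // Both addStream calls start before either is awaited, so neither pipe
  // is left unread. Each sink has exactly one stream bound to it.
  await Future.wait([
    stdout.addStream(proc.stdout),
    stderr.addStream(proc.stderr),
  ]);
  // At this point both sinks are unbound and can accept a new stream.
  return proc.exitCode;
}
```

Because the returned Future only completes after both streams are done, awaiting one call to runForwarding before making the next keeps the sinks from being bound twice. Calling it again without awaiting would presumably hit the same "already bound" error.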

If the output of your program is small enough, you could just switch to Process.run:

Process.run(executable, ['list','of','args']).then((procResult) {
  stdout.write(procResult.stdout);
  stdout.write(procResult.stderr);
  return procResult.exitCode;
});

It won't interleave stdout and stderr, though: all of stdout is written first, then all of stderr, and only after the process has finished.

Florian Loitsch answered Oct 27 '22 08:10


You can't call addStream more than once at a time on the same sink. A sink is either in "manual" mode or "automatic" mode, the latter being triggered by adding a stream. Until that stream is done being added, pauses are routed to the stream being added instead of to the controller, and you are not allowed to add events manually until it's done. It is as if the added stream takes over the sink until it's done.
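The takeover can be observed with a plain StreamController, which is used here only as a stand-in for stdout. This is a small illustration rather than anything from the original answer, and the function name is made up:

```dart
import 'dart:async';

/// Illustration only: a sink rejects manual events while addStream is active.
Future<List<String>> addStreamTakesOver() async {
  final log = <String>[];
  final sink = StreamController<int>();
  sink.stream.listen((event) => log.add('event $event'));

  final source = StreamController<int>();
  // From here on the sink is in "automatic" mode.
  final adding = sink.addStream(source.stream);

  try {
    sink.add(99); // manual add while a stream is being added
  } on StateError {
    log.add('manual add rejected');
  }

  source.add(1);
  await source.close();
  await adding; // the added stream is done, so the sink is "manual" again
  sink.add(2);  // accepted now
  await sink.close();
  return log;
}
```

The log should record the rejected manual add before any forwarded event, and the add made after awaiting the addStream Future goes through normally.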

To add two (or more) streams to the same sink interleaved, you have to do it manually. One option is to listen on both streams yourself and add the events to the sink. Another is to have a general way to interleave streams, like the following code:

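import 'dart:async';

// Merges the given streams into one, forwarding events in arrival order.
Stream<T> interleave<T>(Iterable<Stream<T>> streams) {
  final subscriptions = <StreamSubscription<T>>[];
  late final StreamController<T> controller;
  controller = StreamController<T>(
    onListen: () {
      var active = 0;
      // Close the merged stream once every source stream is done.
      void done() {
        active--;
        if (active <= 0) controller.close();
      }
      for (final stream in streams) {
        active++;
        final sub = stream.listen(controller.add,
                                  onError: controller.addError,
                                  onDone: done);
        subscriptions.add(sub);
      }
      // With no source streams there is nothing to wait for.
      if (active == 0) controller.close();
    },
    // Pass pause, resume and cancel through to every source stream.
    onPause: () {
      for (final sub in subscriptions) { sub.pause(); }
    },
    onResume: () {
      for (final sub in subscriptions) { sub.resume(); }
    },
    onCancel: () {
      for (final sub in subscriptions) { sub.cancel(); }
    }
  );
  return controller.stream;
}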

(Not thoroughly tested!)
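The other option mentioned above, listening on both streams yourself and adding the events to the sink, might look roughly like this. It is a minimal sketch: the helper name runMerged and its sink parameter are assumptions made for illustration, and it does no pause handling, so a slow sink will simply buffer.

```dart
import 'dart:async';
import 'dart:io';

/// Hypothetical helper: forwards both output streams of one process to a
/// single sink by listening manually instead of calling addStream.
Future<int> runMerged(
    String executable, List<String> args, IOSink sink) async {
  final proc = await Process.start(executable, args);
  // forEach subscribes right away; each chunk is written as it arrives,
  // so chunks from stdout and stderr end up interleaved in the sink.
  await Future.wait([
    proc.stdout.forEach(sink.add),
    proc.stderr.forEach(sink.add),
  ]);
  await sink.flush();
  return proc.exitCode;
}
```

Since the sink never has a stream bound to it, the helper can be called repeatedly with stdout as the sink, for example `await runMerged('dart', ['--version'], stdout)`, assuming a dart binary is on the PATH.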

lrn answered Oct 27 '22 10:10