Convert Stream<Stream<T>> to Stream<T>

Tags:

dart

rxdart

I am using the rxdart package to handle streams in Dart, and I am stuck on a peculiar problem.

Please have a look at this dummy code:

final userId = BehaviorSubject<String>();

Stream<T> getStream(String uid) {
  // a sample code that returns a stream
  return BehaviorSubject<T>().stream;
}

final Observable<Stream<T>> oops = userId.map((uid) => getStream(uid));

Now I want to flatten the oops variable into a plain Observable<T>.

I find this difficult to explain clearly, but let me try. I have a stream A. I map each value emitted by A to another stream B. That gives me a Stream<Stream<B>>, a stream of streams. I only want to listen to the values produced by the most recent inner stream. How can I achieve this?

Dipu asked Jun 26 '19 10:06


2 Answers

I will list several ways to flatten a Stream<Stream<T>> into a single Stream<T>.

1. Using pure Dart

As answered by @lrn, this is a pure Dart solution. It listens to the inner streams one at a time, in order:

Stream<T> flattenStreams<T>(Stream<Stream<T>> source) async* {
  await for (var stream in source) yield* stream;
}

Stream<int> getStream(String v) {
    return Stream.fromIterable([1, 2, 3, 4]);
}

void main() {
    List<String> list = ["a", "b", "c"];
    Stream<int> s = flattenStreams(Stream.fromIterable(list).map(getStream));
    s.listen(print);
}

Outputs: 1 2 3 4 1 2 3 4 1 2 3 4
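
The same sequential behaviour can also be sketched with the built-in Stream.asyncExpand, which pauses the outer stream while each inner stream is drained. The helper name flattenWithAsyncExpand is made up for this illustration and is not part of the original answer:

```dart
/// Concatenates the inner streams in arrival order. Each inner stream is
/// fully drained before the next one is listened to.
/// (Hypothetical helper name, used only for this sketch.)
Stream<T> flattenWithAsyncExpand<T>(Stream<Stream<T>> source) =>
    source.asyncExpand((inner) => inner);

Stream<int> getStream(String v) => Stream.fromIterable([1, 2, 3, 4]);

Future<void> main() async {
  final flat = flattenWithAsyncExpand(
      Stream.fromIterable(['a', 'b', 'c']).map(getStream));
  // Collect every event into a list once the flattened stream is done.
  print(await flat.toList()); // [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4]
}
```

This needs no extra package, but like the async* version it never starts an inner stream until the previous one has completed.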

2. Using Observable.flatMap

Observable has a flatMap method that listens to each inner stream as soon as it is produced and merges its events into the output stream:

import 'package:rxdart/rxdart.dart';

Stream<int> getStream(String v) {
    return Stream.fromIterable([1, 2, 3, 4]);
}

void main() {
    List<String> list = ["a", "b", "c"];
    Observable<int> s = Observable.fromIterable(list).flatMap(getStream);
    s.listen(print);
}

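Outputs: the values 1 2 3 4 from each of the three inner streams (twelve events in total). Because flatMap listens to the inner streams concurrently, their events may interleave, so the exact order 1 2 3 4 1 2 3 4 1 2 3 4 is not guaranteed.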

3. Using Observable.switchLatest

Convert a Stream that emits Streams (aka a "Higher Order Stream") into a single Observable that emits the items emitted by the most-recently-emitted of those Streams.

This is the solution I was looking for! I only needed the events from the most recently emitted inner stream.

import 'package:rxdart/rxdart.dart';

Stream<int> getStream(String v) {
    return Stream.fromIterable([1, 2, 3, 4]);
}

void main() {
    List<String> list = ["a", "b", "c"];
    Observable<int> s = Observable.switchLatest(
            Observable.fromIterable(list).map(getStream));
    s.listen(print);
}

Outputs: 1 1 1 2 3 4
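
To make the "switch to the latest inner stream" behaviour concrete, here is a minimal sketch using only dart:async. It is not how rxdart implements switchLatest; the helper name switchToLatest is hypothetical, and pause/resume handling is left out for brevity:

```dart
import 'dart:async';

/// Forwards events only from the most recently received inner stream.
/// (Hypothetical helper; a sketch, not rxdart's implementation.)
Stream<T> switchToLatest<T>(Stream<Stream<T>> source) {
  late StreamController<T> controller;
  StreamSubscription<Stream<T>>? outerSub;
  StreamSubscription<T>? innerSub;
  var outerDone = false;

  // Close the output only when the outer stream is done and no inner
  // stream is still active.
  void closeIfFinished() {
    if (outerDone && innerSub == null) controller.close();
  }

  controller = StreamController<T>(
    onListen: () {
      outerSub = source.listen(
        (inner) {
          // A newer inner stream arrived: stop listening to the old one.
          innerSub?.cancel();
          innerSub = inner.listen(
            controller.add,
            onError: controller.addError,
            onDone: () {
              innerSub = null;
              closeIfFinished();
            },
          );
        },
        onError: controller.addError,
        onDone: () {
          outerDone = true;
          closeIfFinished();
        },
      );
    },
    onCancel: () async {
      await innerSub?.cancel();
      await outerSub?.cancel();
    },
  );
  return controller.stream;
}

Future<void> main() async {
  // Lets all pending stream events be delivered before continuing.
  Future<void> pump() => Future<void>.delayed(Duration.zero);

  final outer = StreamController<Stream<int>>();
  final first = StreamController<int>();
  final second = StreamController<int>();
  final received = <int>[];
  final done =
      switchToLatest(outer.stream).listen(received.add).asFuture<void>();

  outer.add(first.stream);
  await pump();
  first.add(1); // forwarded: `first` is the latest inner stream
  await pump();
  outer.add(second.stream);
  await pump();
  first.add(99); // dropped: `first` has been replaced by `second`
  second.add(2); // forwarded
  await pump();
  second.close();
  outer.close();
  await done;
  print(received); // [1, 2]
}
```

The demo drives the streams through StreamControllers so that the order of events is explicit rather than dependent on timing.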

Dipu answered Oct 27 '22 16:10


It's somewhat rare to have a Stream<Stream<Something>>, so it isn't something that there is much explicit support for.

One reason is that there are several (at least two) ways to combine a stream of streams of things into a stream of things.

  1. Either you listen to each stream in turn, waiting for it to complete before starting on the next, and then emit the events in order.

  2. Or you listen on each new stream the moment it becomes available, and then emit the events from any stream as soon as possible.

The former is easy to write using an async* generator with await for:

Stream<T> flattenStreams<T>(Stream<Stream<T>> source) async* {
  await for (var stream in source) yield* stream;
}

The latter is more complicated because it requires listening to more than one stream at a time and combining their events. (If only StreamController.addStream allowed more than one stream at a time, it would be much easier.) You can use the StreamGroup class from package:async for this:

import "package:async/async.dart" show StreamGroup;

Stream<T> mergeStreams<T>(Stream<Stream<T>> source) {
  var sg = StreamGroup<T>();
  // Add each inner stream to the group as it arrives, and close the
  // group once [source] is done.
  source.forEach(sg.add).whenComplete(sg.close);
  // This doesn't handle errors in [source].
  // Maybe insert
  //   .catchError((e, s) {
  //     sg.add(Future<T>.error(e, s).asStream());
  //   })
  // before `.whenComplete` if you worry about errors in [source].
  return sg.stream;
}
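
The difference between the two strategies can be seen side by side in the following sketch. It uses only dart:async; mergeAll is a hypothetical hand-written stand-in for the StreamGroup version above (without pause/resume support), and compare is a made-up helper that feeds the same events to either function:

```dart
import 'dart:async';

/// Strategy 1: drain each inner stream completely before the next.
Stream<T> flattenStreams<T>(Stream<Stream<T>> source) async* {
  await for (var stream in source) yield* stream;
}

/// Strategy 2: listen to every inner stream as soon as it arrives.
/// (Hypothetical sketch; StreamGroup from package:async is more robust.)
Stream<T> mergeAll<T>(Stream<Stream<T>> source) {
  late StreamController<T> controller;
  final subs = <StreamSubscription<Object?>>[];
  var open = 1; // the outer stream counts as one open stream

  void oneDone() {
    if (--open == 0) controller.close();
  }

  controller = StreamController<T>(
    onListen: () {
      subs.add(source.listen(
        (inner) {
          open++;
          subs.add(inner.listen(controller.add,
              onError: controller.addError, onDone: oneDone));
        },
        onError: controller.addError,
        onDone: oneDone,
      ));
    },
    onCancel: () async {
      for (final s in subs) {
        await s.cancel();
      }
    },
  );
  return controller.stream;
}

/// Feeds the same event sequence (a1, b1, a2) through [flatten].
Future<List<String>> compare(
    Stream<String> Function(Stream<Stream<String>>) flatten) async {
  Future<void> pump() => Future<void>.delayed(Duration.zero);
  final outer = StreamController<Stream<String>>();
  final a = StreamController<String>();
  final b = StreamController<String>();
  final result = flatten(outer.stream).toList();
  outer
    ..add(a.stream)
    ..add(b.stream);
  await pump();
  a.add('a1');
  await pump();
  b.add('b1');
  await pump();
  a.add('a2');
  await pump();
  a.close();
  b.close();
  outer.close();
  return result;
}

Future<void> main() async {
  print(await compare(flattenStreams)); // [a1, a2, b1]
  print(await compare(mergeAll)); // [a1, b1, a2]
}
```

With the sequential version, b1 stays buffered in its controller until stream a has closed; with the merging version it is delivered as soon as it is added.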

lrn answered Oct 27 '22 16:10