I am using the rxdart package to handle streams in Dart, and I am stuck on a peculiar problem.
Please have a look at this dummy code:
final userId = BehaviorSubject<String>();
Stream<T> getStream(String uid) {
  // sample code that returns a stream
  return BehaviorSubject<T>().stream;
}
final Observable<Stream<T>> oops = userId.map((uid) => getStream(uid));
Now I want to convert the oops variable so that I get only an Observable<T>.
I am finding it difficult to explain clearly, but let me try. I have a stream A, and I map each event of stream A to another stream B. Now I have a Stream<Stream<B>>, a kind of nested stream. I just want to listen to the latest value produced by this pattern. How can I achieve this?
I will list several ways to flatten a Stream<Stream<T>> into a single Stream<T>.
1. Using pure Dart
As answered by @Irn, this is a pure Dart solution:
Stream<T> flattenStreams<T>(Stream<Stream<T>> source) async* {
  await for (var stream in source) yield* stream;
}
Stream<int> getStream(String v) {
  return Stream.fromIterable([1, 2, 3, 4]);
}
void main() {
  List<String> list = ["a", "b", "c"];
  Stream<int> s = flattenStreams(Stream.fromIterable(list).map(getStream));
  s.listen(print);
}
Outputs: 1 2 3 4 1 2 3 4 1 2 3 4
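One consequence of `yield*` is worth spelling out for the original question: the loop only moves on to the next inner stream once the current one closes. The sketch below illustrates that with hand-driven controllers. The controllers and the `demoFlattenBlocks` name are made up for this illustration, and the short delay is just an assumption that gives the event loop time to deliver everything.

```dart
import 'dart:async';

// Same helper as above: listens to one inner stream at a time.
Stream<T> flattenStreams<T>(Stream<Stream<T>> source) async* {
  await for (var stream in source) yield* stream;
}

/// Hypothetical demo: the first inner stream never closes, so events from
/// the second inner stream are never forwarded.
Future<List<int>> demoFlattenBlocks() async {
  final first = StreamController<int>(); // intentionally never closed
  final second = StreamController<int>();
  final outer = StreamController<Stream<int>>();
  final seen = <int>[];

  final sub = flattenStreams(outer.stream).listen(seen.add);

  outer.add(first.stream);
  outer.add(second.stream);
  first.add(1);
  second.add(100); // stays buffered: nobody listens to `second`
  first.add(2);

  // Let the pending events be delivered before inspecting the result.
  await Future<void>.delayed(const Duration(milliseconds: 50));
  await sub.cancel();
  return seen;
}

Future<void> main() async {
  print(await demoFlattenBlocks()); // [1, 2]
}
```

So if the inner streams stay open indefinitely (as a BehaviorSubject does until it is closed), this approach never reaches the streams for later user ids.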
2. Using Observable.flatMap
Observable has a flatMap method that maps each item to a stream, flattens those streams, and merges their events into the output stream:
import 'package:rxdart/rxdart.dart';
Stream<int> getStream(String v) {
  return Stream.fromIterable([1, 2, 3, 4]);
}
void main() {
  List<String> list = ["a", "b", "c"];
  Observable<int> s = Observable.fromIterable(list).flatMap(getStream);
  s.listen(print);
}
Outputs: 1 2 3 4 1 2 3 4 1 2 3 4
3. Using Observable.switchLatest
Convert a Stream that emits Streams (aka a "Higher Order Stream") into a single Observable that emits the items emitted by the most-recently-emitted of those Streams.
This is the solution I was looking for! I just needed the latest output emitted by the internal stream.
import 'package:rxdart/rxdart.dart';
Stream<int> getStream(String v) {
  return Stream.fromIterable([1, 2, 3, 4]);
}
void main() {
  List<String> list = ["a", "b", "c"];
  Observable<int> s = Observable.switchLatest(
      Observable.fromIterable(list).map(getStream));
  s.listen(print);
}
Outputs: 1 1 1 2 3 4
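To make the "most recently emitted stream wins" behaviour concrete, here is a minimal pure-Dart sketch of the same idea. It is not rxdart's implementation: `switchLatest` and `demoSwitchLatest` below are hypothetical names, and the sketch assumes null-safe Dart and ignores pause/resume handling.

```dart
import 'dart:async';

/// Hypothetical helper (not part of rxdart): forwards events only from the
/// most recently emitted inner stream.
Stream<T> switchLatest<T>(Stream<Stream<T>> source) {
  late StreamController<T> controller;
  StreamSubscription<Stream<T>>? outerSub;
  StreamSubscription<T>? innerSub;
  var outerDone = false;

  // Close the output once the outer stream and the current inner stream
  // have both finished.
  void closeIfFinished() {
    if (outerDone && innerSub == null) controller.close();
  }

  controller = StreamController<T>(
    onListen: () {
      outerSub = source.listen(
        (inner) {
          innerSub?.cancel(); // drop the previous inner stream
          innerSub = inner.listen(
            controller.add,
            onError: controller.addError,
            onDone: () {
              innerSub = null;
              closeIfFinished();
            },
          );
        },
        onError: controller.addError,
        onDone: () {
          outerDone = true;
          closeIfFinished();
        },
      );
    },
    onCancel: () async {
      await innerSub?.cancel();
      await outerSub?.cancel();
    },
  );
  return controller.stream;
}

/// Hypothetical demo driven by hand so the ordering is deterministic.
Future<List<String>> demoSwitchLatest() async {
  final a = StreamController<String>();
  final b = StreamController<String>();
  final outer = StreamController<Stream<String>>();
  final seen = <String>[];
  final sub = switchLatest(outer.stream).listen(seen.add);

  Future<void> tick() => Future<void>.delayed(Duration.zero);

  outer.add(a.stream);
  a.add('a1');
  await tick();
  outer.add(b.stream); // switch: `a` is cancelled from here on
  await tick();
  a.add('a2'); // dropped, `a` is no longer listened to
  b.add('b1');
  await tick();
  await sub.cancel();
  return seen;
}

Future<void> main() async {
  print(await demoSwitchLatest()); // [a1, b1]
}
```

Applied to the question, the outer stream would be `userId.map((uid) => getStream(uid))`: each new user id cancels the subscription to the previous user's stream.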
It's somewhat rare to have a Stream<Stream<Something>>, so it isn't something that there is much explicit support for.
One reason is that there are several (at least two) ways to combine a stream of streams of things into a stream of things.
Either you listen to each stream in turn, waiting for it to complete before starting on the next, and then emit the events in order.
Or you listen on each new stream the moment it becomes available, and then emit the events from any stream as soon as possible.
The former is easy to write using async/await:
Stream<T> flattenStreams<T>(Stream<Stream<T>> source) async* {
  await for (var stream in source) yield* stream;
}
The latter is more complicated because it requires listening on more than one stream at a time and combining their events. (If only StreamController.addStream allowed more than one stream at a time, it would be much easier.) You can use the StreamGroup class from package:async for this:
import "package:async/async.dart" show StreamGroup;
Stream<T> mergeStreams<T>(Stream<Stream<T>> source) {
  var sg = StreamGroup<T>();
  source.forEach(sg.add).whenComplete(sg.close);
  // This doesn't handle errors in [source].
  // Maybe insert
  //   .catchError((e, s) {
  //     sg.add(Future<T>.error(e, s).asStream());
  //   })
  // before `.whenComplete` if you worry about errors in [source].
  return sg.stream;
}
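If adding package:async is not an option, the same "listen to every inner stream as soon as it arrives" idea can be sketched with a plain StreamController. This is only an illustration under stated assumptions, not a replacement for StreamGroup: the `mergeStreams` body and `demoMerge` below are my own hypothetical code, they assume null-safe Dart, and they do not handle pause/resume.

```dart
import 'dart:async';

/// Hypothetical stand-in for the StreamGroup version: forwards events from
/// every inner stream as soon as they arrive.
Stream<T> mergeStreams<T>(Stream<Stream<T>> source) {
  late StreamController<T> controller;
  final subs = <StreamSubscription<dynamic>>[];
  var open = 1; // the outer stream counts as one open stream

  // Close the output when the outer stream and all inner streams are done.
  void oneDone() {
    if (--open == 0) controller.close();
  }

  controller = StreamController<T>(
    onListen: () {
      subs.add(source.listen(
        (inner) {
          open++;
          subs.add(inner.listen(controller.add,
              onError: controller.addError, onDone: oneDone));
        },
        onError: controller.addError, // errors in [source] are forwarded too
        onDone: oneDone,
      ));
    },
    onCancel: () => Future.wait(subs.map((s) => s.cancel())),
  );
  return controller.stream;
}

/// Hypothetical demo: events from `a` and `b` interleave in arrival order.
Future<List<String>> demoMerge() async {
  final a = StreamController<String>();
  final b = StreamController<String>();
  final outer = StreamController<Stream<String>>();
  final result = mergeStreams(outer.stream).toList();

  Future<void> tick() => Future<void>.delayed(Duration.zero);

  outer
    ..add(a.stream)
    ..add(b.stream);
  await tick(); // both inner streams are now being listened to
  a.add('a1');
  await tick();
  b.add('b1');
  await tick();
  a.add('a2');
  await tick();
  await Future.wait([a.close(), b.close(), outer.close()]);
  return result;
}

Future<void> main() async {
  print(await demoMerge()); // [a1, b1, a2]
}
```

With the sequential flattenStreams above, the same inputs would instead come out as a1, a2, b1, because `b` is not listened to until `a` closes.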