I'd like to use RxJS to "bridge" async world of events with sync world. Specifically I want to create an function which returns an array of events collected during some time interval.
I can create Observable which does what I want
var source = Rx.Observable
.interval(100 /* ms */)
.bufferWithTime(1000).take(1)
I can print correct values just fine
var subscription = source.subscribe(
function (x) {
console.log('Next: ' + JSON.stringify(x));
},
function () {
console.log('Completed');
});
This prints
[0,1,2,3,4,5,6,7,8]
Completed
But want I want is to assign this array to variable. Conceptually I want something like
var collectedDuringSecond = source.toPromise.getValue()
The idea is that getValue would block so after the line above is done collectedDuringSecond will contain [0,1,2,3,4,5,6,7,8]
concatMap operator is basically a combination of two operators - concat and map. The map part lets you map a value from a source observable to an observable stream. Those streams are often referred to as inner streams.
Subscriptions to observables are quite similar to calling a function. But where observables are different is in their ability to return multiple values called streams (a stream is a sequence of data over time). Observables not only able to return a value synchronously, but also asynchronously.
Observable is not only a monad, but a very powerful one - maybe even the most powerful monad used in the mainstream$.
RxJS' of() is a creational operator that allows you to create an RxJS Observable from a sequence of values. According to the official docs: of() converts the arguments to an observable sequence.
Synchronous event programming in JavaScript is highly restrictive. In fact, it may be impossible in a lot of cases. I tried hacking around with Rx to see if I could provide a synchronous interface without modifying the Rx source, and (for good reason) it's not possible with straight JavaScript.
I would suggest exposing the Observable as part of your API, and allowing consumers to handle it from there (with a nudge to use Rx, of course ;).
function MyClass () {
this.getArrayOfStuffAsObservable = function () {
return Rx.Observable.interval(100)
.bufferWithTime(1000).take(1);
};
// this is optional and I don't recommend it, since you already have Rx available.
// additionally, consumers will probably miss the fact that you can dispose
// of the subscription.
this.getArrayOfStuff = function (callback) {
var value;
return this.getArrayOfStuffAsObservable()
.subscribe(
function (x) {
value = x;
},
function (err) {
callback(err);
},
function () {
if (hasValue) {
callback(undefined, value);
} else {
callback('did not receive value');
}
});
};
};
As an additional note, you may want to use toArray
in conjunction with take
instead of bufferWithTime
for this specific example (it's really two ways of doing the same thing, but one is based on time and the other based on item count). toArray
creates an Observable which will collect all of the values of the underlying observable, and yield those values as an array when the underlying Observable completes.
this.getArrayOfStuffAsObservable = function () {
return Rx.Observable.interval(100)
.take(10)
.toArray();
};
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With