I'm new to RxJS and was wondering if anyone could help me.
I want to create a synchronous stream of responses (preferably with the corresponding requests) from a stream of requests(payload data).
I basically want the requests to be sent one by one, each waiting for the response from the last one.
I tried this, but it sends everything at once ( jsbin ):
var requestStream, responseStream;
requestStream = Rx.Observable.from(['a','b','c','d','e']);
responseStream = requestStream.flatMap(
sendRequest,
(val, response)=>{ return {val, response}; }
);
responseStream.subscribe(
item=>{
console.log(item);
},
err => {
console.err(err);
},
()=>{
console.log('Done');
}
);
function sendRequest(val) {
return new Promise((resolve,reject)=>{
setTimeout(()=>{resolve('result for '+val);},1000);
});
};
The following works, to an extent, but does not use stream for the request data ( jsbin ).
var data, responseStream;
data = ['a','b','c','d','e'];
responseStream = Rx.Observable.create(observer=>{
var sendNext = function(){
var val = data.shift();
if (!val) {
observer.onCompleted();
return;
}
sendRequest(val).then(response=>{
observer.onNext({val, response});
sendNext();
});
};
sendNext();
});
responseStream.subscribe(
item=>{
console.log(item);
},
err => {
console.err(err);
},
()=>{
console.log('Done');
}
);
function sendRequest(val) {
return new Promise((resolve,reject)=>{
setTimeout(()=>{resolve('response for '+val);},Math.random() * 2500 + 500);
});
};
Thank you!
EDIT:
Just to clarify, this is what I wanted to achieve:
"Send A, when you receive response for A, send B, when you receive response for B, send C, etc..."
Using concatMap and defer, as suggested by user3743222, seems to do it ( jsbin ):
responseStream = requestStream.concatMap(
(val)=>{
return Rx.Observable.defer(()=>{
return sendRequest(val);
});
},
(val, response)=>{ return {val, response}; }
);
Try replacing flatMap
with concatMap
in your first code sample and let me know if the resulting behaviour corresponds to what you are looking for.
responseStream = requestStream.concatMap(//I replaced `flatMap`
sendRequest,
(val, response)=>{ return {val, response}; }
);
Basically concatMap
has a similar signature than flatMap
, the difference in behaviour being that it will wait for the current observable being flattened to complete before proceeding with the next one. So here:
requestStream
value will be pushed to the concatMap
operator.concatMap
operator will generate a sendRequest
observable, and whatever values out of that observable (seems to be a tuple (val, response)
) will be passed through the selector function and the object result of that will be passed downstreamsendRequest
completes, another requestStream
value is processed.Alternatively, maybe you want to use defer
to defer the execution of the sendRequest
.
responseStream = requestStream.concatMap(//I replaced `flatMap`
function(x){return Rx.Observable.defer(function(){return sendRequest(x);})},
(val, response)=>{ return {val, response}; }
);
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With