
Synchronous stream of responses from a stream of requests with RxJS

I'm new to RxJS and was wondering if anyone could help me.

I want to create a synchronous stream of responses (preferably paired with the corresponding requests) from a stream of requests (payload data).

I basically want the requests to be sent one by one, each waiting for the response from the last one.

I tried this, but it sends everything at once (jsbin):

var requestStream, responseStream;
requestStream = Rx.Observable.from(['a','b','c','d','e']);

responseStream = requestStream.flatMap(
  sendRequest,
  (val, response)=>{ return {val, response}; }
);

responseStream.subscribe(
  item=>{
    console.log(item);
  },
  err => {
    console.error(err);
  },
  ()=>{
    console.log('Done');
  }
);

function sendRequest(val) {
  return new Promise((resolve,reject)=>{
    setTimeout(()=>{resolve('result for '+val);},1000);
  });
};

The following works, to an extent, but does not use a stream for the request data (jsbin).

var data, responseStream;
data = ['a','b','c','d','e'];
responseStream = Rx.Observable.create(observer=>{
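  var sendNext = function(){
    // Check the queue length rather than the value, so falsy payloads (0, '') are still sent
    if (data.length === 0) {
      observer.onCompleted();
      return;
    }
    var val = data.shift();
    sendRequest(val).then(response=>{
      observer.onNext({val, response});
      sendNext();
    }, err=>{
      // Forward a failed request to the subscriber instead of dropping it silently
      observer.onError(err);
    });
  };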
  sendNext();
});

responseStream.subscribe(
  item=>{
    console.log(item);
  },
  err => {
    console.error(err);
  },
  ()=>{
    console.log('Done');
  }
);

function sendRequest(val) {
  return new Promise((resolve,reject)=>{
    setTimeout(()=>{resolve('response for '+val);},Math.random() * 2500 + 500);
  });
};

Thank you!

EDIT:

Just to clarify, this is what I wanted to achieve:

"Send A, when you receive response for A, send B, when you receive response for B, send C, etc..."

Using concatMap and defer, as suggested by user3743222, seems to do it (jsbin):

responseStream = requestStream.concatMap(
  (val)=>{
    return Rx.Observable.defer(()=>{
      return sendRequest(val);
    });
  },
  (val, response)=>{ return {val, response}; }
);
jamesref asked Oct 18 '22 16:10


1 Answer

Try replacing flatMap with concatMap in your first code sample and let me know if the resulting behaviour corresponds to what you are looking for.

responseStream = requestStream.concatMap(//I replaced `flatMap`
  sendRequest,
  (val, response)=>{ return {val, response}; }
);

Basically, concatMap has a signature similar to flatMap's. The difference in behaviour is that it waits for the observable currently being flattened to complete before moving on to the next one. So here:

  • A requestStream value is pushed to the concatMap operator.
  • The concatMap operator generates an observable from sendRequest. Each value that observable emits (here, the response) is passed to the selector function together with the original request value, and the resulting {val, response} object is passed downstream.
  • When that sendRequest observable completes, the next requestStream value is processed.
  • In short, your requests are processed one by one.
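
The steps above can be sketched in plain JavaScript, without RxJS. This is only an illustration of the one-at-a-time behaviour, not how concatMap is implemented. processOneByOne, fakeSend and answerNext are hypothetical names, and fakeSend takes a callback instead of returning a Promise so that the moment each response arrives can be controlled by hand.

```javascript
// Hypothetical helper: send the next value only after the previous response arrived.
function processOneByOne(values, send, onResult, onDone) {
  const queue = values.slice(); // copy, so the caller's array is not mutated
  function next() {
    if (queue.length === 0) {
      onDone();
      return;
    }
    const val = queue.shift();
    send(val, response => {
      onResult({ val, response }); // same shape as the selector's result
      next();                      // only now move on to the next request
    });
  }
  next();
}

// Hypothetical sender: records what was sent and parks the response
// until answerNext() is called.
const sent = [];
const pending = [];
function fakeSend(val, callback) {
  sent.push(val);
  pending.push(() => callback('response for ' + val));
}
function answerNext() {
  pending.shift()();
}

const results = [];
let done = false;
processOneByOne(['a', 'b', 'c'], fakeSend, r => results.push(r), () => { done = true; });

console.log(sent);          // only 'a' has been sent at this point
answerNext();               // the response for 'a' arrives, so 'b' is sent
console.log(sent, results);
```

Until answerNext() is called, nothing after the current value leaves the queue, which is the behaviour the question asks for.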

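Alternatively, you may need defer to postpone the call to sendRequest until concatMap actually subscribes to the inner observable. A Promise starts its work as soon as it is created, so if sendRequest is called for every value up front, all requests are sent at once even though the responses are emitted in order.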

responseStream = requestStream.concatMap(// `concatMap` instead of `flatMap`
  // defer delays the call to sendRequest until this inner observable is subscribed to
  function(x){return Rx.Observable.defer(function(){return sendRequest(x);});},
  (val, response)=>{ return {val, response}; }
);
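
Why defer can make a difference is easier to see without RxJS. Here is a minimal sketch, assuming a hypothetical fakeRequest that records when it starts. The function passed to new Promise runs immediately, so creating the promises up front starts every request. Wrapping the call in a function postpones it until that function is invoked, which is roughly the role defer plays above.

```javascript
const started = [];

// Hypothetical stand-in for sendRequest: notes the moment the request starts.
function fakeRequest(val) {
  return new Promise(resolve => {
    started.push(val); // the executor runs synchronously, at creation time
    resolve('response for ' + val);
  });
}

// Eager: calling fakeRequest for each value starts all requests right away.
const eager = ['a', 'b', 'c'].map(fakeRequest);
const startedEagerly = started.slice();

// Deferred: each value is wrapped in a function, so nothing starts yet.
started.length = 0;
const deferred = ['a', 'b', 'c'].map(val => () => fakeRequest(val));
const startedBeforeCall = started.slice();

// A request starts only when its wrapper is invoked.
deferred[0]();
const startedAfterCall = started.slice();

console.log(startedEagerly, startedBeforeCall, startedAfterCall);
```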
user3743222 answered Oct 27 '22 11:10