How to end chained requests in Rx Vert.X ?
HttpClient client = Vertx.vertx().createHttpClient();
HttpClientRequest request = client.request(HttpMethod.POST,
"someURL")
.putHeader("content-type", "application/x-www-form-urlencoded")
.putHeader("content-length", Integer.toString(jsonData.length())).write(jsonData);
request.toObservable().
//flatmap HttpClientResponse -> Observable<Buffer>
flatMap(httpClientResponse -> { //something
return httpClientResponse.toObservable();
}).
map(buffer -> {return buffer.toString()}).
//flatmap data -> Observable<HttpClientResponse>
flatMap(postData -> client.request(HttpMethod.POST,
someURL")
.putHeader("content-type", "application/x-www-form-urlencoded")
.putHeader("content-length", Integer.toString(postData.length())).write(postData).toObservable()).
//flatmap HttpClientResponse -> Observable<Buffer>
flatMap(httpClientResponse -> {
return httpClientResponse.toObservable();
})......//other operators
request.end();
Notice that I have .end()
for the top request. How do I end request that is inside of the .flatmap
? Do I even need to end it ?
There are multiple ways to ensure to call request.end()
. But I would dig into documentation of Vert.x or just open source code if there is one, to see if it does call end() for you. Otherwise one could be
final HttpClientRequest request = ...
request.toObservable()
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
request.end();
}
});
I think you can do something like the following code.
The main idea is that you don't directly use the HttpClientRequest
as obtained by the Vertx
client. Instead you create another flowable that will invoke end()
as soon as the first subscription is received.
Here, for instance, you can obtain the request through a pair custom methods: in this case request1()
and request2()
. They both use doOnSubscribe()
to trigger the end()
you need. Read its description on the ReactiveX page.
This examle uses vertx and reactivex, I hope you could use this set up.
import io.reactivex.Flowable;
import io.vertx.core.http.HttpMethod;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.http.HttpClient;
import io.vertx.reactivex.core.http.HttpClientRequest;
import io.vertx.reactivex.core.http.HttpClientResponse;
import org.junit.Test;
public class StackOverflow {
@Test public void test(){
Buffer jsonData = Buffer.buffer("..."); // the json data.
HttpClient client = Vertx.vertx().createHttpClient(); // the vertx client.
request1(client)
.flatMap(httpClientResponse -> httpClientResponse.toFlowable())
.map(buffer -> buffer.toString())
.flatMap(postData -> request2(client, postData) )
.forEach( httpResponse -> {
// do something with returned data);
});
}
private Flowable<HttpClientResponse> request1(HttpClient client) {
HttpClientRequest request = client.request(HttpMethod.POST,"someURL");
return request
.toFlowable()
.doOnSubscribe( subscription -> request.end() );
}
private Flowable<HttpClientResponse> request2(HttpClient client, String postData) {
HttpClientRequest request = client.request(HttpMethod.POST,"someURL");
// do something with postData
return request
.toFlowable()
.doOnSubscribe( subscription -> request.end() );
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With