
How to end chained http requests in RxJava Vert.x?

How do I end chained requests in Rx Vert.x?

HttpClient client = Vertx.vertx().createHttpClient();
HttpClientRequest request = client.request(HttpMethod.POST, "someURL")
        .putHeader("content-type", "application/x-www-form-urlencoded")
        .putHeader("content-length", Integer.toString(jsonData.length()))
        .write(jsonData);
request.toObservable()
        // flatMap HttpClientResponse -> Observable<Buffer>
        .flatMap(httpClientResponse -> { // something
            return httpClientResponse.toObservable();
        })
        .map(buffer -> buffer.toString())
        // flatMap data -> Observable<HttpClientResponse>
        .flatMap(postData -> client.request(HttpMethod.POST, "someURL")
                .putHeader("content-type", "application/x-www-form-urlencoded")
                .putHeader("content-length", Integer.toString(postData.length()))
                .write(postData)
                .toObservable())
        // flatMap HttpClientResponse -> Observable<Buffer>
        .flatMap(httpClientResponse -> {
            return httpClientResponse.toObservable();
        })......//other operators
request.end();

Notice that I call .end() on the top-level request. How do I end the request that is created inside the .flatMap()? Do I even need to end it?

newprint asked Oct 18 '22 15:10



2 Answers

There are several ways to make sure request.end() gets called. First, though, I would check the Vert.x documentation, or the source code if it is available, to see whether it already calls end() for you. If it does not, one option is:

final HttpClientRequest request = ...
request.toObservable()
       .doOnUnsubscribe(new Action0() {
           @Override
           public void call() {
               request.end();
           }
       });
Volodymyr Lykhonis answered Nov 01 '22 10:11



I think you can do something like the code below.

The main idea is that you don't use the HttpClientRequest obtained from the Vert.x client directly. Instead, you wrap it in another Flowable that invokes end() as soon as the first subscription is received.

Here, for instance, the request is obtained through a pair of custom methods, request1() and request2(). Both use doOnSubscribe() to trigger the end() call you need. See the description of doOnSubscribe() on the ReactiveX page.

This example uses Vert.x with its RxJava 2 (io.vertx.reactivex) bindings; I hope you can use this setup.

import io.reactivex.Flowable;
import io.vertx.core.http.HttpMethod;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.http.HttpClient;
import io.vertx.reactivex.core.http.HttpClientRequest;
import io.vertx.reactivex.core.http.HttpClientResponse;
import org.junit.Test;

public class StackOverflow {

    @Test public void test(){

        Buffer jsonData = Buffer.buffer("..."); // the json data.

        HttpClient client = Vertx.vertx().createHttpClient(); // the vertx client.

        request1(client)
            .flatMap(httpClientResponse -> httpClientResponse.toFlowable())
            .map(buffer -> buffer.toString())
            .flatMap(postData -> request2(client, postData) )
            .forEach( httpResponse -> {
                // do something with the returned data
            });

    }

    private Flowable<HttpClientResponse> request1(HttpClient client) {
        HttpClientRequest request = client.request(HttpMethod.POST,"someURL");
        return request
                .toFlowable()
                .doOnSubscribe( subscription -> request.end() );
    }

    private Flowable<HttpClientResponse> request2(HttpClient client, String postData) {
        HttpClientRequest request = client.request(HttpMethod.POST,"someURL");
        // do something with postData
        return request
                .toFlowable()
                .doOnSubscribe( subscription -> request.end() );
    }

}
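
To make the ordering easier to see, here is a dependency-free sketch of the same "end on subscribe" idea in plain Java. It is not Vert.x or RxJava code: FakeRequest, Source and endOnSubscribe() are hypothetical stand-ins invented for this illustration, and the real libraries may differ in detail. It only shows that nothing is sent while the chain is being assembled, and that each request in the chain, including the one created inside flatMap, gets its own end() when it is subscribed to.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

final class EndOnSubscribeSketch {

    /** Hypothetical stand-in for HttpClientRequest: nothing is "sent" until end() is called. */
    static final class FakeRequest {
        private final String name;
        private final List<String> log;
        private Consumer<String> handler;

        FakeRequest(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        void handler(Consumer<String> handler) {
            this.handler = handler;
        }

        void end() {
            log.add("end " + name);
            if (handler != null) {
                handler.accept("response to " + name); // simulate the response arriving
            }
        }
    }

    /** Hypothetical minimal cold source: no work happens before subscribe(). */
    interface Source<T> {
        void subscribe(Consumer<T> onNext);

        default <R> Source<R> flatMap(Function<T, Source<R>> mapper) {
            // Subscribing to the result subscribes to this source, then to each mapped source.
            return onNext -> subscribe(item -> mapper.apply(item).subscribe(onNext));
        }
    }

    /** Wraps a request so that end() runs as a side effect of subscribing. */
    static Source<String> endOnSubscribe(FakeRequest request) {
        return onNext -> {
            request.handler(onNext); // register for the response first
            request.end();           // then finish (send) the request
        };
    }

    /** Builds a two-request chain and returns the order in which things happened. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Source<String> chain = endOnSubscribe(new FakeRequest("first", log))
                .flatMap(body -> endOnSubscribe(new FakeRequest("second", log)));
        log.add("assembled"); // no end() has been called yet
        chain.subscribe(body -> log.add("got " + body));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running main() prints "assembled", "end first", "end second" and "got response to second", in that order: the inner request is ended when flatMap subscribes to it, with no separate end() call at the bottom of the chain.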
danidemi answered Nov 01 '22 10:11
