
How to implement HTTP sink correctly?

I want to send the calculation results of my DataStream flow to another service over HTTP. I see two possible ways to implement it:

  1. Use the synchronous Apache HttpClient in the sink:
public class SyncHttpSink extends RichSinkFunction<SessionItem> {
    private static final String URL = "http://httpbin.org/post";

    private CloseableHttpClient httpClient;

    private Histogram httpStatusesAccumulator;

    @Override
    public void open(Configuration parameters) throws Exception {
        httpClient = HttpClients.custom()
            .setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE)
            .build();

        httpStatusesAccumulator = getRuntimeContext().getHistogram("http_statuses");
    }

    @Override
    public void close() throws Exception {
        httpClient.close();

        httpStatusesAccumulator.resetLocal();
    }

    @Override
    public void invoke(SessionItem value) throws Exception {
        List<NameValuePair> params = new ArrayList<>();
        params.add(new BasicNameValuePair("session_uid", value.getSessionUid()));
        params.add(new BasicNameValuePair("traffic", String.valueOf(value.getTraffic())));
        params.add(new BasicNameValuePair("duration", String.valueOf(value.getDuration())));

        UrlEncodedFormEntity entity = new UrlEncodedFormEntity(params, Consts.UTF_8);

        HttpPost httpPost = new HttpPost(URL);
        httpPost.setEntity(entity);

        try(CloseableHttpResponse response = httpClient.execute(httpPost)) {
            int httpStatusCode = response.getStatusLine().getStatusCode();

            httpStatusesAccumulator.add(httpStatusCode);
        }
    }
}
  2. Use the asynchronous Apache HttpAsyncClient in the sink:
public class AsyncHttpSink extends RichSinkFunction<SessionItem> {
    private static final String URL = "http://httpbin.org/post";

    private CloseableHttpAsyncClient httpClient;

    private Histogram httpStatusesAccumulator;

    @Override
    public void open(Configuration parameters) throws Exception {
        httpClient = HttpAsyncClients.custom()
                .setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE)
                .build();
        httpClient.start();

        httpStatusesAccumulator = getRuntimeContext().getHistogram("http_statuses");
    }

    @Override
    public void close() throws Exception {
        httpClient.close();

        httpStatusesAccumulator.resetLocal();
    }

    @Override
    public void invoke(SessionItem value) throws Exception {
        List<NameValuePair> params = new ArrayList<>();
        params.add(new BasicNameValuePair("session_uid", value.getSessionUid()));
        params.add(new BasicNameValuePair("traffic", String.valueOf(value.getTraffic())));
        params.add(new BasicNameValuePair("duration", String.valueOf(value.getDuration())));

        UrlEncodedFormEntity entity = new UrlEncodedFormEntity(params, Consts.UTF_8);

        HttpPost httpPost = new HttpPost(URL);
        httpPost.setEntity(entity);

        httpClient.execute(httpPost, new FutureCallback<HttpResponse>() {
            @Override
            public void completed(HttpResponse response) {
                int httpStatusCode = response.getStatusLine().getStatusCode();

                httpStatusesAccumulator.add(httpStatusCode);
            }

            @Override
            public void failed(Exception ex) {
                httpStatusesAccumulator.add(-1); // -1 - failed
            }

            @Override
            public void cancelled() {
                httpStatusesAccumulator.add(-2); // -2 - cancelled
            }
        });
    }
}

Questions:

  1. Should I use a sync or an async HTTP client in the sink?

  2. If I use the sync client, it will block the sink, and through back pressure Flink will block the source. Right?

  3. If I use the async client, it won't block the sink. Right?

  4. Are accumulators thread-safe, i.e. can I use them in an async callback?

  5. Is RuntimeContext thread-safe, i.e. can I use it in an async callback?

Maxim asked Mar 25 '16 11:03

1 Answer

1. Should I use a sync or an async HTTP client in the sink?

To avoid backpressure caused by blocking HTTP calls, I would recommend using the asynchronous HTTP client.

2. If I use the sync client, it will block the sink, and through back pressure Flink will block the source. Right?

Yes, that is right. The backpressure will be propagated through your topology to the sources.

3. If I use the async client, it won't block the sink. Right?

This is correct.
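
One thing to keep in mind with the async variant: because invoke() returns immediately, nothing limits how many requests are in flight, and close() may run while responses are still pending. Below is a minimal sketch of one way to handle both, using only the JDK. The class BoundedAsyncSender, its methods, and fakeHttpPost are hypothetical names for illustration; the thread pool merely stands in for the async HTTP client, and this is not Flink or HttpAsyncClient API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: caps in-flight async requests and drains them on close. */
class BoundedAsyncSender {
    private final int maxInFlight;
    private final Semaphore permits;
    // Assumption: this pool stands in for the async HTTP client's I/O threads.
    private final ExecutorService ioThreads = Executors.newFixedThreadPool(4);
    private final AtomicInteger completed = new AtomicInteger();

    BoundedAsyncSender(int maxInFlight) {
        this.maxInFlight = maxInFlight;
        this.permits = new Semaphore(maxInFlight);
    }

    /** Plays the role of invoke(): blocks only once maxInFlight requests are pending. */
    void send(String payload) {
        permits.acquireUninterruptibly();
        CompletableFuture
            .runAsync(() -> fakeHttpPost(payload), ioThreads)
            .whenComplete((ignored, error) -> {
                // Runs for success and failure alike, so a permit is never leaked.
                completed.incrementAndGet();
                permits.release();
            });
    }

    /** Plays the role of close(): waits until every pending request has finished. */
    int close() {
        permits.acquireUninterruptibly(maxInFlight);
        ioThreads.shutdown();
        return completed.get();
    }

    /** Placeholder for the real HTTP call. */
    private static void fakeHttpPost(String payload) {
        // Intentionally empty in this sketch.
    }

    public static void main(String[] args) {
        BoundedAsyncSender sender = new BoundedAsyncSender(8);
        for (int i = 0; i < 100; i++) {
            sender.send("item-" + i);
        }
        System.out.println("completed sends: " + sender.close());
    }
}
```

With such a cap, the sink only blocks (and thus only causes backpressure) when the remote service cannot keep up with maxInFlight concurrent requests.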

4. Are accumulators thread-safe, i.e. can I use them in an async callback?

Accumulators are not thread-safe, so access to them has to be synchronized.
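
As a rough illustration of what "synchronized access" could look like, here is a minimal JDK-only sketch. StatusHistogram is a hypothetical stand-in for a non-thread-safe accumulator (it is not Flink's Histogram class), and the thread pool simulates callbacks arriving on the HTTP client's I/O threads. Note that a wrapper like this only coordinates callers that go through it.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a non-thread-safe histogram accumulator. */
class StatusHistogram {
    private final TreeMap<Integer, Integer> counts = new TreeMap<>();

    void add(int value) {
        counts.merge(value, 1, Integer::sum);
    }

    Map<Integer, Integer> snapshot() {
        return new TreeMap<>(counts);
    }
}

/** Serializes every access to the wrapped histogram through one lock. */
class SynchronizedStatusHistogram {
    private final StatusHistogram delegate = new StatusHistogram();
    private final Object lock = new Object();

    void add(int value) {
        synchronized (lock) {
            delegate.add(value);
        }
    }

    Map<Integer, Integer> snapshot() {
        synchronized (lock) {
            return delegate.snapshot();
        }
    }
}

class SynchronizedHistogramDemo {
    /** Simulates async callbacks reporting HTTP status codes from several threads. */
    static Map<Integer, Integer> run(int callbacks) {
        SynchronizedStatusHistogram histogram = new SynchronizedStatusHistogram();
        ExecutorService ioThreads = Executors.newFixedThreadPool(4);
        for (int i = 0; i < callbacks; i++) {
            final int status = (i % 2 == 0) ? 200 : 500;
            ioThreads.execute(() -> histogram.add(status));
        }
        ioThreads.shutdown();
        try {
            if (!ioThreads.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callbacks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return histogram.snapshot();
    }

    public static void main(String[] args) {
        System.out.println(run(1000));
    }
}
```

In the async sink from the question, the same idea would mean guarding each httpStatusesAccumulator.add(...) call in completed, failed and cancelled with a shared lock.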

5. Is RuntimeContext thread-safe, i.e. can I use it in an async callback?

The RuntimeContext is not thread-safe, so access to it has to be synchronized.

Till Rohrmann answered Oct 09 '22 09:10