How should I deal with servers that hang sending an HTTP response body using the HTTP client included in Java 11 onwards, when I need to handle the response in a streaming fashion?
Having read the documentation, I'm aware that it's possible to set a timeout on connection and a timeout on the request:
HttpClient httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(2))
.build();
HttpRequest httpRequest = HttpRequest.newBuilder(URI.create("http://example.com"))
.timeout(Duration.ofSeconds(5))
.build();
HttpResponse<Stream<String>> httpResponse = httpClient
.send(httpRequest, HttpResponse.BodyHandlers.ofLines());
Stream<String> responseLineStream = httpResponse.body();
responseLineStream.count();
In the above code:
BodyHandler
, a response is considered received when the status line and headers have been received.This means that when the code executes, within 7 seconds either an exception will have been thrown, or we'll have arrived at the last line. However, the last line isn't constrained by any timeout. If the server stops sending the response body, the last line blocks forever.
How can I prevent the last line hanging in this case?
To set an infinite timeout, set the property value to InfiniteTimeSpan. A Domain Name System (DNS) query may take up to 15 seconds to return or time out.
The HyperText Transfer Protocol (HTTP) 408 Request Timeout response status code means that the server would like to shut down this unused connection. It is sent on an idle connection by some servers, even without any previous request by the client.
A Request-Timeout header is defined for Hypertext Transfer Protocol (HTTP). This end-to-end header informs an origin server and any intermediaries of the maximum time that a client will await a response to its request. A server can use this header to ensure that a timely response is generated.
DefaultHttpClient httpClient = new DefaultHttpClient(); int timeout = 5; // seconds HttpParams httpParams = httpClient.
My guess this is left to the consumer of the stream, since this is part of the handling logic, so the body handling can be still be processed with a CompletableFuture
:
HttpResponse<Stream<String>> httpResponse = httpClient.send(httpRequest,
HttpResponse.BodyHandlers.ofLines());
Stream<String> responseLineStream = httpResponse.body();
CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> responseLineStream.count());
long count = future.get(3, TimeUnit.SECONDS);
Or just simply a Future
executed by a Java Executor
.
One way to solve this is to set a timeout on the time taken to receive the whole body. That's what M A's solution does. As you've noticed, you should close the stream if timeout evaluates, so the connection is released properly instead of hanging in background. A more general approach is to implement a BodySubscriber
that completes itself exceptionally when it is not completed by upstream within the timeout. This affords not having to spawn a thread just for timed waiting or close the stream. Here's an appropriate implementation.
class TimeoutBodySubscriber<T> implements BodySubscriber<T> {
private final BodySubscriber<T> downstream;
private final Duration timeout;
private Subscription subscription;
/** Make sure downstream isn't called after we receive an onComplete or onError. */
private boolean done;
TimeoutBodySubscriber(BodySubscriber<T> downstream, Duration timeout) {
this.downstream = downstream;
this.timeout = timeout;
}
@Override
public CompletionStage<T> getBody() {
return downstream.getBody();
}
@Override
public synchronized void onSubscribe(Subscription subscription) {
this.subscription = requireNonNull(subscription);
downstream.onSubscribe(subscription);
// Schedule an error completion to be fired when timeout evaluates
CompletableFuture.delayedExecutor(timeout.toMillis(), TimeUnit.MILLISECONDS)
.execute(this::onTimeout);
}
private synchronized void onTimeout() {
if (!done) {
done = true;
downstream.onError(new HttpTimeoutException("body completion timed out"));
// Cancel subscription to release the connection, so it doesn't keep hanging in background
subscription.cancel();
}
}
@Override
public synchronized void onNext(List<ByteBuffer> item) {
if (!done) {
downstream.onNext(item);
}
}
@Override
public synchronized void onError(Throwable throwable) {
if (!done) {
done = true;
downstream.onError(throwable);
}
}
@Override
public synchronized void onComplete() {
if (!done) {
done = true;
downstream.onComplete();
}
}
static <T> BodyHandler<T> withBodyTimeout(BodyHandler<T> handler, Duration timeout) {
return responseInfo -> new TimeoutBodySubscriber<>(handler.apply(responseInfo), timeout);
}
}
It can be used as follows:
Duration timeout = Duration.ofSeconds(10);
HttpResponse<Stream<String>> httpResponse = httpClient
.send(httpRequest, TimeoutBodySubscriber.withTimeout(HttpResponse.BodyHandlers.ofLines(), timeout));
Another approach is to use a read timeout. This is more flexible as the response isn't timed-out as long as the server remains active (i.e. keeps sending stuff). You'll need a BodySubscriber
that completes itself exceptionally if it doesn't receive its next requested signal within the timeout. This is slightly more complex to implement. You can use Methanol if you're fine with a dependency. It implements read timeouts as described.
Duration timeout = Duration.ofSeconds(3);
HttpResponse<Stream<String>> httpResponse = httpClient
.send(httpRequest, MoreBodyHandlers.withReadTimeout(HttpResponse.BodyHandlers.ofLines(), timeout));
Another strategy is to use a combination of both: time out as soon as the server becomes inactive or the body takes too long to complete.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With