I have a REST web service that runs on Jetty. I want to write a Java client that chunks along a huge batch of documents to that rest service using the same web connection.
I was able to establish an Iterator based streaming approach here:
Sending a stream of documents to a Jersey @POST endpoint
This does not work unless you set clientConfig.property(ClientProperties.REQUEST_ENTITY_PROCESSING, RequestEntityProcessing.CHUNKED);
because the Content-length
is unknown.
While somewhat working, the chunked transfer seems to lose a few documents. For example:
num_docs 500000
numFound 499249
Maybe it's sending chunks like:
{some:doc}, {some:doc}, {some:doc}, {some:doc}, {some:doc}, {some:doc}, {some:do
So I'm losing some each time at the ends? UPDATE: I was wrong about this.
How do I make it not do that? Any ideas what else might be happening?
ClientConfig clientConfig = new ClientConfig();
clientConfig.property(ClientProperties.CONNECT_TIMEOUT, (int)TimeUnit.SECONDS.toMillis(60));
clientConfig.property(ClientProperties.REQUEST_ENTITY_PROCESSING, RequestEntityProcessing.CHUNKED);
clientConfig.property(ClientProperties.ASYNC_THREADPOOL_SIZE, 100);
clientConfig.property(ApacheClientProperties.CONNECTION_MANAGER, HttpClientFactory.createConnectionManager(name,
metricRegistry, configuration));
ApacheConnectorProvider connector = new ApacheConnectorProvider();
clientConfig.connectorProvider(connector);
clientConfig.register(new ClientRequestFilter() {
@Override
public void filter(ClientRequestContext requestContext) throws IOException {
List<Object> orig = requestContext.getHeaders().remove(HttpHeaders.CONTENT_LENGTH);
if (orig != null && !orig.isEmpty()) {
requestContext.getHeaders().addAll("Length", orig);
}
}
});
clientConfig.register(new ClientRequestFilter() {
@Override
public void filter(ClientRequestContext requestContext) throws IOException {
if (requestContext.getMediaType() != null &&
requestContext.getMediaType().getType() != null &&
requestContext.getMediaType().getType().equalsIgnoreCase("multipart")) {
final MediaType boundaryMediaType = Boundary.addBoundary(requestContext.getMediaType());
if (boundaryMediaType != requestContext.getMediaType()) {
requestContext.getHeaders().putSingle(HttpHeaders.CONTENT_TYPE, boundaryMediaType.toString());
}
if (!requestContext.getHeaders().containsKey("MIME-Version")) {
requestContext.getHeaders().putSingle("MIME-Version", "1.0");
}
}
}
});
closing this out - i was accidentally closing the stream early so it was really missing docs at the end which gave me the hint to wait until the blocking queue was empty before shutting down executors.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With