I am using spring-boot version 2.0.0.M6. I need to make async HTTP calls from spring-boot app say APP1 to another app (play framework) say APP2. So if I need to make 20 distinct async calls from APP1 to APP2, APP2 receives 20 requests out of which few are duplicates, which means these duplicates replaced few distinct requests. Expected:
api/v1/call/1
api/v1/call/2
api/v1/call/3
api/v1/call/4
Actual:
api/v1/call/1
api/v1/call/2
api/v1/call/4
api/v1/call/4
I am using spring reactive WebClient.
Below is the spring boot version in build.gradle
buildscript {
ext {
springBootVersion = '2.0.0.M6'
//springBootVersion = '2.0.0.BUILD-SNAPSHOT'
}
repositories {
mavenCentral()
maven { url "https://repo.spring.io/snapshot" }
maven { url "https://repo.spring.io/milestone" }
maven {url "https://plugins.gradle.org/m2/"}
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
classpath("se.transmode.gradle:gradle-docker:1.2")
}
}
My WebClient init snippet
private WebClient webClient = WebClient.builder()
.clientConnector(new ReactorClientHttpConnector((HttpClientOptions.Builder builder) -> builder.disablePool()))
.build();
My POST method
public <T> Mono<JsonNode> postClient(String url, T postData) {
return Mono.subscriberContext().flatMap(ctx -> {
String cookieString = ctx.getOrDefault(Constants.SubscriberContextConstnats.COOKIES, StringUtils.EMPTY);
URI uri = URI.create(url);
return webClient.post().uri(uri).body(BodyInserters.fromObject(postData)).header(HttpHeaders.COOKIE, cookieString)
.exchange().flatMap(clientResponse ->
{
return clientResponse.bodyToMono(JsonNode.class);
})
.onErrorMap(err -> new TurtleException(err.getMessage(), err))
.doOnSuccess(jsonData -> {
});
});
}
The code from where this postClient method is invoked
private void getResultByKey(PremiumRequestHandler request, String key, BrokerConfig brokerConfig) {
/* Live calls for the insurers */
LOG.info("[PREMIUM SERVICE] LIVE CALLLLL MADE FOR: " + key + " AND REQUEST ID: " + request.getRequestId());
String uri = brokerConfig.getHostUrl() + verticalResolver.determineResultUrl(request.getVertical()) + key;
LOG.info("[PREMIUM SERVICE] LIVE CALL WITH URI : " + uri + " FOR REQUEST ID: " + request.getRequestId());
Mono<PremiumResponse> premiumResponse = reactiveWebClient.postClient(uri, request.getPremiumRequest())
.map(json -> PlatformUtils.mapToClass(json, PremiumResponse.class));
premiumResponse.subscribe(resp -> {
resp.getPremiumResults().forEach(result -> {
LOG.info("Key " + result.getKey());
repository.getResultRepoRawType(request.getVertical())
.save(result).subscribe();
saveResult.subscriberContext(ctx -> {
MultiBrokerMongoDBFactory.setDatabaseNameForCurrentThread(brokerConfig.getBroker());
return ctx;
}).subscribe();
});
}, error -> {
LOG.info("[PREMIUM SERVICE] ERROR RECIEVED FOR " + key + " AND REQUEST ID" + request.getRequestId() + " > " + error.getMessage());
});
}
Had put logs at the end-point in the client code, can not see multiple requests at that point.
Probably it's a bug in WebClient where URI is getting swapped in multithreaded environment.
Tried mutating WebClient, still the URI is getting swapped
Please help.
Git Repo added github.com/praveenk007/ps-demo
I happend to have experienced similar problem:
When calling the same service (here marked as ExternalService) in parallel (webflux) sometimes the same request was being sent and the problem indeed seems to reside in Webclient.
The solution was in changing the way Webclient is being created.
Before:
Here ExternalCall is configured client which calls ExternalService. So the parallel execution here was of internalCall method. Note we pass WebClient.RequestBodySpec class to ExternalCall class
@Bean
ExternalCall externalCall(WebClient.Builder webClientBuilder) {
var exchangeStrategies = getExchangeStrategies();
var endpoint = "v1/data";
var timeout = 10000;
var uri = "https://externalService.com/";
var requestBodySpec = webClientBuilder.clone()
.clientConnector(new ReactorClientHttpConnector(HttpClient.create()))
.exchangeStrategies(exchangeStrategies)
.build()
.post()
.uri(endpoint)
.accept(TEXT_XML)
.contentType(TEXT_XML);
return new ExternalCall(requestBodySpec, uri, timeout);
}
Then within ExternalCall class I had had
private Mono<String> internalCall(String rq) {
return requestBodySpec.bodyValue(rq)
.retrieve()
.bodyToMono(String.class)
.timeout(timeout, Mono.error(() -> new TimeoutException(String.format("%s - timeout after %s seconds", "ExternalService", timeout.getSeconds()))));
}
After:
We pass WebClient class to ExternalCall.
@Bean
ExternalCall externalCall(WebClient.Builder webClientBuilder) {
var exchangeStrategies = getExchangeStrategies();
var timeout = 10000;
var uri = "https://externalService.com/";
var webClient = webClientBuilder.clone()
.clientConnector(new ReactorClientHttpConnector(HttpClient.create()))
.exchangeStrategies(exchangeStrategies)
.build();
return new ExternalCall(webClient, uri, timeout);
}
Now we specify RequestBodySpec within ExternalCall class:
private Mono<String> internalCall(String rq) {
return webClient
.post()
.uri(endpoint)
.accept(TEXT_XML)
.contentType(TEXT_XML)
.bodyValue(rq)
.retrieve()
.bodyToMono(String.class)
.timeout(timeout, Mono.error(() -> new TimeoutException(String.format("%s - timeout after %s seconds", "ExternalService", timeout.getSeconds()))));
}
Conclusions: So apparently the moment you create WebClient.RequestBodySpec instance matters. Hope that helps somebody
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With