I'm using a WebClient
object to send Http Post request to a server.
It's sending a huge amount of requests quite rapidly (there is about 4000 messages in a QueueChannel
). The problem is... it seems the server can't respond fast enough... so I'm getting a lot of server error 500 and connexion closed prematurely.
Is there a way to limit the number of request per seconds ? Or limit the number of threads it's using ?
EDIT :
The Message endpoint processe message in a QueueChannel :
@MessageEndpoint
public class CustomServiceActivator {
private static final Logger logger = LogManager.getLogger();
@Autowired
IHttpService httpService;
@ServiceActivator(
inputChannel = "outputFilterChannel",
outputChannel = "outputHttpServiceChannel",
poller = @Poller( fixedDelay = "1000" )
)
public void processMessage(Data data) {
httpService.push(data);
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
The WebClient service class :
@Service
public class HttpService implements IHttpService {
private static final String URL = "http://www.blabla.com/log";
private static final Logger logger = LogManager.getLogger();
@Autowired
WebClient webClient;
@Override
public void push(Data data) {
String body = constructString(data);
Mono<ResponseEntity<Response>> res = webClient.post()
.uri(URL + getLogType(data))
.contentLength(body.length())
.contentType(MediaType.APPLICATION_JSON)
.syncBody(body)
.exchange()
.flatMap(response -> response.toEntity(Response.class));
res.subscribe(new Consumer<ResponseEntity<Response>>() { ... });
}
}
To handle high traffic, you should setup Load Balancer with multiple node/instances. Better to go with Auto Scaling on Cloud server. It will increase the instances as per high load (number or request) and again decrease the instances when there will be low number of requests. Which is cost effective.
On the other side, WebClient uses an asynchronous, non-blocking solution provided by the Spring Reactive framework.
The retrieve() method decodes the ClientResponse object and hands you the ready-made object for your use. It doesn't have a very nice api for handling exceptions. However on the other hand the exchange() method hands you the ClientResponse object itself along with the response status and headers.
Resilience4j has excellent support for non-blocking rate limiting with Project Reactor.
Required dependencies (beside Spring WebFlux):
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-reactor</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-ratelimiter</artifactId>
<version>1.6.1</version>
</dependency>
Example:
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterOperator;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.atomic.AtomicInteger;
public class WebClientRateLimit
{
private static final AtomicInteger COUNTER = new AtomicInteger(0);
private final WebClient webClient;
private final RateLimiter rateLimiter;
public WebClientRateLimit()
{
this.webClient = WebClient.create();
// enables 3 requests every 5 seconds
this.rateLimiter = RateLimiter.of("my-rate-limiter",
RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofSeconds(5))
.limitForPeriod(3)
.timeoutDuration(Duration.ofMinutes(1)) // max wait time for a request, if reached then error
.build());
}
public Mono<?> call()
{
return webClient.get()
.uri("https://jsonplaceholder.typicode.com/todos/1")
.retrieve()
.bodyToMono(String.class)
.doOnSubscribe(s -> System.out.println(COUNTER.incrementAndGet() + " - " + LocalDateTime.now()
+ " - call triggered"))
.transformDeferred(RateLimiterOperator.of(rateLimiter));
}
public static void main(String[] args)
{
WebClientRateLimit webClientRateLimit = new WebClientRateLimit();
long start = System.currentTimeMillis();
Flux.range(1, 16)
.flatMap(x -> webClientRateLimit.call())
.blockLast();
System.out.println("Elapsed time in seconds: " + (System.currentTimeMillis() - start) / 1000d);
}
}
Example output:
1 - 2020-11-30T15:44:01.575003200 - call triggered
2 - 2020-11-30T15:44:01.821134 - call triggered
3 - 2020-11-30T15:44:01.823133100 - call triggered
4 - 2020-11-30T15:44:04.462353900 - call triggered
5 - 2020-11-30T15:44:04.462353900 - call triggered
6 - 2020-11-30T15:44:04.470399200 - call triggered
7 - 2020-11-30T15:44:09.461199100 - call triggered
8 - 2020-11-30T15:44:09.463157 - call triggered
9 - 2020-11-30T15:44:09.463157 - call triggered
11 - 2020-11-30T15:44:14.461447700 - call triggered
10 - 2020-11-30T15:44:14.461447700 - call triggered
12 - 2020-11-30T15:44:14.461447700 - call triggered
13 - 2020-11-30T15:44:19.462098200 - call triggered
14 - 2020-11-30T15:44:19.462098200 - call triggered
15 - 2020-11-30T15:44:19.468059700 - call triggered
16 - 2020-11-30T15:44:24.462615 - call triggered
Elapsed time in seconds: 25.096
Docs: https://resilience4j.readme.io/docs/examples-1#decorate-mono-or-flux-with-a-ratelimiter
Question Limiting rate of requests with Reactor provides two answrers (one in comment)
zipWith another flux that acts as rate limiter
.zipWith(Flux.interval(Duration.of(1, ChronoUnit.SECONDS)))
just delay each web request
use delayElements function
edit: answer below is valid for blocking RestTemplate but do not really fit well into reactive pattern.
WebClient does not have ability to limit request, but you could easily add this feature using composition.
You may throttle your client externally using RateLimiter from Guava/ (https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html)
In this tutorial http://www.baeldung.com/guava-rate-limiter you will find how to use Rate limiter in blocking way, or with timeouts.
I would decorate all calls that need to be throttled in separate class that
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With