How to limit requests per second with WebClient?

I'm using a WebClient object to send HTTP POST requests to a server. It sends a huge number of requests very rapidly (there are about 4000 messages in a QueueChannel). The problem is that the server doesn't seem to respond fast enough, so I'm getting a lot of HTTP 500 server errors and "connection closed prematurely" errors.

Is there a way to limit the number of requests per second? Or to limit the number of threads it uses?

EDIT:

The message endpoint processes messages from a QueueChannel:

@MessageEndpoint
public class CustomServiceActivator {

    private static final Logger logger = LogManager.getLogger();

    @Autowired
    IHttpService httpService;

    @ServiceActivator(
            inputChannel = "outputFilterChannel",
            outputChannel = "outputHttpServiceChannel",
            poller = @Poller( fixedDelay = "1000" )
    )
    public void processMessage(Data data) {
        httpService.push(data);
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

The WebClient service class:

@Service
public class HttpService implements IHttpService {

    private static final String URL = "http://www.blabla.com/log";

    private static final Logger logger = LogManager.getLogger();

    @Autowired
    WebClient webClient;

    @Override
    public void push(Data data) {
        String body = constructString(data);
        Mono<ResponseEntity<Response>> res = webClient.post()
                .uri(URL + getLogType(data))
                .contentLength(body.length())
                .contentType(MediaType.APPLICATION_JSON)
                .syncBody(body)
                .exchange()
                .flatMap(response -> response.toEntity(Response.class));

        res.subscribe(new Consumer<ResponseEntity<Response>>() { ... });
    }
}
asked May 17 '18 09:05 by Phoste



2 Answers

Resilience4j has excellent support for non-blocking rate limiting with Project Reactor.

Required dependencies (besides Spring WebFlux):

<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-reactor</artifactId>
    <version>1.6.1</version>
</dependency>
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-ratelimiter</artifactId>
    <version>1.6.1</version>
</dependency>

Example:

import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterOperator;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.atomic.AtomicInteger;

public class WebClientRateLimit
{
    private static final AtomicInteger COUNTER = new AtomicInteger(0);

    private final WebClient webClient;
    private final RateLimiter rateLimiter;

    public WebClientRateLimit()
    {
        this.webClient = WebClient.create();

        // enables 3 requests every 5 seconds
        this.rateLimiter = RateLimiter.of("my-rate-limiter",
                RateLimiterConfig.custom()
                                 .limitRefreshPeriod(Duration.ofSeconds(5))
                                 .limitForPeriod(3)
                                 .timeoutDuration(Duration.ofMinutes(1)) // max wait time for a request, if reached then error
                                 .build());
    }

    public Mono<?> call()
    {
        return webClient.get()
                        .uri("https://jsonplaceholder.typicode.com/todos/1")
                        .retrieve()
                        .bodyToMono(String.class)
                        .doOnSubscribe(s -> System.out.println(COUNTER.incrementAndGet() + " - " + LocalDateTime.now()
                                + " - call triggered"))
                        .transformDeferred(RateLimiterOperator.of(rateLimiter));
    }

    public static void main(String[] args)
    {
        WebClientRateLimit webClientRateLimit = new WebClientRateLimit();

        long start = System.currentTimeMillis();

        Flux.range(1, 16)
            .flatMap(x -> webClientRateLimit.call())
            .blockLast();

        System.out.println("Elapsed time in seconds: " + (System.currentTimeMillis() - start) / 1000d);
    }
}

Example output:

1 - 2020-11-30T15:44:01.575003200 - call triggered
2 - 2020-11-30T15:44:01.821134 - call triggered
3 - 2020-11-30T15:44:01.823133100 - call triggered
4 - 2020-11-30T15:44:04.462353900 - call triggered
5 - 2020-11-30T15:44:04.462353900 - call triggered
6 - 2020-11-30T15:44:04.470399200 - call triggered
7 - 2020-11-30T15:44:09.461199100 - call triggered
8 - 2020-11-30T15:44:09.463157 - call triggered
9 - 2020-11-30T15:44:09.463157 - call triggered
11 - 2020-11-30T15:44:14.461447700 - call triggered
10 - 2020-11-30T15:44:14.461447700 - call triggered
12 - 2020-11-30T15:44:14.461447700 - call triggered
13 - 2020-11-30T15:44:19.462098200 - call triggered
14 - 2020-11-30T15:44:19.462098200 - call triggered
15 - 2020-11-30T15:44:19.468059700 - call triggered
16 - 2020-11-30T15:44:24.462615 - call triggered
Elapsed time in seconds: 25.096
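
The gaps in this output follow from the configuration. As a simplified model (an assumption for illustration, not Resilience4j's actual implementation), think of time as fixed 5-second cycles that each grant 3 permits. Since `flatMap` subscribes to all 16 calls at once, call n gets its permit in cycle (n - 1) / 3, so call 16 lands in cycle 5, about 25 seconds in, which matches the elapsed time above. Every wait is also shorter than the 1-minute `timeoutDuration`, so no call fails. The class and method names below are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model (an assumption, not Resilience4j's code): time is split into
 * fixed cycles of refreshPeriodSeconds, and each cycle grants limitForPeriod permits.
 */
public class RateLimitSchedule {

    /** Seconds after the first cycle starts at which request n (1-based) gets a permit. */
    public static long permitOffsetSeconds(int n, int limitForPeriod, long refreshPeriodSeconds) {
        long cycle = (n - 1) / limitForPeriod; // integer division: limitForPeriod requests per cycle
        return cycle * refreshPeriodSeconds;
    }

    /** Permit offsets for requests 1..requests, assuming all are queued up front. */
    public static List<Long> schedule(int requests, int limitForPeriod, long refreshPeriodSeconds) {
        List<Long> offsets = new ArrayList<>();
        for (int n = 1; n <= requests; n++) {
            offsets.add(permitOffsetSeconds(n, limitForPeriod, refreshPeriodSeconds));
        }
        return offsets;
    }

    public static void main(String[] args) {
        // Same settings as the example: limitForPeriod(3), limitRefreshPeriod(5 s), 16 requests
        List<Long> offsets = schedule(16, 3, 5);
        for (int i = 0; i < offsets.size(); i++) {
            System.out.println("request " + (i + 1) + " -> +" + offsets.get(i) + " s");
        }
    }
}
```

In the real run the first gap is only about 3 seconds (01.5 to 04.5). This appears to be because the cycles start when the limiter is created rather than at the first call; the model ignores that offset as well as the HTTP latency.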

Docs: https://resilience4j.readme.io/docs/examples-1#decorate-mono-or-flux-with-a-ratelimiter

answered Sep 30 '22 02:09 by Martin Tarjányi


The question "Limiting rate of requests with Reactor" provides two answers (one in a comment):

  1. zipWith another Flux that acts as a rate limiter:

     .zipWith(Flux.interval(Duration.of(1, ChronoUnit.SECONDS)))

  2. Just delay each web request, using the delayElements function.
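
Both approaches pace the stream so that each element waits for a tick before its request is issued. The sketch below illustrates the same idea with only the JDK, as an analogue rather than Reactor code: a single-threaded `ScheduledExecutorService` releases at most one queued item per period, roughly what `zipWith(Flux.interval(...))` does. With Reactor itself you would keep the one-liner above (or `delayElements`). The `PacedSender` class is hypothetical, and the `send` lambda stands in for the real WebClient call:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class PacedSender {

    /**
     * Sends at most one item per tick, in list order. JDK-only analogue of
     * zipping the items with Flux.interval(period): each item waits for its own tick.
     */
    public static <T> void sendPaced(List<T> items, long periodMillis, Consumer<T> send) {
        BlockingQueue<T> queue = new LinkedBlockingQueue<>(items);
        CountDownLatch done = new CountDownLatch(items.size());
        ScheduledExecutorService ticker = Executors.newSingleThreadScheduledExecutor();
        ticker.scheduleAtFixedRate(() -> {
            T next = queue.poll(); // take one item per tick
            if (next == null) {
                return;
            }
            try {
                send.accept(next);
            } catch (RuntimeException e) {
                // An exception escaping the task would cancel all later ticks
                System.err.println("send failed: " + e);
            } finally {
                done.countDown();
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            done.await(); // block until every item has been sent
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            ticker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<String> sent = Collections.synchronizedList(new ArrayList<>());
        long start = System.nanoTime();
        // The lambda is a hypothetical stand-in for the real HTTP call
        sendPaced(List.of("a", "b", "c", "d"), 250, item -> {
            sent.add(item);
            System.out.println(item + " at " + (System.nanoTime() - start) / 1_000_000 + " ms");
        });
        System.out.println("sent " + sent);
    }
}
```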

Edit: the answer below is valid for the blocking RestTemplate, but it does not really fit well into the reactive pattern.

WebClient has no built-in ability to limit requests, but you can easily add this feature through composition.

You can throttle your client externally using RateLimiter from Guava (https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html).

This tutorial, http://www.baeldung.com/guava-rate-limiter, shows how to use RateLimiter in a blocking way or with timeouts.
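
To make the two styles concrete, here is a hypothetical, JDK-only stand-in (not Guava itself) that spaces permits evenly at 1/rate, loosely like `RateLimiter.create(permitsPerSecond)`: `acquire()` blocks until the caller's slot arrives, and `tryAcquire(timeout, unit)` returns `false` immediately when that slot is further away than the timeout. Guava additionally stores unused permits to allow short bursts, which this sketch omits:

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical JDK-only limiter: one permit every 1/rate seconds, no bursts.
 * Modeled loosely on Guava's RateLimiter; not its actual implementation.
 */
public class SimpleRateLimiter {
    private final long intervalNanos;
    private long nextFreeNanos = System.nanoTime(); // first permit is available immediately

    public SimpleRateLimiter(double permitsPerSecond) {
        this.intervalNanos = (long) (TimeUnit.SECONDS.toNanos(1) / permitsPerSecond);
    }

    /** Books the next free slot and returns how long the caller must wait for it. */
    private synchronized long reserve() {
        long now = System.nanoTime();
        long wait = Math.max(0, nextFreeNanos - now);
        nextFreeNanos = Math.max(nextFreeNanos, now) + intervalNanos;
        return wait;
    }

    /** Blocking style: waits as long as needed for a permit. */
    public void acquire() {
        sleepNanos(reserve());
    }

    /** Timeout style: returns false at once if the next slot is further away than the timeout. */
    public boolean tryAcquire(long timeout, TimeUnit unit) {
        long wait;
        synchronized (this) {
            if (nextFreeNanos - System.nanoTime() > unit.toNanos(timeout)) {
                return false;
            }
            wait = reserve(); // intrinsic locks are reentrant, so this is safe
        }
        sleepNanos(wait);
        return true;
    }

    private static void sleepNanos(long nanos) {
        try {
            TimeUnit.NANOSECONDS.sleep(nanos);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // cut the wait short, keep the interrupt flag
        }
    }

    public static void main(String[] args) {
        SimpleRateLimiter limiter = new SimpleRateLimiter(5.0); // 5 permits per second
        long start = System.nanoTime();
        for (int i = 1; i <= 5; i++) {
            limiter.acquire();
            System.out.printf("permit %d at %d ms%n", i, (System.nanoTime() - start) / 1_000_000);
        }
        // The next slot is about 200 ms away, so a 10 ms timeout is normally refused
        System.out.println("tryAcquire(10 ms): " + limiter.tryAcquire(10, TimeUnit.MILLISECONDS));
    }
}
```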

I would wrap all calls that need to be throttled in a separate class that:

  1. limits the number of calls per second
  2. performs the actual web call using WebClient
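
Following those two steps, a decorator might look like the sketch below. It is a hypothetical, JDK-only illustration: the wrapped `Function` stands in for the real web call, and the limiter is a simple fixed window (a `Semaphore` topped up to N permits once per second) rather than Guava's `RateLimiter`. Because each call blocks in `acquireUninterruptibly()`, it suits a blocking client such as RestTemplate, as noted in the edit above, rather than a reactive pipeline:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical decorator: (1) limits calls per second, (2) performs the real call. */
public class ThrottledClient<Q, R> implements AutoCloseable {
    private final Function<Q, R> delegate; // the actual (blocking) web call, assumed
    private final Semaphore permits;
    private final ScheduledExecutorService refill;

    public ThrottledClient(Function<Q, R> delegate, int callsPerSecond) {
        this.delegate = delegate;
        this.permits = new Semaphore(callsPerSecond);
        this.refill = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "rate-limit-refill");
            t.setDaemon(true); // don't keep the JVM alive
            return t;
        });
        // Once per second, top the pool back up to callsPerSecond permits.
        // A call racing with the refill may leave it slightly under-filled, never over the limit.
        refill.scheduleAtFixedRate(
                () -> permits.release(callsPerSecond - permits.availablePermits()),
                1, 1, TimeUnit.SECONDS);
    }

    public R call(Q request) {
        permits.acquireUninterruptibly(); // 1. wait for a permit in the current window
        return delegate.apply(request);   // 2. perform the actual call
    }

    @Override
    public void close() {
        refill.shutdownNow();
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for an HTTP POST that returns a status code
        Function<String, Integer> fakePost = body -> 200;
        long start = System.nanoTime();
        try (ThrottledClient<String, Integer> client = new ThrottledClient<>(fakePost, 3)) {
            for (int i = 1; i <= 7; i++) {
                int status = client.call("{\"id\":" + i + "}");
                System.out.printf("call %d -> %d at %d ms%n", i, status,
                        (System.nanoTime() - start) / 1_000_000);
            }
        }
    }
}
```

With a limit of 3, the first three calls go out immediately, the next three after roughly one second, and the seventh after roughly two.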
answered Sep 30 '22 01:09 by Bartosz Bilicki