 

How to send requests to Cassandra at a particular rate using Guava RateLimiter?

I am using DataStax Java driver 3.1.0 to connect to a Cassandra cluster running Cassandra 2.0.10. I write asynchronously with QUORUM consistency.

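  // Callbacks run on this pool rather than on the driver's I/O threads.
  private final ExecutorService executorService = Executors.newFixedThreadPool(10);
  // At most 1000 asynchronous writes may be in flight at once.
  private final Semaphore concurrentQueries = new Semaphore(1000);

  public void save(String process, int clientid, long deviceid) {
    String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
    boolean permitHeld = false;
    try {
      BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
      bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
      bs.setString(0, process);
      bs.setInt(1, clientid);
      bs.setLong(2, deviceid);

      // Blocks the calling thread while 1000 writes are already pending.
      concurrentQueries.acquire();
      permitHeld = true;
      ResultSetFuture future = session.executeAsync(bs);
      Futures.addCallback(future, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(ResultSet result) {
          concurrentQueries.release();
          logger.logInfo("successfully written");
        }

        @Override
        public void onFailure(Throwable t) {
          concurrentQueries.release();
          logger.logError("error= ", t);
        }
      }, executorService);
      permitHeld = false; // the callback now owns the permit
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      logger.logError("interrupted while waiting for a permit ", ex);
    } catch (Exception ex) {
      logger.logError("error= ", ex);
    } finally {
      if (permitHeld) {
        // the write was never handed to a callback, so give the permit back
        concurrentQueries.release();
      }
    }
  }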

My save method above is called from multiple threads at a very high rate. If I write faster than my Cassandra cluster can handle, it starts throwing errors, and I want all my writes to go into Cassandra successfully without any loss.

Question:

I was thinking of using some sort of queue or buffer to enqueue requests (e.g. java.util.concurrent.ArrayBlockingQueue). "Buffer full" would mean that clients have to wait. The buffer would also be used to re-enqueue failed requests. To be fair, failed requests should probably be put at the front of the queue so they are retried first. We would also need to handle the case where the queue is full and new failed requests arrive at the same time. A single-threaded worker would then take requests from the queue and send them to Cassandra. Since it does very little work, it is unlikely to become a bottleneck. This worker can apply its own rate limits, e.g. based on timing with com.google.common.util.concurrent.RateLimiter.
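The design described above can be sketched with java.util.concurrent alone. The sketch makes two assumptions. First, a hypothetical synchronous write function (a Predicate that returns false on failure) stands in for the asynchronous driver call. Second, a pluggable permit source sits where Guava's RateLimiter would go. It also departs from the text in one respect. ArrayBlockingQueue cannot insert at its head, so failed requests go into a separate unbounded retry deque, which the worker drains before taking new requests. Because that deque is unbounded, a failed request never waits for space in a full buffer.

```java
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Hypothetical sketch: `write` stands in for the Cassandra call and returns
// false when the write failed and should be retried.
final class WriteBuffer<T> {
    // Anything that blocks until a permit is free (assumed wiring, e.g. a
    // Guava RateLimiter's acquire method in real code).
    interface PermitSource { void acquire() throws InterruptedException; }

    private final LinkedBlockingQueue<T> fresh;   // bounded: clients wait when full
    private final ConcurrentLinkedDeque<T> retries = new ConcurrentLinkedDeque<>();
    private final PermitSource permits;
    private final Predicate<T> write;

    WriteBuffer(int capacity, PermitSource permits, Predicate<T> write) {
        this.fresh = new LinkedBlockingQueue<>(capacity);
        this.permits = permits;
        this.write = write;
    }

    // Called from many producer threads; blocks while the buffer is full.
    void submit(T item) throws InterruptedException {
        fresh.put(item);
    }

    // One step of the single worker: failed items are retried before new ones.
    // Returns false if nothing arrived within timeoutMillis.
    boolean drainOnce(long timeoutMillis) throws InterruptedException {
        T item = retries.pollFirst();
        if (item == null) {
            item = fresh.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        if (item == null) {
            return false;
        }
        permits.acquire();              // the rate limit applies to retries too
        if (!write.test(item)) {
            retries.addLast(item);      // unbounded, so a full buffer never blocks a retry
        }
        return true;
    }

    // Runs the worker loop on a daemon thread until it is interrupted.
    Thread startWorker() {
        Thread t = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    drainOnce(100);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "cassandra-writer");
        t.setDaemon(true);
        t.start();
        return t;
    }
}
```

In real code, the permit source could be a method reference to a Guava RateLimiter's acquire(), and the retry deque would be fed from the driver's onFailure callback. ConcurrentLinkedDeque is safe to use from those callback threads. In this sketch, a record that always fails would cycle forever, so an attempt counter is worth adding.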

What is the best way to implement this queue or buffer so that it applies a Guava rate limit while writing into Cassandra? If there is a better approach, please let me know as well. I want to write to Cassandra at 2000 requests per second. This rate should be configurable so that I can experiment to find the optimal setting.
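To keep the rate configurable, the limiter only needs a setter that can be called while the worker is running. The sketch below is a hypothetical stdlib stand-in for Guava's RateLimiter; IntervalRateLimiter is not a library class. It spaces permits evenly, 1/rate seconds apart. Unlike Guava's default limiter, it does not save unused permits for a later burst.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for Guava's RateLimiter: permits are handed out
// evenly spaced 1/rate seconds apart, with no stored permits for bursts.
final class IntervalRateLimiter {
    private long intervalNanos;
    private long nextFreeNanos = System.nanoTime();

    IntervalRateLimiter(double permitsPerSecond) {
        setRate(permitsPerSecond);
    }

    // Can be called at runtime (e.g. after a config reload) to retune the rate.
    synchronized void setRate(double permitsPerSecond) {
        if (!(permitsPerSecond > 0)) {
            throw new IllegalArgumentException("rate must be positive");
        }
        intervalNanos = (long) (TimeUnit.SECONDS.toNanos(1) / permitsPerSecond);
    }

    // Reserves the next free slot, then sleeps until it arrives; returns nanos waited.
    long acquire() throws InterruptedException {
        long waitNanos;
        synchronized (this) {
            long now = System.nanoTime();
            // compare nanoTime values by subtraction to stay overflow-safe
            long grant = (nextFreeNanos - now > 0) ? nextFreeNanos : now;
            nextFreeNanos = grant + intervalNanos;
            waitNanos = grant - now;
        }
        if (waitNanos > 0) {
            TimeUnit.NANOSECONDS.sleep(waitNanos); // sleep outside the lock
        }
        return waitNanos;
    }
}
```

With Guava itself, the same roles are played by RateLimiter.create(2000.0), acquire() and setRate(double). The initial value could come from configuration. For example, Double.parseDouble(System.getProperty("writeRate", "2000")) would work, where writeRate is a hypothetical property name.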

As noted below in the comments, if memory usage keeps increasing, we can use a Guava Cache or CLHM (ConcurrentLinkedHashMap) to keep dropping old records so that the program doesn't run out of memory. The box will have around 12GB of memory and these records are very small, so I don't expect this to be a problem.
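If memory does become a concern, the eviction idea can be sketched without Guava Cache or CLHM. The sketch uses a fixed-capacity FIFO buffer that discards its oldest record when full. This swaps a plain ArrayDeque in for those libraries, and DropOldestBuffer is a hypothetical name. Every eviction is a lost write, which conflicts with the no-loss requirement, so the sketch counts drops so they can be monitored.

```java
import java.util.ArrayDeque;

// Hypothetical stand-in for a size-bounded Guava Cache / CLHM: a FIFO buffer
// that evicts the oldest record when full and counts what it dropped.
final class DropOldestBuffer<T> {
    private final ArrayDeque<T> deque = new ArrayDeque<>();
    private final int capacity;
    private long dropped;

    DropOldestBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be > 0");
        }
        this.capacity = capacity;
    }

    // Never blocks: when full, the oldest record is discarded to make room.
    synchronized void add(T item) {
        if (deque.size() == capacity) {
            deque.pollFirst();
            dropped++;
        }
        deque.addLast(item);
    }

    // Returns the oldest pending record, or null if the buffer is empty.
    synchronized T poll() {
        return deque.pollFirst();
    }

    // Dropped records are lost writes, so expose the count for monitoring.
    synchronized long droppedCount() {
        return dropped;
    }
}
```

For scale, at 2000 writes per second a buffer holding 1,000,000 records absorbs about 500 seconds of backlog before it starts dropping.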

asked Jan 20 '17 07:01 by john





1 Answer

"If I write at very high speed than my Cassandra cluster can handle then it will start throwing errors and I want all my writes should go successfully into cassandra without any loss."

The DataStax driver lets you configure the number of connections per host and the maximum number of concurrent requests per connection (see the PoolingOptions settings).

Lowering these limits caps how many requests can be in flight against each host at once, which reduces the pressure on the Cassandra cluster.
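Here is a minimal sketch of those settings. It assumes driver 3.x on the classpath and a single local datacenter. The contact point and the numbers are placeholders to tune, not recommendations. Cassandra 2.0 speaks native protocol v2, which limits one connection to 128 simultaneous requests. ThrottledSession is a hypothetical helper name.

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.Session;

// Hypothetical helper; contact point and limits are placeholders to tune.
final class ThrottledSession {
    static Session connect(String contactPoint) {
        PoolingOptions pooling = new PoolingOptions()
            // core and max connections to each host in the local datacenter
            .setConnectionsPerHost(HostDistance.LOCAL, 1, 2)
            // simultaneous requests per connection; protocol v2 (Cassandra 2.0)
            // allows at most 128
            .setMaxRequestsPerConnection(HostDistance.LOCAL, 64);

        Cluster cluster = Cluster.builder()
            .addContactPoint(contactPoint)
            .withPoolingOptions(pooling)
            .build();
        return cluster.connect();
    }
}
```

With these values, at most 2 × 64 = 128 requests are in flight per local host. Once every connection is at its limit, further requests are held back by the driver and may eventually fail with a timeout. The application therefore still benefits from its own throttle, such as the Semaphore in the question.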

answered Oct 05 '22 07:10 by Mikhail Baksheev
