I am using the DataStax Java driver 3.1.0 to connect to a Cassandra cluster, and my Cassandra cluster version is 2.0.10. I am writing asynchronously with QUORUM consistency.
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
private final Semaphore concurrentQueries = new Semaphore(1000);

public void save(String process, int clientid, long deviceid) {
    String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
    try {
        BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
        bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
        bs.setString(0, process);
        bs.setInt(1, clientid);
        bs.setLong(2, deviceid);

        // cap the number of in-flight async writes at 1000
        concurrentQueries.acquire();
        ResultSetFuture future = session.executeAsync(bs);
        Futures.addCallback(future, new FutureCallback<ResultSet>() {
            @Override
            public void onSuccess(ResultSet result) {
                concurrentQueries.release();
                logger.logInfo("successfully written");
            }

            @Override
            public void onFailure(Throwable t) {
                concurrentQueries.release();
                logger.logError("error= ", t);
            }
        }, executorService);
    } catch (Exception ex) {
        logger.logError("error= ", ex);
    }
}
My save method above will be called from multiple threads at a very high rate. If I write faster than my Cassandra cluster can handle, it starts throwing errors, and I want all my writes to make it into Cassandra successfully, without any loss.
Question:
I was thinking of using some sort of queue or buffer to enqueue requests (e.g. java.util.concurrent.ArrayBlockingQueue). "Buffer full" would mean that clients should wait. The buffer would also be used to re-enqueue failed requests; to be fair, failed requests should probably be put at the front of the queue so they are retried first. We also need to handle the situation where the queue is full and new failed requests arrive at the same time. A single-threaded worker would then pick requests from the queue and send them to Cassandra. Since it does not do much work, it is unlikely to become a bottleneck. This worker can apply its own rate limit, e.g. based on timing with com.google.common.util.concurrent.RateLimiter.
What is the best way to implement this queue/buffer feature so that it can also apply Guava rate limiting while writing into Cassandra? If there is a better approach, please let me know as well. I want to write to Cassandra at 2000 requests per second (this should be configurable so that I can experiment to find the optimal setting).
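Here is a rough sketch of what I have in mind (WriteBuffer, MAX_PENDING and the other names are made up, not from the driver; I use a LinkedBlockingDeque instead of ArrayBlockingQueue so failed writes can be re-queued at the front):

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;

public class WriteBuffer {
    private static final int MAX_PENDING = 10_000;    // "buffer full" => producers block in enqueue()

    private final BlockingDeque<BoundStatement> queue = new LinkedBlockingDeque<>(MAX_PENDING);
    private final RateLimiter rateLimiter;             // Guava limiter shared by the single worker
    private final Session session;
    private final ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();

    public WriteBuffer(Session session, double writesPerSecond) {   // e.g. 2000.0, configurable
        this.session = session;
        this.rateLimiter = RateLimiter.create(writesPerSecond);
        Thread worker = new Thread(this::drain, "cassandra-writer");
        worker.setDaemon(true);
        worker.start();
    }

    /** Producer threads call this instead of executeAsync(); it blocks while the buffer is full. */
    public void enqueue(BoundStatement bs) throws InterruptedException {
        queue.putLast(bs);
    }

    private void drain() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                final BoundStatement bs = queue.takeFirst();   // waits if the queue is empty
                rateLimiter.acquire();                         // enforces the writes-per-second limit
                ResultSetFuture future = session.executeAsync(bs);
                Futures.addCallback(future, new FutureCallback<ResultSet>() {
                    @Override
                    public void onSuccess(ResultSet rs) { /* written successfully */ }

                    @Override
                    public void onFailure(Throwable t) {
                        // retry first: put the failed write back at the head of the queue;
                        // offerFirst() is non-blocking so a full queue cannot stall the callback,
                        // but that means a fallback (log/drop/secondary store) is still needed
                        queue.offerFirst(bs);
                    }
                }, callbackExecutor);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

With this shape, the single worker thread is the only place that talks to Cassandra, so the RateLimiter value is the effective write rate.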
As noted in the comments below, if memory keeps growing, we can use a Guava Cache or CLHM (ConcurrentLinkedHashMap) to keep dropping old records so that the program doesn't run out of memory. The box will have around 12 GB of memory and these records are very small, so I don't see this being a problem.
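For illustration, a size-bounded Guava cache would look roughly like this (the key type and the 1,000,000 limit are placeholders):

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

// Illustrative only: a size-bounded cache that evicts older entries once the limit is hit,
// so pending records cannot grow without bound.
Cache<Long, BoundStatement> pending = CacheBuilder.newBuilder()
        .maximumSize(1_000_000)
        .build();
pending.put(deviceid, bs);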
With a token bucket, the rate of inbound requests is not limited directly; instead, when the accumulated number of inbound requests exceeds the maximum capacity of the bucket, new requests are denied. The token bucket limits the average inflow rate while still allowing short bursts of traffic: a request is processed as long as a token is available for it.
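As a concrete illustration of that description, a minimal token bucket could look like this (a hand-rolled sketch, not taken from any library):

// Tokens refill continuously at ratePerSecond; a request proceeds only if a whole token is available.
class TokenBucket {
    private final long capacity;
    private final double refillPerNano;
    private double tokens;
    private long lastRefill = System.nanoTime();

    TokenBucket(long capacity, double ratePerSecond) {
        this.capacity = capacity;
        this.refillPerNano = ratePerSecond / 1_000_000_000.0;
        this.tokens = capacity;                       // start with a full bucket
    }

    synchronized boolean tryAcquire() {
        long now = System.nanoTime();
        tokens = Math.min(capacity, tokens + (now - lastRefill) * refillPerNano);
        lastRefill = now;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;                                 // bucket empty: deny (or queue) the request
    }
}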
Guava's RateLimiter is safe for concurrent use: it will restrict the total rate of calls from all threads. Note, however, that it does not guarantee fairness. Rate limiters are often used to restrict the rate at which some physical or logical resource is accessed.
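In the write path above, using it boils down to something like this (2000.0 matches the target rate from the question; bs is the bound statement):

RateLimiter limiter = RateLimiter.create(2000.0);   // permits per second, shared by all threads

// blocking style: wait for a permit before each write
limiter.acquire();
session.executeAsync(bs);

// non-blocking style: skip or re-queue the write if no permit is available right now
if (limiter.tryAcquire()) {
    session.executeAsync(bs);
}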
Resilience4j also provides a RateLimiter, which splits all nanoseconds from the start of the epoch into cycles. Each cycle has a duration configured by RateLimiterConfig.limitRefreshPeriod.
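A configuration sketch, assuming a recent Resilience4j version (the instance name and timeout are arbitrary; older versions used getPermission(Duration) instead of acquirePermission()):

import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import java.time.Duration;

RateLimiterConfig config = RateLimiterConfig.custom()
        .limitRefreshPeriod(Duration.ofSeconds(1))   // one cycle = 1 second
        .limitForPeriod(2000)                        // permits allowed per cycle
        .timeoutDuration(Duration.ofMillis(100))     // how long a caller may wait for a permit
        .build();
RateLimiter limiter = RateLimiter.of("cassandra-writes", config);

if (limiter.acquirePermission()) {
    session.executeAsync(bs);
}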
The DataStax driver also lets you configure the number of connections per host and the number of concurrent requests per connection (see the PoolingOptions settings). Adjust these settings to reduce pressure on the Cassandra cluster.
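For the 3.x driver this looks roughly like the following (the connection counts, contact point and keyspace are placeholders to tune; Cassandra 2.0 speaks native protocol v2, which caps concurrent requests per connection at 128):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.Session;

PoolingOptions pooling = new PoolingOptions()
        .setConnectionsPerHost(HostDistance.LOCAL, 2, 4)        // core / max connections per host
        .setMaxRequestsPerConnection(HostDistance.LOCAL, 128);  // concurrent requests per connection

Cluster cluster = Cluster.builder()
        .addContactPoint("127.0.0.1")            // placeholder contact point
        .withPoolingOptions(pooling)
        .build();
Session session = cluster.connect("storage_ks"); // placeholder keyspace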