I'm using a service that reads messages from Kafka
and pushes it into Cassandra
.
I'm using a threaded architecture for the same.
There are say, k threads
consuming from Kafka topic. These write into a queue, declared as:
public static BlockingQueue<>
Now there are a number of threads, say n
, which write into Cassandra. Here is the code that does that:
public void run(){
LOGGER.log(Level.INFO, "Thread Created: " +Thread.currentThread().getName());
while (!Thread.currentThread().isInterrupted()) {
Thread.yield();
if (!content.isEmpty()) {
try {
JSONObject msg = content.remove();
// JSON
for(String tableName : tableList){
CassandraConnector.getSession().execute(createQuery(tableName, msg));
}
} catch (Exception e) {
}
}
}
}
content
is the BlockingQueue used for read-write operations.
I'm extending the Thread
class in the implementation of threading and there are a fixed number of threads that continue execution unless interrupted.
The problem is, this is using too much of CPU. Here is the first line of top
command:
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
46232 vishran+ 20 0 3010804 188052 14280 S 137.8 3.3 5663:24 java
Here is the output of strace
on a thread of this process:
strace -t -p 46322
Process 46322 attached
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
....and so on
Why I am using Thread.yield()
, is because of this
If you want any other information for debugging, please let me know.
Now the question is, how can CPU utilization be minimized?
The whole purpose of a BlockingQueue is that it blocks when it is empty. So the consumer threads (the ones populating into Cassandra) don't have to manually check if they are empty. You can just make a call to take() and if the queue is empty, the call will block unless it is interrupted or if there is an element available.
When a thread is blocked, the scheduler can schedule some other thread in its place which saves you from calling yield() and so on. Remember that yield() will give way to another thread only if a thread with a priority greater than or equal to the thread that is yielding is available to run.
public void run(){
LOGGER.log(Level.INFO, "Thread Created: " +Thread.currentThread().getName());
try {
JSONObject msg = content.take();
// JSON
for(String tableName : tableList){
CassandraConnector.getSession().execute(createQuery(tableName, msg));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
From the looks of your code it seems that your consumer threads are always checking for content available. Therefore, your threads are always running and never iddle (waiting for someone to notify them), therefore your CPU is always doing something, even if it is always yielding the thread the current thread.
while (!Thread.currentThread().isInterrupted()) {
Thread.yield();
if (!content.isEmpty()) {
You are clearly tring to solve the producer-consumer issue that many of us faced somewhere over our programming careers.
What you're currently doing is having the consumer proactively continually checking if it has something to consume.
The least and easiest CPU intensive way of solving it is:
Check out this example as it contains a simplest way to do it. You may want to revisit Java Concurrency in Practice for more profound help.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With