I am maintaining and developing two Akka Scala applications that interface with a serial device to gather sensor information. The main difference between the two is that one (my CO2 sensor application) uses 1% CPU while the other (my Power sensor application) uses 250% CPU. This is the case both on a Linux machine (a Raspberry Pi 3) and on my Windows desktop PC. Code-wise, the main difference is that the CO2 app uses the serial library (http://fazecast.github.io/jSerialComm/) directly, while the Power sensor app goes through a layer of middleware that converts the In/OutputStreams of the serial library to an Akka Source/Sink, like so:
val port = SerialPort.getCommPort(comPort)
port.setBaudRate(baudRate)
port.setFlowControl(flowControl)
port.setComPortParameters(baudRate, dataBits, stopBits, parity)
port.setComPortTimeouts(timeoutMode, timeout, timeout)

val isOpen = port.openPort()
if (!isOpen) {
  error(s"Port $comPort could not be opened. Use the following documentation for troubleshooting: https://github.com/Fazecast/jSerialComm/wiki/Troubleshooting")
  throw new Exception("Port could not be opened")
}

(reactive.streamSource(port.getInputStream), reactive.streamSink(port.getOutputStream))
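The middleware itself is not shown here, but a minimal sketch of what reactive.streamSource/reactive.streamSink could look like, assuming it simply delegates to Akka's built-in StreamConverters (whose InputStreamPublisher performs the blocking read() calls on the blocking-IO dispatcher):

import java.io.{InputStream, OutputStream}

import akka.stream.IOResult
import akka.stream.scaladsl.{Sink, Source, StreamConverters}
import akka.util.ByteString

import scala.concurrent.Future

// Hypothetical sketch: wrap the port's blocking java.io streams as Akka
// Streams stages. StreamConverters.fromInputStream is backed by
// InputStreamPublisher, which runs the blocking read() calls on the
// stream's blocking-IO dispatcher.
object reactive {
  def streamSource(in: InputStream): Source[ByteString, Future[IOResult]] =
    StreamConverters.fromInputStream(() => in, chunkSize = 512)

  def streamSink(out: OutputStream): Sink[ByteString, Future[IOResult]] =
    StreamConverters.fromOutputStream(() => out)
}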
When I saw this high CPU usage I immediately slapped a profiler (VisualVM) on it, which showed the busiest threads spending their time in Unsafe.park.
After googling for Unsafe.park I found the following answer: https://stackoverflow.com/a/29414580/1122834. Using this information I checked the amount of context switching WITH and WITHOUT my Power sensor app, and the results were very clear about the root cause of the issue. With the app running:
pi@dex:~ $ vmstat 1
procs -----------memory---------- ---swap-- -----io---- -system-- ------cpu-----
r b swpd free buff cache si so bi bo in cs us sy id wa st
10 0 32692 80144 71228 264356 0 0 0 5 7 8 38 5 55 2 0
1 0 32692 80176 71228 264356 0 0 0 76 12932 18856 59 6 35 0 0
1 0 32692 80208 71228 264356 0 0 0 0 14111 20570 60 8 32 0 0
1 0 32692 80208 71228 264356 0 0 0 0 13186 16095 65 6 29 0 0
1 0 32692 80176 71228 264356 0 0 0 0 14008 23449 56 6 38 0 0
3 0 32692 80208 71228 264356 0 0 0 0 13528 17783 65 6 29 0 0
1 0 32692 80208 71228 264356 0 0 0 28 12960 16588 63 6 31 0 0
And after killing the Power sensor app:
pi@dex:~ $ vmstat 1
procs -----------memory---------- ---swap-- -----io---- -system-- ------cpu-----
r b swpd free buff cache si so bi bo in cs us sy id wa st
1 0 32692 147320 71228 264332 0 0 0 5 7 8 38 5 55 2 0
0 0 32692 147296 71228 264332 0 0 0 84 963 1366 0 0 98 2 0
0 0 32692 147296 71228 264332 0 0 0 0 962 1347 1 0 99 0 0
0 0 32692 147296 71228 264332 0 0 0 0 947 1318 1 0 99 0 0
As you can see, the number of context switches dropped from roughly 18,000 to about 1,300 a second just by killing my application. I continued by checking which exact threads were doing this, and it seems Akka is really eager to do stuff.
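A minimal, Linux-only sketch of one way to read those per-thread context-switch counters straight from /proc (thread names may simply show up as "java" on older JVMs):

import java.io.File
import scala.io.Source

// Each directory under /proc/self/task is one thread of this JVM; its status
// file carries the voluntary/nonvoluntary context-switch counters that vmstat
// aggregates system-wide.
for (task <- new File("/proc/self/task").listFiles()) {
  val lines = Source.fromFile(new File(task, "status")).getLines().toList
  val name  = lines.find(_.startsWith("Name:")).getOrElse(task.getName)
  val ctxt  = lines.filter(_.contains("ctxt_switches"))
  println(s"$name -> ${ctxt.mkString(", ")}")
}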
Both a comment here and on another SO question point towards tweaking the parallelism settings of Akka. I added the following to my application.conf, to no avail.
akka {
  log-config-on-start = "on"

  actor {
    default-dispatcher {
      # Dispatcher is the name of the event-based dispatcher
      type = Dispatcher
      # What kind of ExecutionService to use
      executor = "fork-join-executor"
      # Configuration for the fork join pool
      default-executor {
        fallback = "fork-join-executor"
      }
      fork-join-executor {
        # Min number of threads to cap factor-based parallelism number to
        parallelism-min = 1
        # Parallelism (threads) ... ceil(available processors * factor)
        parallelism-factor = 1.0
        # Max number of threads to cap factor-based parallelism number to
        parallelism-max = 1
      }
      # Throughput defines the maximum number of messages to be
      # processed per actor before the thread jumps to the next actor.
      # Set to 1 for as fair as possible.
      throughput = 1
    }
  }

  stream {
    default-blocking-io-dispatcher {
      type = PinnedDispatcher
      executor = "fork-join-executor"
      throughput = 1
      thread-pool-executor {
        core-pool-size-min = 1
        core-pool-size-factor = 1.0
        core-pool-size-max = 1
      }
      fork-join-executor {
        parallelism-min = 1
        parallelism-factor = 1.0
        parallelism-max = 1
      }
    }
  }
}
This did improve the CPU usage (100% -> 65%), but the CPU usage is still unnecessarily high.
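For completeness: dispatcher settings can also be forced per stage via attributes. A minimal sketch, assuming the blocking serial source should be pinned to Akka's stream blocking-IO dispatcher (the dispatcher path is the default one from Akka's reference.conf):

import akka.stream.ActorAttributes

// serialSource is the source produced by reactive.streamSource above; run its
// blocking reads on the dedicated blocking-IO dispatcher instead of the
// default fork-join pool.
val pinnedSerialSource = serialSource.withAttributes(
  ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher"))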
UPDATE 21-11-'16: It would appear the problem is inside my graph. When the graph is not running, the CPU usage immediately goes down to normal levels. The graph is the following:
val streamGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val responsePacketSource = serialSource
    .via(Framing.delimiter(ByteString(frameDelimiter), maxFrameLength, allowTruncation = true))
    .via(cleanPacket)
    .via(printOutput("Received: ", debug(_)))
    .via(byteStringToResponse)

  val packetSink = pushSource
    .via(throttle(throttle))

  val zipRequestStickResponse = builder.add(Zip[RequestPacket, ResponsePacket])
  val broadcastRequest = builder.add(Broadcast[RequestPacket](2))
  val broadcastResponse = builder.add(Broadcast[ResponsePacket](2))

  packetSink ~> broadcastRequest.in
  broadcastRequest.out(0) ~> makePacket ~> printOutput("Sent: ", debug(_)) ~> serialSink
  broadcastRequest.out(1) ~> zipRequestStickResponse.in0

  responsePacketSource ~> broadcastResponse.in
  broadcastResponse.out(0).filter(isStickAck) ~> zipRequestStickResponse.in1
  broadcastResponse.out(1).filter(!isStickAck(_)).map { al =>
    val e = completeRequest(al)
    debug(s"Sinking: $e")
    e
  } ~> Sink.ignore

  zipRequestStickResponse.out.map { case (request, stickResponse) =>
    debug(s"Mapping: request=$request, stickResponse=$stickResponse")
    pendingPackets += stickResponse.sequenceNumber -> request
    request.stickResponse trySuccess stickResponse
  } ~> Sink.ignore

  ClosedShape
})

streamGraph.run()
When I remove the filters from broadcastResponse, the CPU usage goes down to normal levels. This leads me to believe that the zip never happens and the graph therefore ends up in an incorrect state.
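One experiment to test that hypothesis (a sketch, not a verified fix) is to decouple the zip's request input with an explicit buffer, replacing the broadcastRequest.out(1) ~> zipRequestStickResponse.in0 edge above:

import akka.stream.OverflowStrategy

// Hypothetical change inside the same GraphDSL builder: buffer the request side
// of the zip so the Broadcast is not back-pressured while a matching stick
// response is still missing (e.g. because it was filtered out on the other branch).
broadcastRequest.out(1)
  .buffer(16, OverflowStrategy.backpressure) ~> zipRequestStickResponse.in0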
As it turned out, the problem is that Fazecast's jSerialComm library has a number of different timeout modes:
static final public int TIMEOUT_NONBLOCKING = 0x00000000;
static final public int TIMEOUT_READ_SEMI_BLOCKING = 0x00000001;
static final public int TIMEOUT_WRITE_SEMI_BLOCKING = 0x00000010;
static final public int TIMEOUT_READ_BLOCKING = 0x00000100;
static final public int TIMEOUT_WRITE_BLOCKING = 0x00001000;
static final public int TIMEOUT_SCANNER = 0x00010000;
Using the non-blocking read() method (TIMEOUT_NONBLOCKING) results in very high CPU usage when combined with Akka Streams' InputStreamPublisher. To prevent this, simply use TIMEOUT_READ_SEMI_BLOCKING or TIMEOUT_READ_BLOCKING.
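Applied to the port setup from the question, that means something like this (the 100 ms read timeout is only an example value):

import com.fazecast.jSerialComm.SerialPort

val port = SerialPort.getCommPort(comPort)
// Let read() block until data arrives or the timeout expires, instead of
// returning immediately and making InputStreamPublisher spin.
port.setComPortTimeouts(SerialPort.TIMEOUT_READ_SEMI_BLOCKING, 100, 0)
port.openPort()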