
High CPU usage through context switches in Akka application

I am maintaining and developing two Akka Scala applications that interface with a serial device to gather sensor information. The main difference between the two is that one (my CO2 sensor application) uses 1% CPU while the other (my power sensor application) uses 250% CPU. This is the case both on a Linux machine (Raspberry Pi 3) and on my Windows desktop PC. Code-wise, the main difference is that the CO2 application uses the serial library (http://fazecast.github.io/jSerialComm/) directly, while the power sensor application goes through a layer of middleware that converts the In/OutputStreams of the serial library to an Akka Source/Sink, as such:

  val port = SerialPort.getCommPort(comPort)

  port.setBaudRate(baudRate)
  port.setFlowControl(flowControl)
  port.setComPortParameters(baudRate, dataBits, stopBits, parity)
  port.setComPortTimeouts(timeoutMode, timeout, timeout)

  val isOpen = port.openPort()

  if(!isOpen) {
    error(s"Port $comPort could not opened. Use the following documentation for troubleshooting: https://github.com/Fazecast/jSerialComm/wiki/Troubleshooting")

    throw new Exception("Port could not be opened")
  }

  (reactive.streamSource(port.getInputStream), reactive.streamSink(port.getOutputStream))
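
(For context, the middleware wrapping is roughly equivalent to the sketch below, which assumes Akka Streams' built-in StreamConverters; it is a simplified illustration, not the actual middleware code.)

import java.io.{InputStream, OutputStream}
import akka.stream.IOResult
import akka.stream.scaladsl.{Sink, Source, StreamConverters}
import akka.util.ByteString
import scala.concurrent.Future

object reactive {
  // Reads the serial input stream in chunks and emits them as ByteStrings.
  def streamSource(in: InputStream): Source[ByteString, Future[IOResult]] =
    StreamConverters.fromInputStream(() => in)

  // Writes incoming ByteStrings to the serial output stream.
  def streamSink(out: OutputStream): Sink[ByteString, Future[IOResult]] =
    StreamConverters.fromOutputStream(() => out)
}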

When I saw this high CPU usage I immediately slapped a profiler (VisualVM) against it, which told me the following:

[Profiler screenshot]

After googling for Unsafe.park I found the following answer: https://stackoverflow.com/a/29414580/1122834. Using this information I checked the amount of context switching WITH and WITHOUT my power sensor app, and the results were very clear about the root cause of the issue:

pi@dex:~ $ vmstat 1
procs -----------memory---------- ---swap-- -----io---- -system-- ------cpu-----
 r  b   swpd   free   buff  cache   si   so    bi    bo   in   cs us sy id wa st
10  0  32692  80144  71228 264356    0    0     0     5    7    8 38  5 55  2  0
 1  0  32692  80176  71228 264356    0    0     0    76 12932 18856 59  6 35  0  0
 1  0  32692  80208  71228 264356    0    0     0     0 14111 20570 60  8 32  0  0
 1  0  32692  80208  71228 264356    0    0     0     0 13186 16095 65  6 29  0  0
 1  0  32692  80176  71228 264356    0    0     0     0 14008 23449 56  6 38  0  0
 3  0  32692  80208  71228 264356    0    0     0     0 13528 17783 65  6 29  0  0
 1  0  32692  80208  71228 264356    0    0     0    28 12960 16588 63  6 31  0  0

pi@dex:~ $ vmstat 1
procs -----------memory---------- ---swap-- -----io---- -system-- ------cpu-----
 r  b   swpd   free   buff  cache   si   so    bi    bo   in   cs us sy id wa st
 1  0  32692 147320  71228 264332    0    0     0     5    7    8 38  5 55  2  0
 0  0  32692 147296  71228 264332    0    0     0    84  963 1366  0  0 98  2  0
 0  0  32692 147296  71228 264332    0    0     0     0  962 1347  1  0 99  0  0
 0  0  32692 147296  71228 264332    0    0     0     0  947 1318  1  0 99  0  0

As you can see, the number of context switches went down by ~12,000 per second just by killing my application. I continued by checking exactly which threads were doing this, and it seems Akka is really eager to do stuff:

[Profiler threads screenshot]

Both a comment here and an answer on another SO question point towards tweaking Akka's parallelism settings. I added the following to my application.conf, to no avail.

akka {
  log-config-on-start = "on"
  actor{
    default-dispatcher {
      # Dispatcher is the name of the event-based dispatcher
      type = Dispatcher
      # What kind of ExecutionService to use
      executor = "fork-join-executor"
      # Configuration for the fork join pool
      default-executor {
        fallback = "fork-join-executor"
      }
      fork-join-executor {
        # Min number of threads to cap factor-based parallelism number to
        parallelism-min = 1
        # Parallelism (threads) ... ceil(available processors * factor)
        parallelism-factor = 1.0
        # Max number of threads to cap factor-based parallelism number to
        parallelism-max = 1
      }
      # Throughput defines the maximum number of messages to be
      # processed per actor before the thread jumps to the next actor.
      # Set to 1 for as fair as possible.
      throughput = 1
    }
  }
  stream{
    default-blocking-io-dispatcher {
      type = PinnedDispatcher
      executor = "fork-join-executor"
      throughput = 1

      thread-pool-executor {
        core-pool-size-min = 1
        core-pool-size-factor = 1.0
        core-pool-size-max = 1
      }
      fork-join-executor {
        parallelism-min = 1
        parallelism-factor = 1.0
        parallelism-max = 1
      }
    }
  }
}

This seems to improve the CPU usage (100% -> 65%), but it is still unnecessarily high.
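
(Side note: individual stages can also be pointed at that blocking-IO dispatcher explicitly via attributes. A sketch, reusing the serialSource name from my code; I have not verified that this changes anything here.)

import akka.stream.ActorAttributes

// Run the serial source on the stream's blocking-IO dispatcher so its blocking
// reads do not tie up the default fork-join dispatcher.
val blockingSerialSource =
  serialSource.withAttributes(
    ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher"))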

UPDATE 21-11-'16: It would appear the problem is inside my graph. When the graph is not running, the CPU usage immediately drops to normal levels. The graph is the following:

val streamGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val responsePacketSource = serialSource
    .via(Framing.delimiter(ByteString(frameDelimiter), maxFrameLength, allowTruncation = true))
    .via(cleanPacket)
    .via(printOutput("Received: ",debug(_)))
    .via(byteStringToResponse)

  val packetSink = pushSource
    .via(throttle(throttle))

  val zipRequestStickResponse = builder.add(Zip[RequestPacket, ResponsePacket])
  val broadcastRequest = builder.add(Broadcast[RequestPacket](2))
  val broadcastResponse = builder.add(Broadcast[ResponsePacket](2))

  packetSink ~> broadcastRequest.in
  broadcastRequest.out(0) ~> makePacket ~> printOutput("Sent: ",debug(_)) ~> serialSink
  broadcastRequest.out(1) ~> zipRequestStickResponse.in0

  responsePacketSource ~> broadcastResponse.in
  broadcastResponse.out(0).filter(isStickAck) ~> zipRequestStickResponse.in1
  broadcastResponse.out(1).filter(!isStickAck(_)).map (al => {
    val e = completeRequest(al)
    debug(s"Sinking:          $e")
    e
  }) ~> Sink.ignore

  zipRequestStickResponse.out.map { case(request, stickResponse) =>
    debug(s"Mapping: request=$request, stickResponse=$stickResponse")
    pendingPackets += stickResponse.sequenceNumber -> request
    request.stickResponse trySuccess stickResponse
  } ~> Sink.ignore

  ClosedShape
})

streamGraph.run()

When I remove the filters from broadcastResponse, the CPU usage goes down to normal levels. This leads me to believe that the zip never happens, and the graph therefore ends up in an incorrect state.
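
To illustrate the suspicion (a standalone toy example, not my actual code): a Zip stage only emits once both of its inputs have an element available, so if one input is filtered down to nothing, the zip never emits and the other branch eventually backpressures.

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source, Zip}

object ZipStarvation extends App {
  implicit val system = ActorSystem("zip-demo")
  implicit val materializer = ActorMaterializer()

  RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val zip = builder.add(Zip[Int, Int]())

    Source(1 to 10)               ~> zip.in0
    Source(1 to 10).filter(_ < 0) ~> zip.in1  // nothing ever passes this filter
    zip.out ~> Sink.foreach[(Int, Int)](println)  // never prints: Zip waits for both inputs

    ClosedShape
  }).run()
}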



1 Answer

The problem is that Fazecast's jSerialComm library has a number of different time-out modes.

static final public int TIMEOUT_NONBLOCKING = 0x00000000;
static final public int TIMEOUT_READ_SEMI_BLOCKING = 0x00000001;
static final public int TIMEOUT_WRITE_SEMI_BLOCKING = 0x00000010;
static final public int TIMEOUT_READ_BLOCKING = 0x00000100;
static final public int TIMEOUT_WRITE_BLOCKING = 0x00001000;
static final public int TIMEOUT_SCANNER = 0x00010000;

Using the non-blocking read() mode (TIMEOUT_NONBLOCKING) results in very high CPU usage when combined with Akka Streams' InputStreamPublisher. To prevent this, simply use TIMEOUT_READ_SEMI_BLOCKING or TIMEOUT_READ_BLOCKING.
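
For example, when opening the port (the 100 ms read timeout below is just an illustrative value):

// Semi-blocking reads: read() waits until data is available (or the timeout
// expires) instead of returning immediately and being polled in a tight loop.
port.setComPortTimeouts(SerialPort.TIMEOUT_READ_SEMI_BLOCKING, 100, 0)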
