Why does akka-stream's Source.groupedWithin not respect the duration?

With the akka-streams 2.4.17 Scala API, I'm trying to use Source.groupedWithin(size, duration) with a duration. From the documentation and what I've seen in the source code, a group should be emitted downstream as soon as either the group size is reached or the timeout elapses, whichever comes first.

When I run a simple workflow in fused mode (no asynchronous boundaries), the duration doesn't appear to have any effect. However, when I put .async before or after the groupedWithin call, the timeout works.

Not Working Version

Source.fromIterator(() => aFiniteIterator)
  .map(aLongOperation(_))
  .groupedWithin(1000, 5.seconds) // keeps waiting beyond 5 seconds
  .map(somethingWithGroup(_))
  .runWith(Sink.fold(0)(_ + _))

Working Version

Source.fromIterator(() => aFiniteIterator)
  .map(aLongOperation(_))
  .async
  .groupedWithin(1000, 5.seconds) // now respects 5 seconds without full batch
  .map(somethingWithGroup(_))
  .runWith(Sink.fold(0)(_ + _))

Why is this? Is it possible the non-async version doesn't recognize downstream demand? Or is something else at play?

Update - Full Code Example with Output

For those who want to see the gory details, here is the full code I'm running. The context is that I'm experimenting with throttling throughout to avoid OOM exceptions.

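import java.time.LocalDateTime

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

case class Foo(id: String, value: String)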

object Main {
  implicit val system = ActorSystem("akka-streams-oom")
  implicit val materializer = ActorMaterializer()

  def main(args: Array[String]): Unit = {
    println("starting tests...")
    val attempt = Try(forceOOM)

    attempt match {
      case Success(_) => println("all tests passed successfully")
      case Failure(e) => println(s"exception: ${e.getMessage}")
    }

    println("terminating system...")
    system.terminate
    println("system terminated")
    println("done with tests...")
  }

  private def forceOOM: Unit = {
    println("executing forceOOM...")
    val sink = Sink.fold[Int, Int](0)(_ + _)

    val future =
      bigSource
        .map(logEmit)
        .via(slowSubscriber)
        .runWith(sink)

    val finalResult = Await.result(future, Duration.Inf)
    println(s"forceOOM result: $finalResult")
  }

  private def bigSource = {
    val largeIterator = () =>
      Iterator
        .range(0, 1000000000) // finite range; Iterator.from(0, n) is an infinite iterator with step n
        .map(_ => generateLargeFoo)

    Source.fromIterator(largeIterator)
  }

  private def slowSubscriber =
    Flow[Foo]
      .map { foo =>
        println(s"allocating memory for ${foo.id} at ${time}")
        Foo(foo.id, bloat)
      }
      .async  // if I remove this, the 5-second window below doesn't seem to work
      .groupedWithin(100, 5.seconds)
      .map(foldFoos)

  private def logEmit(x: Foo): Foo = {
    println(s"emitting next record: ${x.id} at ${time}")
    x
  }

  private def foldFoos(x: Seq[Foo]): Int = {
    println(s"folding records                                            at ${time}")
    x.map(_.value.length).fold(0)(_ + _)
  }

  private def time: String = LocalDateTime.now.toLocalTime.toString

  private def bloat: String = {
    (0 to 10)
      .map(_ => generateLargeFoo.value)
      .fold("")(_ + _)
  }

  private def generateLargeFoo: Foo = {
    Foo(java.util.UUID.randomUUID.toString, (0 to 1000000).mkString)
  }
}

Output Without Async (Extends Beyond Timeout)

[info] emitting next record: 5016fea4-f076-45dd-b95b-1d24f71a25b4 at 09:34:25.826
[info] allocating memory for 5016fea4-f076-45dd-b95b-1d24f71a25b4 at 09:34:25.868
[info] emitting next record: ab6e298b-0152-4af5-b685-bb4ed6c5b9de at 09:34:27.572
[info] allocating memory for ab6e298b-0152-4af5-b685-bb4ed6c5b9de at 09:34:27.572
[info] emitting next record: 6f5c1b75-5aaf-44e6-ac62-a6074735c057 at 09:34:28.957
[info] allocating memory for 6f5c1b75-5aaf-44e6-ac62-a6074735c057 at 09:34:28.958
[info] emitting next record: 313ce2b5-f669-4c59-b2ec-eafdae85ded6 at 09:34:30.378
[info] allocating memory for 313ce2b5-f669-4c59-b2ec-eafdae85ded6 at 09:34:30.378
[info] emitting next record: 91a8a95b-b3cc-4e27-8d3f-3400fa9c7a9f at 09:34:31.802
[info] allocating memory for 91a8a95b-b3cc-4e27-8d3f-3400fa9c7a9f at 09:34:31.802
[info] emitting next record: 0220e75a-029b-4d35-8494-690bed6938aa at 09:34:33.173
[info] allocating memory for 0220e75a-029b-4d35-8494-690bed6938aa at 09:34:33.174
[info] emitting next record: faa16b80-cfb1-4ea4-b3ba-c1d270caf865 at 09:34:34.409
[info] allocating memory for faa16b80-cfb1-4ea4-b3ba-c1d270caf865 at 09:34:34.409
[info] emitting next record: 8956d710-ad55-4dee-b4f3-82b8cf313a85 at 09:34:35.656
[info] allocating memory for 8956d710-ad55-4dee-b4f3-82b8cf313a85 at 09:34:35.656
[info] emitting next record: 1b989c56-6580-44f0-b8d9-46d5241046cc at 09:34:36.944
[info] allocating memory for 1b989c56-6580-44f0-b8d9-46d5241046cc at 09:34:36.945
[info] emitting next record: 66a766c7-29e0-40ca-b997-54985aad75d6 at 09:34:38.272
[info] allocating memory for 66a766c7-29e0-40ca-b997-54985aad75d6 at 09:34:38.272
[info] emitting next record: b8d29dad-bd44-4843-936e-5eb5df3bb594 at 09:34:39.530
[info] allocating memory for b8d29dad-bd44-4843-936e-5eb5df3bb594 at 09:34:39.530
[info] emitting next record: 8c7999cf-7796-427e-a155-c28d7fc4a934 at 09:34:40.987
[info] allocating memory for 8c7999cf-7796-427e-a155-c28d7fc4a934 at 09:34:40.988
[info] emitting next record: eda79635-4559-4c92-a5b7-83bbfc2e85b2 at 09:34:42.382
[info] allocating memory for eda79635-4559-4c92-a5b7-83bbfc2e85b2 at 09:34:42.382
[info] emitting next record: 8fa5d744-70e8-4261-9c3f-427737233e13 at 09:34:43.593
[info] allocating memory for 8fa5d744-70e8-4261-9c3f-427737233e13 at 09:34:43.593
[info] emitting next record: cc621484-c70d-4092-8dc6-2e39acc1f0b3 at 09:34:44.983
[info] allocating memory for cc621484-c70d-4092-8dc6-2e39acc1f0b3 at 09:34:44.983
[info] emitting next record: fbc03c9c-1ea8-4d4d-9a80-13118324140d at 09:34:46.244
[info] allocating memory for fbc03c9c-1ea8-4d4d-9a80-13118324140d at 09:34:46.244
[info] emitting next record: 96374d33-e117-4f48-b3be-79b8cb1e0fda at 09:34:47.953
[info] allocating memory for 96374d33-e117-4f48-b3be-79b8cb1e0fda at 09:34:47.953
[info] emitting next record: 1c210d73-35d3-41b9-ade6-9310783589a3 at 09:34:49.303
[info] allocating memory for 1c210d73-35d3-41b9-ade6-9310783589a3 at 09:34:49.303
[info] emitting next record: 3872c382-17a9-484a-861c-6f66a0c7d0ca at 09:34:50.620
[info] allocating memory for 3872c382-17a9-484a-861c-6f66a0c7d0ca at 09:34:50.620
[info] emitting next record: c34ba954-a9ff-45d1-910c-316c6eb9c85d at 09:34:52.597
[info] allocating memory for c34ba954-a9ff-45d1-910c-316c6eb9c85d at 09:34:52.597
[info] emitting next record: 8e5f804e-5e75-4eac-937f-651d45e3745d at 09:34:54.145
[info] allocating memory for 8e5f804e-5e75-4eac-937f-651d45e3745d at 09:34:54.145
[info] emitting next record: 1caf82cc-7b41-4730-bcc1-ca61ee7780e0 at 09:34:56.454
[info] allocating memory for 1caf82cc-7b41-4730-bcc1-ca61ee7780e0 at 09:34:56.455
[info] emitting next record: 9364d386-408a-4b63-80b5-0ed34473ba45 at 09:34:58.706
[info] allocating memory for 9364d386-408a-4b63-80b5-0ed34473ba45 at 09:34:58.706
[info] emitting next record: c43baaba-961e-4877-9835-7eeee538f0af at 09:35:00.822
[info] allocating memory for c43baaba-961e-4877-9835-7eeee538f0af at 09:35:00.822
[info] #
[info] # java.lang.OutOfMemoryError: Java heap space
[info] # -XX:OnOutOfMemoryError="kill -9 %p"
[info] #   Executing "kill -9 96871"...
java.lang.RuntimeException: Nonzero exit code returned from runner: 137
    at scala.sys.package$.error(package.scala:27)

Output With Async (Timeout Works)

[info] emitting next record: 668d6f9f-43cc-45a6-99b3-d8e8ab2b9cae at 09:28:48.188
[info] allocating memory for 668d6f9f-43cc-45a6-99b3-d8e8ab2b9cae at 09:28:48.231
[info] emitting next record: 6c50b3e1-d3ec-422e-b41a-fe3d92df15a9 at 09:28:48.333
[info] emitting next record: 20b659f9-73e1-4c67-b251-2b224eec4d24 at 09:28:48.421
[info] emitting next record: 9af08f07-8246-498b-9f64-b56982cf3536 at 09:28:48.497
[info] emitting next record: 14cdf3b4-d14f-4953-8609-24c7a1996a12 at 09:28:48.569
[info] emitting next record: 571002f3-7301-4afa-8bc9-3fb8a9e84db2 at 09:28:48.665
[info] emitting next record: 5e88a51b-b56c-40fe-84a3-2fcf18b90e3f at 09:28:48.787
[info] emitting next record: e66b29f3-1690-4645-a048-19049e92303a at 09:28:48.846
[info] emitting next record: 66c16074-b200-4808-a990-13abadc66e43 at 09:28:48.943
[info] emitting next record: 1de8caca-fa48-4777-90a7-1449bd6722bb at 09:28:49.003
[info] emitting next record: bc3859b6-94ab-4262-b4cd-fa757e8f3f1f at 09:28:49.064
[info] emitting next record: 988216a7-5944-4aa5-98f6-b36542d8e7a8 at 09:28:49.172
[info] emitting next record: e6ab4ef6-1fd2-471b-8866-2f8422346df5 at 09:28:49.325
[info] emitting next record: c86b3116-70c8-453e-9ddf-bd8d9e144caf at 09:28:49.384
[info] emitting next record: 78c68185-cdd1-4fde-aa39-e03b37b5f449 at 09:28:49.603
[info] emitting next record: 7ed11952-ceba-47f5-9ba4-25d1e9dceea0 at 09:28:49.671
[info] allocating memory for 6c50b3e1-d3ec-422e-b41a-fe3d92df15a9 at 09:28:50.164
[info] allocating memory for 20b659f9-73e1-4c67-b251-2b224eec4d24 at 09:28:51.459
[info] allocating memory for 9af08f07-8246-498b-9f64-b56982cf3536 at 09:28:52.752
[info] folding records                                            at 09:28:53.106
[info] allocating memory for 14cdf3b4-d14f-4953-8609-24c7a1996a12 at 09:28:53.969
[info] allocating memory for 571002f3-7301-4afa-8bc9-3fb8a9e84db2 at 09:28:55.234
[info] allocating memory for 5e88a51b-b56c-40fe-84a3-2fcf18b90e3f at 09:28:56.422
...
Jordan Parmer asked Mar 16 '17 21:03


1 Answer

I suspect you are simulating aLongOperation with Thread.sleep or some other blocking operation. If so, without an explicit async boundary the whole graph runs inside a single actor, and therefore on one thread at a time. Blocking that thread starves the underlying dispatching infrastructure (see docs).
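
As a rough analogy only (this uses a plain JDK single-thread executor, not Akka's actual internals), the sketch below shows how a timer that shares its only thread with a blocking task cannot fire on time. The object name BlockedTimerDemo and the method elapsedMillis are made up for this illustration.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}

object BlockedTimerDemo {
  // Schedules a timer due after 100 ms on a single-threaded executor, then
  // occupies that same thread with a blocking task for `blockMillis`.
  // Returns how long after the start the timer actually ran, in milliseconds.
  def elapsedMillis(blockMillis: Long): Long = {
    val pool = Executors.newSingleThreadScheduledExecutor()
    try {
      val start = System.nanoTime()

      // The "timeout": it should run roughly 100 ms from now.
      val timer = pool.schedule(new Callable[Long] {
        def call(): Long = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
      }, 100L, TimeUnit.MILLISECONDS)

      // The "long operation": it blocks the only thread the timer could run on.
      pool.execute(new Runnable {
        def run(): Unit = Thread.sleep(blockMillis)
      })

      timer.get() // completes only once the blocking task has released the thread
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit =
    println(s"timer ran after ${elapsedMillis(500)} ms (it was due after 100 ms)")
}
```

The timer is not lost, it is only delayed until the thread is free again, which is the kind of starvation described above.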

Try to simulate your long operation in a non-blocking fashion (e.g. using the after pattern).
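
A minimal sketch of that suggestion, assuming akka-stream 2.4.x on the classpath: the long operation is simulated with akka.pattern.after and plugged in through mapAsync, so no thread is blocked while waiting. The object name, the element count, the 200 ms delay and the 1-second window are placeholders chosen to keep the run short; they are not taken from the question.

```scala
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object NonBlockingGroupedWithin {

  // Runs a small fused stream (no .async) and returns the size of each group.
  def groupSizes(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("grouped-within-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // Hypothetical stand-in for aLongOperation: the Future completes after
    // 200 ms via the scheduler, without putting any thread to sleep.
    def aLongOperation(n: Int): Future[Int] =
      after(200.millis, system.scheduler)(Future.successful(n))

    val sizes =
      Source.fromIterator(() => Iterator.range(0, 10))
        .mapAsync(parallelism = 1)(aLongOperation)
        .groupedWithin(1000, 1.second) // far fewer than 1000 elements arrive per window
        .map(_.size)
        .runWith(Sink.seq[Int])

    try Await.result(sizes, 1.minute)
    finally Await.result(system.terminate(), 10.seconds)
  }

  def main(args: Array[String]): Unit =
    println(s"group sizes: ${groupSizes()}")
}
```

Because the stage never blocks, the actor running the fused graph stays free to handle the groupedWithin timer, so groups should be emitted on the time window even though no group ever reaches 1000 elements.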

See also the following issue that was raised on the subject.

Stefano Bonetti answered Sep 28 '22 11:09