I need to do something really similar to this https://github.com/typesafehub/activator-akka-stream-scala/blob/master/src/main/scala/sample/stream/GroupLogFile.scala
my problem is that I have an unknown number of groups and if the number of parallelism of the mapAsync is less of the number of groups i got and error in the last sink
Tearing down SynchronousFileSink(/Users/sam/dev/projects/akka-streams/target/log-ERROR.txt) due to upstream error (akka.stream.impl.StreamSubscriptionTimeoutSupport$$anon$2)
I tried to put a buffer in the middle as suggested in the pattern guide of akka streams http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-cookbook.html
groupBy {
case LoglevelPattern(level) => level
case other => "OTHER"
}.buffer(1000, OverflowStrategy.backpressure).
// write lines of each group to a separate file
mapAsync(parallelism = 2) {....
but with the same result
Expanding on jrudolph's comment which is completely correct...
You do not need a mapAsync
in this instance. As a basic example, suppose you have a source of tuples
import akka.stream.scaladsl.{Source, Sink}
def data() = List(("foo", 1),
("foo", 2),
("bar", 1),
("foo", 3),
("bar", 2))
val originalSource = Source(data)
You can then perform a groupBy to create a Source of Sources
def getID(tuple : (String, Int)) = tuple._1
//a Source of (String, Source[(String, Int),_])
val groupedSource = originalSource groupBy getID
Each one of the grouped Sources can be processed in parallel with just a map
, no need for anything fancy. Here is an example of each grouping being summed in an independent stream:
import akka.actor.ActorSystem
import akka.stream.ACtorMaterializer
implicit val actorSystem = ActorSystem()
implicit val mat = ActorMaterializer()
import actorSystem.dispatcher
def getValues(tuple : (String, Int)) = tuple._2
//does not have to be a def, we can re-use the same sink over-and-over
val sumSink = Sink.fold[Int,Int](0)(_ + _)
//a Source of (String, Future[Int])
val sumSource =
groupedSource map { case (id, src) =>
id -> {src map getValues runWith sumSink} //calculate sum in independent stream
}
Now all of the "foo"
numbers are being summed in parallel with all of the "bar"
numbers.
mapAsync
is used when you have a encapsulated function that returns a Future[T]
and you're trying to emit a T
instead; which is not the case in you question. Further, mapAsync involves waiting for results which is not reactive...
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With