 

Count number of elements in Akka Streams

I'm trying to transform a Source of Scala entities into a Source of ByteString via Alpakka's CsvFormatting, and to count the number of elements in the initial stream. Could you suggest the best way to count the initialSource elements while keeping the result as a ByteString Source? Here is what I have so far:

val initialSource: Source[SomeEntity, NotUsed] = Source.fromPublisher(publisher)
val csvSource: Source[ByteString, NotUsed] = initialSource
  .map(e => List(e.firstName, e.lastName, e.city))
  .via(CsvFormatting.format())
asked Mar 11 '18 at 21:03 by noname.404

1 Answer

To count the elements in a stream, one must run the stream. One approach is to broadcast the stream elements to two sinks: one sink is the result of the main processing, the other sink simply counts the number of elements. Here is a simple example, which uses a graph to obtain the materialized values of both sinks:

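import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.alpakka.csv.scaladsl.CsvFormatting
import akka.stream.scaladsl.{Broadcast, GraphDSL, RunnableGraph, Sink, Source}
import akka.util.ByteString
import scala.util.{Failure, Success}

// ActorMaterializer is the Akka 2.5 API; on Akka 2.6+ the implicit ActorSystem is enough
implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()
import system.dispatcher // ExecutionContext needed by onComplete

// sink1 prints each element; sink2 counts elements with a fold
val sink1 = Sink.foreach[ByteString](println)
val sink2 = Sink.fold[Int, ByteString](0)((acc, _) => acc + 1)

// create(sink1, sink2)((_, _)) keeps both sinks' materialized values as a tuple
val g = RunnableGraph.fromGraph(GraphDSL.create(sink1, sink2)((_, _)) { implicit builder =>
  (s1, s2) =>
  import GraphDSL.Implicits._

  val broadcast = builder.add(Broadcast[ByteString](2))

  val source: Source[ByteString, NotUsed] =
    Source(1 to 10)
      .map(i => List(i.toString))
      .via(CsvFormatting.format())

  // every element goes to both sinks
  source ~> broadcast.in
  broadcast.out(0) ~> s1
  broadcast.out(1) ~> s2
  ClosedShape
}) // RunnableGraph[(Future[Done], Future[Int])]

val (fut1, fut2) = g.run()

fut2 onComplete {
  case Success(count) => println(s"Number of elements: $count")
  case Failure(e) => println(s"Stream failed: ${e.getMessage}")
}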

In the above example, the first sink just prints the stream elements and has a materialized value of type Future[Done]. The second sink does a fold operation to count the stream elements and has a materialized value of type Future[Int]. The following is printed:

ByteString(49, 13, 10)
ByteString(50, 13, 10)
ByteString(51, 13, 10)
ByteString(52, 13, 10)
ByteString(53, 13, 10)
ByteString(54, 13, 10)
ByteString(55, 13, 10)
ByteString(56, 13, 10)
ByteString(57, 13, 10)
ByteString(49, 48, 13, 10)
Number of elements: 10
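The printed values are raw UTF-8 bytes: 49 to 57 are the ASCII digits '1' to '9', 48 is '0', and 13, 10 is the CR LF line ending that CsvFormatting appends by default. So each ByteString is one CSV line, and since ten numbers went in and ten ByteStrings came out, counting the ByteStrings here gives the same result as counting the original elements. A small sketch that decodes the last line to confirm this (it only uses akka.util.ByteString; the object name is illustrative):

```scala
import akka.util.ByteString

object DecodeCsvOutput {
  def main(args: Array[String]): Unit = {
    // ByteString(String) encodes the text as UTF-8 bytes
    val lastLine = ByteString("10\r\n")

    // Same bytes as the last printed element: '1' = 49, '0' = 48, CR = 13, LF = 10
    println(lastLine == ByteString(Array[Byte](49, 48, 13, 10))) // prints true

    // Decoding the first printed element gives back the CSV line "1\r\n"
    println(ByteString(Array[Byte](49, 13, 10)).utf8String == "1\r\n") // prints true
  }
}
```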

Another option for sending stream elements to two different sinks, while retaining their respective materialized values, is to use alsoToMat:

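import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.csv.scaladsl.CsvFormatting
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.ByteString
import scala.util.{Failure, Success}

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()
import system.dispatcher // ExecutionContext needed by onComplete

val sink1 = Sink.foreach[ByteString](println)
val sink2 = Sink.fold[Int, ByteString](0)((acc, _) => acc + 1)

val (fut1, fut2) = Source(1 to 10)
  .map(i => List(i.toString))
  .via(CsvFormatting.format())
  .alsoToMat(sink1)(Keep.right) // side branch: keep sink1's Future[Done]
  .toMat(sink2)(Keep.both)      // main branch: keep both materialized values
  .run() // (Future[Done], Future[Int])

fut2 onComplete {
  case Success(count) => println(s"Number of elements: $count")
  case Failure(e) => println(s"Stream failed: ${e.getMessage}")
}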

This produces the same result as the graph example described earlier.
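Since the question asks to keep the result as a ByteString Source, the same alsoToMat operator can also be applied before the stream is run. The sketch below attaches the counting Sink.fold to initialSource itself with Keep.right, so the count becomes the Source's materialized value and the type changes from Source[ByteString, NotUsed] to Source[ByteString, Future[Int]]. It is a sketch under stated assumptions: SomeEntity's definition and the in-memory entities are hypothetical stand-ins for the question's entity and publisher, countedCsv is a hypothetical helper name, and the materializer setup assumes Akka 2.5.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.csv.scaladsl.CsvFormatting
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.ByteString

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object CountingCsvSource {
  // Hypothetical stand-in for the question's SomeEntity
  final case class SomeEntity(firstName: String, lastName: String, city: String)

  // Hypothetical helper: counts entities on a side branch (alsoToMat + Sink.fold);
  // the count becomes the materialized value, so the result is still a ByteString Source
  def countedCsv(initialSource: Source[SomeEntity, NotUsed]): Source[ByteString, Future[Int]] =
    initialSource
      .alsoToMat(Sink.fold[Int, SomeEntity](0)((acc, _) => acc + 1))(Keep.right)
      .map(e => List(e.firstName, e.lastName, e.city))
      .via(CsvFormatting.format()) // via keeps the left (Future[Int]) materialized value

  def main(args: Array[String]): Unit = {
    // Akka 2.5 API; on Akka 2.6+ the implicit ActorSystem alone is enough
    implicit val system: ActorSystem = ActorSystem("counting-csv")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // Stands in for Source.fromPublisher(publisher) from the question
    val entities: Source[SomeEntity, NotUsed] = Source(List(
      SomeEntity("Ada", "Lovelace", "London"),
      SomeEntity("Alan", "Turing", "Manchester")))

    // Whoever runs the Source receives the count next to the sink's own value
    val (count, lines) = countedCsv(entities).toMat(Sink.seq[ByteString])(Keep.both).run()

    print(Await.result(lines, 5.seconds).map(_.utf8String).mkString)
    println(s"Number of elements: ${Await.result(count, 5.seconds)}")
    system.terminate()
  }
}
```

The count is still only known after the Source has been run, because it is a Future completed by the side Sink.fold. Counting before CsvFormatting counts the initialSource elements directly rather than the ByteString lines.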

answered Sep 23 '22 at 13:09 by Jeffrey Chung