
Handle Akka stream's first element specially

Is there an idiomatic way to handle the first element of an Akka Streams Source specially? What I have now is:

    var firstHandled = false
    source.map { elem =>
      if(!firstHandled) {
        //handle specially
        firstHandled = true
      } else {
        //handle normally
      }
    }

Thanks

povder asked Nov 22 '16 13:11



1 Answer

While I would generally go with Ramon's answer, you could also use prefixAndTail, with a prefix of 1, together with flatMapConcat to achieve something similar:

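    import akka.actor.ActorSystem
    import akka.stream.scaladsl.{Flow, Source}
    import scala.concurrent.Await
    import scala.concurrent.duration._

    // Akka 2.6+ derives the materializer from the implicit ActorSystem;
    // older versions also need an implicit ActorMaterializer() in scope.
    implicit val system: ActorSystem = ActorSystem("example")

    val src = Source(List(1, 2, 3, 4, 5))
    val fst = Flow[Int].map(i => s"First: $i")
    val rst = Flow[Int].map(i => s"Rest:  $i")

    val together = src.prefixAndTail(1).flatMapConcat { case (head, tail) =>
      // `head` is a Seq of the prefix elements, which in our case is
      // just the first one. We can convert it to a source of just
      // the first element, processed via our fst flow, and then
      // concatenate `tail`, which is the remainder...
      Source(head).via(fst).concat(tail.via(rst))
    }

    Await.result(together.runForeach(println), 10.seconds)
    // First: 1
    // Rest:  2
    // Rest:  3
    // Rest:  4
    // Rest:  5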

This works not just for the first item but for the first N items, with the proviso that the prefix is collected into a strict, in-memory collection before the (prefix, tail) pair is emitted.
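
As a rough sketch of the first-N case, the same pattern can be run with a longer prefix. The prefix length `n = 2`, the object name `FirstNExample`, and the `run()` helper are assumptions made for illustration; only `prefixAndTail`, `flatMapConcat`, `via`, and `concat` come from the answer above.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object FirstNExample {
  // Akka 2.6+ derives the materializer from the implicit ActorSystem;
  // older versions also need an implicit ActorMaterializer() in scope.
  implicit val system: ActorSystem = ActorSystem("first-n-example")

  val n = 2 // hypothetical prefix length, chosen for illustration

  val fst = Flow[Int].map(i => s"First: $i")
  val rst = Flow[Int].map(i => s"Rest:  $i")

  val together = Source(List(1, 2, 3, 4, 5))
    .prefixAndTail(n)
    .flatMapConcat { case (head, tail) =>
      // `head` holds up to `n` elements, all buffered in memory;
      // `tail` is a Source of whatever follows them.
      Source(head).via(fst).concat(tail.via(rst))
    }

  // Collects the output into a Seq, then shuts the actor system down.
  def run(): Seq[String] =
    try Await.result(together.runWith(Sink.seq), 10.seconds)
    finally system.terminate()
}
```

Calling `FirstNExample.run()` should yield `First: 1`, `First: 2`, `Rest:  3`, `Rest:  4`, `Rest:  5`. If the source has fewer than `n` elements, `head` contains whatever was available and `tail` is an empty source, so the same code also covers short or empty streams.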

Mikesname answered Sep 16 '22 18:09