I am new to Akka Streams and only started reading its docs last week. I understand most of the concepts, but I am finding it hard to understand what a Materialized Value means in Akka Streams and what its significance is.
If somebody could explain this with a real-world example, that would help a lot in relating it to its use cases in Akka Streams.
Update
I was going through the example below and wanted to understand the two different kinds of output that we get for the same graph. I know it is directly related to the Materialized Value, which is why I asked the question above.
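import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import scala.concurrent.Future

implicit val system = ActorSystem("PlainSinkProducerMain")
implicit val materializer = ActorMaterializer()
val source = Source(1 to 10)
val sink = Sink.fold[Int, Int](0)(_ + _)
// Keep.right keeps the sink's materialized value, a Future of the folded sum
val runnable: RunnableGraph[Future[Int]] =
  source.toMat(sink)(Keep.right)
// each run() materializes the graph anew and returns its own Future
val sum1: Future[Int] = runnable.run()
val sum2: Future[Int] = runnable.run()
println(sum1.value)
println(sum2.value)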
When I run this, I get the output below:
None
Some(Success(55))
The above example is from the docs and below is the explanation of it.
Since a stream can be materialized multiple times, the materialized value will also be calculated anew for each such materialization, usually leading to different values being returned each time. In the example below we create two running materialized instances of the stream that we described in the runnable variable, and both materializations give us a different Future from the map even though we used the same sink to refer to the future.
I have 2 questions here:
1) Why do the 2 materialized instances of the same graph have 2 different output values? The materialized value is calculated twice, but under the hood both calculations are for the same graph.
2) Why is the return type Some(Success(55)) instead of a plain 55 for successful execution and a Failure exception for failed cases? (These might be dumb questions, but I want to understand what problem this special return value solves and what advantage it gives.)
The value returned by materializing a graph is called a materialized value. The value may or may not be connected to the actual elements that flow through the stream, and the value can be of any type, which again may or may not be different from the types of elements flowing through the graph.
Akka Streams is a library to process and transfer a sequence of elements using bounded buffer space. This latter property is what we refer to as boundedness and it is the defining feature of Akka Streams. Akka Streams consists of three major components: Source, Flow and Sink.
Actor Materializer Lifecycle. The Materializer is a component that is responsible for turning the stream blueprint into a running stream and emitting the “materialized value”.
The starting point is called Source and can be a collection, an iterator, a block of code which is evaluated repeatedly or an org.reactivestreams.Publisher. A flow with an attached input and open output is also a Source. A flow may also be defined without an attached input or output and that is then a Flow.
(Answer to the comment) I'm not sure I understand your question entirely, but in general streams are asynchronous, and when you execute run() to materialize a stream, it starts executing on a different thread pool. Therefore, even if the stream terminates immediately, it is not guaranteed that the value will be available immediately. That's why the result of Sink.fold is Future[T]: a future is a value which is going to be available "in the future", and this is exactly the semantics needed here.
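As a minimal sketch of the point above, assuming the same Akka version as the question (ActorMaterializer is deprecated in newer releases), the graph can be run twice and both futures awaited before their values are read. The object name TwoMaterializations and the helper runTwice are hypothetical and only serve the illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object TwoMaterializations {
  // Hypothetical helper: runs the graph twice and returns both sums plus
  // whether the two runs handed back the very same Future object.
  def runTwice(): (Int, Int, Boolean) = {
    implicit val system: ActorSystem = ActorSystem("TwoMaterializations")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val runnable: RunnableGraph[Future[Int]] =
        Source(1 to 10).toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.right)

      // Each run() starts a separate stream with its own materialized Future.
      val sum1 = runnable.run()
      val sum2 = runnable.run()
      val sameFuture = sum1 eq sum2

      // Block until each stream has finished instead of peeking with .value.
      (Await.result(sum1, 5.seconds), Await.result(sum2, 5.seconds), sameFuture)
    } finally {
      // Shut the actor system down so the JVM can exit.
      Await.result(system.terminate(), 5.seconds)
    }
  }

  def main(args: Array[String]): Unit =
    println(runTwice())
}
```

Once both futures are awaited, both sums should be 55, and the two futures should be distinct objects, since each materialization creates its own.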
"why ... have 2 different values" - precisely because of the asynchronicity. In your case it happened that the second future has been completed before you observed it, but the first one has not. If you run this program multiple times, it is likely you will get different results.
"why the return type is Some(Success(55))
..." well, that's how futures in Scala are designed. The Future.value method returns Option[Try[T]], that is, it returns None if the future is not completed at the moment of the invocation and Some(v) if it is completed, where v is either Success(result) or Failure(throwable). In general, in Scala it is idiomatic to treat errors as values instead of exceptions, which implicitly unwind the call stack when thrown.
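The three possible results of Future.value can be sketched with the standard library alone. A Promise is used here only to control exactly when the future completes. FutureValueDemo and describe are hypothetical names, not part of any library.

```scala
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}

object FutureValueDemo {
  // Map the Option[Try[T]] snapshot returned by Future.value to a description.
  def describe[T](f: Future[T]): String = f.value match {
    case None             => "not completed yet"
    case Some(Success(v)) => s"completed with $v"
    case Some(Failure(e)) => s"failed with ${e.getMessage}"
  }

  def main(args: Array[String]): Unit = {
    val promise = Promise[Int]()
    println(describe(promise.future)) // not completed yet

    promise.success(55)               // complete the future by hand
    println(describe(promise.future)) // completed with 55

    val failed = Future.failed[Int](new RuntimeException("boom"))
    println(describe(failed))         // failed with boom
  }
}
```

The None case corresponds to the first line of output in the question, and the Some(Success(55)) case to the second.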
Note that you usually do not use Future.value when working with futures. Usually you transform futures with combinators like map, flatMap or Future.sequence, and then either pass the result of the final transformed future to some library (e.g. as an HTTP response in a web framework) or use a method like Await.result(future, timeout) to get the final result of the future or an exception, if the future is not completed during the specified time frame.
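A rough sketch of that style, using only the standard library: a plain Future stands in for the stream's materialized value so that no Akka dependency is needed. The names AwaitDemo and runOnce are hypothetical, and the global execution context is an assumption made for brevity.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AwaitDemo {
  // Stand-in for runnable.run(): every call starts a new asynchronous
  // computation and returns a fresh Future.
  def runOnce(): Future[Int] = Future { (1 to 10).sum }

  // Combine two futures with flatMap/map (via a for-comprehension)
  // without blocking on either of them.
  def total(): Future[Int] = {
    val sum1 = runOnce()
    val sum2 = runOnce()
    for {
      a <- sum1
      b <- sum2
    } yield a + b
  }

  def main(args: Array[String]): Unit = {
    // Block only at the very edge of the program, with a timeout.
    println(Await.result(total(), 5.seconds)) // 110
  }
}
```

Await.result throws a TimeoutException if the future has not completed within the given duration, and rethrows the future's own exception if it failed.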