 

Akka-streams - how to access the materialized value of the stream

I am learning to work with Akka streams, and really loving it, but the materialization part is still somewhat a mystery to me.

Quoting from http://doc.akka.io/docs/akka-stream-and-http-experimental/snapshot/scala/http/client-side/host-level.html#host-level-api:

... trigger the immediate shutdown of a specific pool by calling shutdown() on the HostConnectionPool instance that the pool client flow materializes into

How do I get hold of the HostConnectionPool instance?

Even more importantly, I'd like to understand in general how to access the materialized value and perform some operation or retrieve information from it.

Thanks in advance for any documentation pointers or explanation.

Yardena asked Jan 21 '16 12:01



1 Answer

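This is accomplished with the viaMat and toMat functions, which take a combiner (such as Keep.right or Keep.both) that selects which materialized value to keep. Extending the code example from the link provided in your question: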

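import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.HostConnectionPool
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import scala.concurrent.Future
import scala.util.Try

// Http() and run() need these in implicit scope (on Akka 2.6+ the ActorSystem alone is enough)
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

val poolClientFlow = Http().cachedHostConnectionPool[Int]("akka.io")

// viaMat keeps the flow's HostConnectionPool; toMat pairs it with the sink's Future
val graph: RunnableGraph[(HostConnectionPool, Future[(Try[HttpResponse], Int)])] =
  Source.single(HttpRequest(uri = "/") -> 42)
    .viaMat(poolClientFlow)(Keep.right)
    .toMat(Sink.head)(Keep.both)

// Running the graph returns the combined materialized value
val (pool, fut) = graph.run()

pool.shutdown()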

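Since Source.single materializes into Unit (NotUsed in Akka 2.4 and later), Keep.right in viaMat says to discard that value and keep the HostConnectionPool that poolClientFlow materializes into. In toMat, Keep.both says to keep both the left value (the pool from the flow) and the right value (the Future from the Sink) as a tuple. Calling run() on the resulting RunnableGraph returns that tuple, which is how you get hold of the materialized value.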
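To address the more general part of the question, here is a minimal sketch of the same pattern without HTTP. It is not from the Akka documentation; the object name MatValueSketch and its run method are hypothetical, and it assumes Akka 2.6 or later, where an implicit ActorSystem is enough to materialize a graph. Source.maybe materializes into a Promise, so the materialized value can be acted on after the stream has started, much like calling shutdown() on the pool.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical name, for illustration only.
object MatValueSketch {
  def run(): Int = {
    // Assumption: Akka 2.6+, where the implicit ActorSystem supplies the materializer.
    implicit val system: ActorSystem = ActorSystem("mat-value-sketch")
    try {
      // Source.maybe materializes into a Promise[Option[Int]];
      // Sink.head materializes into a Future[Int]. Keep.both keeps the pair.
      val graph = Source.maybe[Int].toMat(Sink.head)(Keep.both)

      // run() hands back the combined materialized value.
      val (promise, result): (Promise[Option[Int]], Future[Int]) = graph.run()

      // Operate on the materialized value: completing the promise
      // makes the running source emit a single element.
      promise.success(Some(42))

      // Blocking here only to keep the sketch short.
      Await.result(result, 3.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Swapping Keep.both for Keep.left or Keep.right would return only the Promise or only the Future, respectively.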

Ramón J Romero y Vigil answered Oct 14 '22 12:10