I have read about the materialization concept in Akka Streams, and I understand that stream materialization is:
the process of taking a stream description (the graph) and allocating all the necessary resources it needs in order to run.
I followed an example to build my Akka stream, using mapMaterializedValue to send messages to a queue. The purpose of the code is to push messages to the queue after the stream blueprint has been built. The code works, but I do not really understand what mapMaterializedValue does in it:
Promise<SourceQueueWithComplete<String>> promise = new Promise.DefaultPromise<>();
Source<String, SourceQueueWithComplete<String>> source = Source
    .<String>queue(100, OverflowStrategy.fail())
    .mapMaterializedValue(queue -> {
        promise.trySuccess(queue);
        // the mapping function must return the (possibly transformed) materialized value
        return queue;
    });
source.toMat(Sink.foreach(x -> System.out.println(x)), Keep.left()).run(materializer);
promise.<SourceQueueWithComplete<String>>future().map(mapMapperFunction(), actorSystem.dispatcher());
Akka Streams consists of three major components: Source, Flow, and Sink.
Back-pressure. A means of flow-control, a way for consumers of data to notify a producer about their current availability, effectively slowing down the upstream producer to match their consumption speeds. In the context of Akka Streams back-pressure is always understood as non-blocking and asynchronous.
The Akka Streams library calls them materialized values. That's because, when you plug components together, you have an inert graph, but when you call the run method, the graph comes alive, or is materialized. The Jedi value returned by materializing a graph is called a materialized value.
A Flow is a set of stream processing steps that has one open input and one open output.
The purpose of mapMaterializedValue is to transform the materialized value immediately after it is materialized. For example, suppose you have a third-party library which accepts a callback like this:
interface Callback<T> {
void onNext(T next);
void onError(Throwable t);
void onComplete();
}
Then you can create a method which returns a Source<T, Callback<T>> whose materialized value you can immediately pass to that third-party library when the stream is actually run:
<T> Source<T, Callback<T>> callbackSource() {
    return Source.<T>queue(1024, OverflowStrategy.fail())
        .mapMaterializedValue(queue -> new Callback<T>() {
            // an implementation of Callback which pushes the data
            // to the queue
        });
}
Source<Integer, Callback<Integer>> source = callbackSource();
Callback<Integer> callback = source
.toMat(Sink.foreach(System.out::println), Keep.left())
.run(materializer);
thirdPartyApiObject.runSomethingWithCallback(callback);
As you can see, this simplifies code that has to use this kind of third-party API, because the queue -> callback transformation is written only once and encapsulated in a method.
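The Callback implementation is only hinted at in the comments above. Here is a minimal sketch of what such an adapter could look like, written in plain Java so it runs without Akka on the classpath. SimpleQueue, RecordingQueue and toCallback are hypothetical names invented for this illustration. SimpleQueue stands in for the three SourceQueueWithComplete operations the adapter would call (offer, fail, complete). In real Akka code, offer returns a CompletionStage<QueueOfferResult> that you would normally inspect.

```java
import java.util.ArrayList;
import java.util.List;

public class QueueCallbackSketch {

    /** The third-party callback interface from the answer. */
    public interface Callback<T> {
        void onNext(T next);
        void onError(Throwable t);
        void onComplete();
    }

    /** Hypothetical stand-in for the queue operations the adapter needs (not an Akka type). */
    public interface SimpleQueue<T> {
        void offer(T element);
        void fail(Throwable cause);
        void complete();
    }

    /** The queue -> callback transformation that mapMaterializedValue would apply. */
    public static <T> Callback<T> toCallback(SimpleQueue<T> queue) {
        return new Callback<T>() {
            @Override public void onNext(T next) { queue.offer(next); }
            @Override public void onError(Throwable t) { queue.fail(t); }
            @Override public void onComplete() { queue.complete(); }
        };
    }

    /** Records every call so the forwarding can be observed without a running stream. */
    public static final class RecordingQueue<T> implements SimpleQueue<T> {
        public final List<String> events = new ArrayList<>();
        @Override public void offer(T element) { events.add("offer:" + element); }
        @Override public void fail(Throwable cause) { events.add("fail:" + cause.getMessage()); }
        @Override public void complete() { events.add("complete"); }
    }

    public static void main(String[] args) {
        RecordingQueue<Integer> queue = new RecordingQueue<>();
        Callback<Integer> callback = toCallback(queue);
        callback.onNext(1);
        callback.onNext(2);
        callback.onComplete();
        System.out.println(queue.events); // prints [offer:1, offer:2, complete]
    }
}
```

With Akka, the body of toCallback would go inside the lambda passed to mapMaterializedValue, with the real queue in place of SimpleQueue.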
In your case, however, you don't really need it. You're using mapMaterializedValue to complete an external promise with the materialized value, which is completely unnecessary since you can just use the materialized value directly after its materialization:
Source<String, SourceQueueWithComplete<String>> source = Source
    .queue(100, OverflowStrategy.fail());
SourceQueueWithComplete<String> queue = source
    .toMat(Sink.foreach(x -> System.out.println(x)), Keep.left())
    .run(materializer);
mapMapperFunction().apply(queue);
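To see why the promise adds nothing, here is a toy model in plain Java. It is a sketch only: Blueprint is a hypothetical class made up for this illustration, not an Akka type, and it mimics just two things: run() allocates a fresh value each time, and mapMaterializedValue wraps that allocation with a function. A CompletableFuture plays the role of the Scala Promise from the question.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;

public class PromiseVsReturnSketch {

    /** Hypothetical stand-in for a runnable graph; NOT an Akka class. */
    public static final class Blueprint<M> {
        private final Supplier<M> allocate;

        public Blueprint(Supplier<M> allocate) {
            this.allocate = allocate;
        }

        /** Returns a new blueprint whose run() applies f to the freshly allocated value. */
        public <N> Blueprint<N> mapMaterializedValue(Function<M, N> f) {
            return new Blueprint<N>(() -> f.apply(allocate.get()));
        }

        /** "Materializes" the blueprint: allocates and returns the materialized value. */
        public M run() {
            return allocate.get();
        }
    }

    /** Runs the promise-based variant and reports whether both routes yield the same queue. */
    public static boolean promiseHoldsReturnedValue() {
        CompletableFuture<Queue<String>> promise = new CompletableFuture<>();
        Blueprint<Queue<String>> source = new Blueprint<Queue<String>>(ArrayDeque::new)
            .mapMaterializedValue(queue -> {
                promise.complete(queue); // side effect, as in the question
                return queue;            // the value still has to be returned
            });
        Queue<String> returned = source.run();
        // The promise was completed during run(), with the very object run() returned.
        return promise.join() == returned;
    }

    public static void main(String[] args) {
        System.out.println(promiseHoldsReturnedValue()); // prints true
    }
}
```

In this model the promise only ever holds the object that run() hands back anyway, which is the point made above. A promise could still make sense if the code that needs the queue has no access to the place where the stream is run, but that is not the situation in the question.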