Akka Stream: what does mapMaterializedValue mean

I have read about the materialization concept in Akka Streams, and I understand that stream materialization is:

the process of taking a stream description (the graph) and allocating all the necessary resources it needs in order to run.

I followed an example and built my Akka stream using mapMaterializedValue to send messages to a queue. The goal is to push messages to the queue after the stream blueprint has been built. The code works, but I do not really understand what mapMaterializedValue does in it:

Promise<SourceQueueWithComplete<String>> promise = new Promise.DefaultPromise<>();

Source<String, SourceQueueWithComplete<String>> source = Source
    .<String>queue(100, OverflowStrategy.fail())
    .mapMaterializedValue(queue -> {
        promise.trySuccess(queue);
        return queue;
    });

source.toMat(Sink.foreach(x -> System.out.println(x)), Keep.left()).run(materializer);

promise.<SourceQueueWithComplete<String>>future().map(mapMapperFunction(), actorSystem.dispatcher());
zt1983811 asked Jun 14 '17 12:06


1 Answer

The purpose of mapMaterializedValue is to transform the materialized value immediately after it is materialized. For example, suppose you have a third-party library which accepts a callback like this:

interface Callback<T> {
    void onNext(T next);
    void onError(Throwable t);
    void onComplete();
}

Then you can create a method which returns a Source<T, Callback<T>> whose materialized value you can immediately pass to that third-party library when the stream is actually run:

<T> Source<T, Callback<T>> callbackSource() {
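    return Source.<T>queue(1024, OverflowStrategy.fail())
        .mapMaterializedValue(queue -> new Callback<T>() {
            // Forward each callback event to the queue
            @Override public void onNext(T next) { queue.offer(next); }
            @Override public void onError(Throwable t) { queue.fail(t); }
            @Override public void onComplete() { queue.complete(); }
        });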
}

Source<Integer, Callback<Integer>> source = callbackSource();

Callback<Integer> callback = source
    .toMat(Sink.foreach(System.out::println), Keep.left())
    .run(materializer);

thirdPartyApiObject.runSomethingWithCallback(callback);

This simplifies code that has to work with this kind of third-party API, because the queue -> callback transformation is written only once and encapsulated in a method.
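To make the timing concrete, here is a minimal sketch in plain Java with no Akka dependency. The Blueprint class, SimpleCallback interface, and callbackBlueprint method are hypothetical stand-ins invented for this illustration; they are not Akka's API or implementation. The sketch only models the idea that the function passed to mapMaterializedValue is not called when the blueprint is declared, and is called once for every run:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

// Toy model only: Blueprint is a hypothetical stand-in, not Akka's Source.
class MapMaterializedValueSketch {

    // A reusable description that produces a materialized value M on each run().
    static final class Blueprint<M> {
        private final Supplier<M> materialize;

        Blueprint(Supplier<M> materialize) {
            this.materialize = materialize;
        }

        // Builds a new blueprint; f is applied only when run() is called.
        <M2> Blueprint<M2> mapMaterializedValue(Function<M, M2> f) {
            return new Blueprint<>(() -> f.apply(materialize.get()));
        }

        // "Materializes" the blueprint and returns its materialized value.
        M run() {
            return materialize.get();
        }
    }

    // One-method stand-in for the Callback interface from the answer.
    interface SimpleCallback<T> {
        void onNext(T next);
    }

    // The queue -> callback conversion is written once, inside this method.
    static Blueprint<SimpleCallback<Integer>> callbackBlueprint(
            List<Queue<Integer>> created, AtomicInteger mappingCalls) {
        Blueprint<Queue<Integer>> queues = new Blueprint<>(() -> {
            Queue<Integer> queue = new ArrayDeque<>();
            created.add(queue); // remembered so the demo can inspect it
            return queue;
        });
        return queues.mapMaterializedValue(queue -> {
            mappingCalls.incrementAndGet();
            SimpleCallback<Integer> callback = next -> queue.offer(next);
            return callback;
        });
    }

    static List<String> demo() {
        AtomicInteger mappingCalls = new AtomicInteger();
        List<Queue<Integer>> created = new ArrayList<>();
        Blueprint<SimpleCallback<Integer>> blueprint =
                callbackBlueprint(created, mappingCalls);

        List<String> log = new ArrayList<>();
        log.add("mapping calls before run: " + mappingCalls.get());

        // Two runs give two independent callbacks, each backed by its own queue.
        SimpleCallback<Integer> first = blueprint.run();
        SimpleCallback<Integer> second = blueprint.run();
        first.onNext(1);
        second.onNext(2);

        log.add("mapping calls after two runs: " + mappingCalls.get());
        log.add("queues: " + created);
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this model the caller only ever sees callbacks; the queue stays an internal detail of callbackBlueprint, which is the encapsulation the answer describes.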

In your case, however, you don't really need it. You're using mapMaterializedValue to complete an external promise with the materialized value, which is completely unnecessary since you can just use the materialized value after its materialization directly:

Source<String, SourceQueueWithComplete<String>> source = Source
    .<String>queue(100, OverflowStrategy.fail());

SourceQueueWithComplete<String> queue = source
    .toMat(Sink.foreach(x -> System.out.println(x)), Keep.left())
    .run(materializer);

mapMapperFunction().apply(queue);
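The redundancy of the external promise can be sketched in plain Java without Akka. Everything here is an assumption made for illustration: a Supplier stands in for the runnable stream, a CompletableFuture stands in for the Scala Promise, and the class name PromiseVersusDirectSketch is hypothetical. The sketch shows that the value returned by a run is the same object the promise receives, and that in this model a promise completed as a side effect keeps only the queue from the first run:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Toy model only: a Supplier stands in for a runnable stream blueprint,
// and CompletableFuture stands in for the external promise.
class PromiseVersusDirectSketch {

    static List<String> demo() {
        CompletableFuture<Queue<String>> promise = new CompletableFuture<>();

        // Each get() plays the role of one materialization: it allocates a
        // fresh queue and, as a side effect, tries to complete the promise.
        Supplier<Queue<String>> blueprint = () -> {
            Queue<String> queue = new ArrayDeque<>();
            promise.complete(queue); // changes nothing if already completed
            return queue;
        };

        List<String> log = new ArrayList<>();

        // First run: the returned queue and the promised queue are identical.
        Queue<String> first = blueprint.get();
        log.add("promise holds first queue: " + (promise.join() == first));

        // Second run: the promise still refers to the first queue.
        Queue<String> second = blueprint.get();
        log.add("promise holds second queue: " + (promise.join() == second));
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Using the returned value directly avoids both the extra indirection and the question of which run the promise refers to.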
Vladimir Matveev answered Sep 20 '22 05:09