
How to use Context with flatMap() in Reactor?

I have trouble understanding Context. The documentation says that Context is:

A key/value store that is propagated between components such as operators via the context protocol. Contexts are ideal to transport orthogonal information such as tracing or security tokens.

Great.

Now suppose we want to propagate something through Context so that it is available everywhere. To call other async code, we simply use the flatMap() method.

Problem: how do we access the Context inside the called method?

Sample (simple) code:

public class TestFlatMap {
    public static void main(final String ...args) {
        final Flux<String> greetings = Flux.just("Hubert", "Sharon")
            .flatMap(TestFlatMap::nameToGreeting)
            .subscriberContext(context ->
                Context.of("greetingWord", "Hello")  // context initialized
            );
        greetings.subscribe(System.out::println);
    }

    private static Mono<String> nameToGreeting(final String name) {
        return Mono.just("Hello " + name + " !!!");  // ALERT: we don't have Context here
    }
}

The called method can be (and most likely will be) in another class.

Thanks in advance for your help!

Edit: removed some code to make the question more concise and straight to the point.

Hubert asked Feb 06 '19 22:02



2 Answers

Chain your Publishers and may the Context be with you

As long as all your Publishers are connected into one chain (including the ones returned inside flatMap/concatMap and similar operators), the Context is propagated correctly throughout the whole stream at runtime.

To access the Context in the nameToGreeting method, you can call Mono.subscriberContext() and retrieve the stored info, even if the methods seem unrelated. The following code shows the concept:

public class TestFlatMap {
    public static void main(final String ...args) {
        final Flux<String> greetings = Flux.just("Hubert", "Sharon")
                                           .flatMap(TestFlatMap::nameToGreeting)
                                           .subscriberContext(context ->
                                                   Context.of("greetingWord", "Hello")  // context initialized
                                           );
        greetings.subscribe(System.out::println);
    }

    private static Mono<String> nameToGreeting(final String name) {
        return Mono.subscriberContext()
                   .filter(c -> c.hasKey("greetingWord"))
                   .map(c -> c.get("greetingWord"))
                   .flatMap(greetingWord -> Mono.just(greetingWord + " " + name + " " + "!!!"));// ALERT: we have Context here !!!
    }
}

Alternatively, you can do the same with the zip operator and combine the results later:

public class TestFlatMap {
    public static void main(final String ...args) {
        final Flux<String> greetings = Flux.just("Hubert", "Sharon")
                                           .flatMap(TestFlatMap::nameToGreeting)
                                           .subscriberContext(context ->
                                                   Context.of("greetingWord", "Hello")  // context initialized
                                           );
        greetings.subscribe(System.out::println);
    }

    private static Mono<String> nameToGreeting(final String name) {
        return Mono.zip(
            Mono.subscriberContext()
                .filter(c -> c.hasKey("greetingWord"))
                .map(c -> c.get("greetingWord")), // ALERT: we have Context here !!!
            Mono.just(name),
            (greetingWord, receivedName) -> greetingWord + " " + receivedName + " " + "!!!"
        );
    }
}

So, why does it work?

As the samples above show, nameToGreeting is called within the context of the main Flux. Under the hood (in the FluxFlatMap internals), each mapped Publisher is subscribed to by a FlatMapInner. If we look at FlatMapInner and its currentContext override, we see that FlatMapInner uses the parent's Context. This means that if the parent has a Reactor Context, that Context is propagated to each inner Publisher.

Therefore, the Mono returned by the nameToGreeting method has the same Context as its parent.
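
To make the delegation idea concrete, here is a minimal sketch in plain Java that does not use Reactor at all. Every name in it (ContextPropagationSketch, Subscriber, Source, RootSubscriber, InnerSubscriber) is hypothetical and exists only for illustration; it is a simplified model of the mechanism described above, not Reactor's actual implementation. The point it tries to show is that the inner subscriber holds no context of its own and simply asks its parent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Simplified model only: these are NOT Reactor classes, all names are hypothetical.
class ContextPropagationSketch {

    /** Anything that can answer "which context am I running under?". */
    interface Subscriber {
        Map<String, String> currentContext();
    }

    /** A lazy source: it produces its value only when a subscriber is attached. */
    interface Source {
        String subscribe(Subscriber subscriber);
    }

    /** The downstream subscriber that owns the context (assumed stand-in for what subscriberContext(...) sets up). */
    static final class RootSubscriber implements Subscriber {
        private final Map<String, String> context;

        RootSubscriber(Map<String, String> context) {
            this.context = context;
        }

        @Override
        public Map<String, String> currentContext() {
            return context;
        }
    }

    /** Plays the role of FlatMapInner: no context of its own, it delegates to the parent. */
    static final class InnerSubscriber implements Subscriber {
        private final Subscriber parent;

        InnerSubscriber(Subscriber parent) {
            this.parent = parent;
        }

        @Override
        public Map<String, String> currentContext() {
            return parent.currentContext();
        }
    }

    /** flatMap-like step: maps each name to an inner source and subscribes to it through an InnerSubscriber. */
    static List<String> flatMap(List<String> names, Function<String, Source> mapper, Subscriber downstream) {
        List<String> results = new ArrayList<>();
        for (String name : names) {
            results.add(mapper.apply(name).subscribe(new InnerSubscriber(downstream)));
        }
        return results;
    }

    /** Counterpart of nameToGreeting: the context is read at subscription time, not at call time. */
    static Source nameToGreeting(String name) {
        return subscriber -> subscriber.currentContext().get("greetingWord") + " " + name + " !!!";
    }

    public static void main(String[] args) {
        Subscriber root = new RootSubscriber(Map.of("greetingWord", "Hello"));
        flatMap(List.of("Hubert", "Sharon"), ContextPropagationSketch::nameToGreeting, root)
                .forEach(System.out::println);
    }
}
```

In this model nameToGreeting never receives the context as an argument. It only sees it once something subscribes, which is why the method can live in another class and still read greetingWord.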

Oleh Dokuka answered Oct 16 '22 22:10


Reactor-Core v3.4 introduced Mono.deferContextual and Flux.deferContextual, which supersede Mono.deferWithContext and Flux.deferWithContext, introduced in v3.3.

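Using these methods, Oleh Dokuka's zip example can be simplified to the following (on v3.4, subscriberContext still works but is deprecated in favor of contextWrite):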

public class TestFlatMap {
    public static void main(final String ...args) {
        final Flux<String> greetings = Flux.just("Hubert", "Sharon")
                .flatMap(TestFlatMap::nameToGreeting)
                .subscriberContext(context ->
                        Context.of("greetingWord", "Hello"));  // context initialized
        greetings.subscribe(System.out::println);
    }

    private static Mono<String> nameToGreeting(final String name) {
        return Mono.deferContextual(c -> Mono.just(name)
                .filter(x -> c.hasKey("greetingWord"))
                .map(n -> c.get("greetingWord") + " " + n + " " + "!!!"));
    }
}

H. Schulz answered Oct 16 '22 22:10