I have a problem understanding Context. The documentation says that Context is:
A key/value store that is propagated between components such as operators via the context protocol. Contexts are ideal to transport orthogonal information such as tracing or security tokens.
Great.
Now suppose we want to propagate something via Context so that it is available everywhere. To call other async code, we simply use the flatMap() method.
Problem: how do we access the Context inside the called method?
Sample (simple) code:
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

public class TestFlatMap {
    public static void main(final String... args) {
        final Flux<String> greetings = Flux.just("Hubert", "Sharon")
                .flatMap(TestFlatMap::nameToGreeting)
                .subscriberContext(context ->
                        Context.of("greetingWord", "Hello") // context initialized
                );
        greetings.subscribe(System.out::println);
    }

    private static Mono<String> nameToGreeting(final String name) {
        return Mono.just("Hello " + name + " !!!"); // ALERT: we don't have Context here
    }
}
The called method can be (and most likely will be) in another class.
Thanks for help in advance !
Edit: removed some code to make the question more concise and straight to the point.
Connect your Publishers and may the Context be with you
If you connect all your Publishers (and this includes connections within flatMap/concatMap and similar operators), the Context will be correctly propagated throughout the whole stream at runtime.
To access the Context in the nameToGreeting method, you can call Mono.subscriberContext() and retrieve the stored info, even if the methods seem unrelated.
The following shows the mentioned concept:
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

public class TestFlatMap {
    public static void main(final String... args) {
        final Flux<String> greetings = Flux.just("Hubert", "Sharon")
                .flatMap(TestFlatMap::nameToGreeting)
                .subscriberContext(context ->
                        Context.of("greetingWord", "Hello") // context initialized
                );
        greetings.subscribe(System.out::println);
    }

    private static Mono<String> nameToGreeting(final String name) {
        return Mono.subscriberContext()
                .filter(c -> c.hasKey("greetingWord"))
                .map(c -> c.get("greetingWord"))
                .flatMap(greetingWord -> Mono.just(greetingWord + " " + name + " " + "!!!")); // ALERT: we have Context here !!!
    }
}
Alternatively, you can do the same using the zip operator to combine the results later:
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

public class TestFlatMap {
    public static void main(final String... args) {
        final Flux<String> greetings = Flux.just("Hubert", "Sharon")
                .flatMap(TestFlatMap::nameToGreeting)
                .subscriberContext(context ->
                        Context.of("greetingWord", "Hello") // context initialized
                );
        greetings.subscribe(System.out::println);
    }

    private static Mono<String> nameToGreeting(final String name) {
        return Mono.zip(
                Mono.subscriberContext()
                        .filter(c -> c.hasKey("greetingWord"))
                        .map(c -> c.get("greetingWord")), // ALERT: we have Context here !!!
                Mono.just(name),
                (greetingWord, receivedName) -> greetingWord + " " + receivedName + " " + "!!!"
        );
    }
}
As we can see from the samples above, nameToGreeting is called within the context of the main Flux. Under the hood (see the FluxFlatMap internals), each mapped Publisher is subscribed to by a FlatMapInner. If we look at FlatMapInner and its currentContext override, we see that FlatMapInner uses the parent's Context, which means that if the parent has a Reactor Context, that context is propagated to each inner Publisher.
Therefore, the Mono returned by the nameToGreeting method will have the same Context as its parent.
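The delegation described above can be pictured with a small toy model. This is only a sketch in plain Java, not Reactor's actual code: the names ContextPropagationSketch, ContextAwareSubscriber, RootSubscriber and InnerSubscriber are hypothetical stand-ins, and a plain Map plays the role of the Context. It only illustrates the idea that an inner subscriber answers currentContext() by asking its parent.

```java
import java.util.Map;

/** Toy model of context delegation; all names are hypothetical, not Reactor classes. */
final class ContextPropagationSketch {

    /** A subscriber that can report the context visible at its position in the chain. */
    interface ContextAwareSubscriber {
        Map<String, String> currentContext();
    }

    /** Stands in for the subscriber at the point where the context was written. */
    static final class RootSubscriber implements ContextAwareSubscriber {
        private final Map<String, String> context;

        RootSubscriber(Map<String, String> context) {
            this.context = context;
        }

        @Override
        public Map<String, String> currentContext() {
            return context;
        }
    }

    /** Stands in for the per-item inner subscriber: it holds no context, it asks its parent. */
    static final class InnerSubscriber implements ContextAwareSubscriber {
        private final ContextAwareSubscriber parent;

        InnerSubscriber(ContextAwareSubscriber parent) {
            this.parent = parent;
        }

        @Override
        public Map<String, String> currentContext() {
            return parent.currentContext();
        }
    }

    /** Stands in for the inner publisher: it reads the context of whoever subscribes to it. */
    static String nameToGreeting(String name, ContextAwareSubscriber subscriber) {
        String greetingWord = subscriber.currentContext().getOrDefault("greetingWord", "<none>");
        return greetingWord + " " + name + " !!!";
    }

    /** Wires a root and an inner subscriber together, as the outer operator would per item. */
    static String greet(String name) {
        ContextAwareSubscriber root = new RootSubscriber(Map.of("greetingWord", "Hello"));
        ContextAwareSubscriber inner = new InnerSubscriber(root);
        return nameToGreeting(name, inner);
    }

    public static void main(String[] args) {
        System.out.println(greet("Hubert")); // prints "Hello Hubert !!!"
        System.out.println(greet("Sharon")); // prints "Hello Sharon !!!"
    }
}
```

In this model, nameToGreeting never receives the Map directly; it only sees its own subscriber, which is why a method in an unrelated class can still read the value as long as the returned publisher is connected to the main chain.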
Reactor-Core v3.4 introduced Mono.deferContextual and Flux.deferContextual, which supersede Mono.deferWithContext and Flux.deferWithContext introduced in v3.3.
Using these methods, Oleh Dokuka's zip example can be simplified to:
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

public class TestFlatMap {
    public static void main(final String... args) {
        final Flux<String> greetings = Flux.just("Hubert", "Sharon")
                .flatMap(TestFlatMap::nameToGreeting)
                // contextWrite replaces subscriberContext, which is deprecated as of v3.4
                .contextWrite(Context.of("greetingWord", "Hello")); // context initialized
        greetings.subscribe(System.out::println);
    }

    private static Mono<String> nameToGreeting(final String name) {
        // c is a read-only ContextView; the Mono is empty if the key is missing
        return Mono.deferContextual(c -> Mono.just(name)
                .filter(x -> c.hasKey("greetingWord"))
                .map(n -> c.get("greetingWord") + " " + n + " " + "!!!"));
    }
}