I was wondering whether there is a way to push the reactive context into a ThreadLocal variable before a subscriber receives the onNext signal. While digging through reactor-core, I found the Hooks class and the BiFunction accepted by Operators.lift.
I created a class with the following implementation. It holds the Context in a ThreadLocal variable and implements the required BiFunction interface. It delegates every call to the actual subscriber and, if the context has changed, pushes it into the ThreadLocal variable before calling onNext on the actual subscriber.
package com.example.demo;

import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.util.context.Context;

import java.util.function.BiFunction;

public class ThreadLocalContextLifter<T> implements BiFunction<Scannable, CoreSubscriber<? super T>, CoreSubscriber<? super T>> {

    private static final Logger logger = LoggerFactory.getLogger(ThreadLocalContextLifter.class);

    // Holds the last reactive Context seen on the current thread
    private static final ThreadLocal<Context> contextHolder = new ThreadLocal<>();

    public static Context getContext() {
        Context context = contextHolder.get();
        if (context == null) {
            context = Context.empty();
            contextHolder.set(context);
        }
        return context;
    }

    public static void setContext(Context context) {
        contextHolder.set(context);
    }

    @Override
    public CoreSubscriber<? super T> apply(Scannable scannable, CoreSubscriber<? super T> coreSubscriber) {
        return new ThreadLocalContextCoreSubscriber<>(coreSubscriber);
    }

    // Wraps the actual subscriber and forwards every signal to it
    final class ThreadLocalContextCoreSubscriber<U> implements CoreSubscriber<U> {

        private final CoreSubscriber<? super U> delegate;

        public ThreadLocalContextCoreSubscriber(CoreSubscriber<? super U> delegate) {
            this.delegate = delegate;
        }

        @Override
        public Context currentContext() {
            return delegate.currentContext();
        }

        @Override
        public void onSubscribe(Subscription s) {
            delegate.onSubscribe(s);
        }

        @Override
        public void onNext(U u) {
            // Push the reactive Context to the ThreadLocal only when it differs
            Context context = delegate.currentContext();
            if (!context.isEmpty()) {
                Context currentContext = ThreadLocalContextLifter.getContext();
                if (!currentContext.equals(context)) {
                    logger.info("Pushing reactive context to holder {}", context);
                    ThreadLocalContextLifter.setContext(context);
                }
            }
            delegate.onNext(u);
        }

        @Override
        public void onError(Throwable t) {
            delegate.onError(t);
        }

        @Override
        public void onComplete() {
            delegate.onComplete();
        }
    }
}
The instance is loaded into the Hooks with the following code:
Hooks.onEachOperator(Operators.lift(new ThreadLocalContextLifter<>()));
I've run some tests and it seems to work properly, but I'm not convinced by the solution. I suspect that the hook will degrade Reactor's performance, or that it will fail in some cases I'm not aware of.
My question is simple: Is this a bad idea?
I don't think there is anything wrong with that idea... The hook is used by every Reactor-provided operator.
The Context doesn't change between onNext calls, so the lifted ThreadLocalContextCoreSubscriber could capture it in onSubscribe. But you'd still need to check the ThreadLocal at least once in onNext, since onNext and onSubscribe can happen on two different threads, so your solution of using delegate.currentContext() works too. In the end, your approach looks sound.
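To illustrate why the ThreadLocal still has to be checked in onNext, here is a minimal sketch using only the JDK (no Reactor). It assumes a plain String as a stand-in for the Context; the class name ThreadLocalVisibilityDemo, the HOLDER field, and both helper methods are hypothetical names chosen for this illustration. A value set on one thread (as would happen in onSubscribe) is not visible from another thread (where onNext might be delivered).

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadLocalVisibilityDemo {

    // Hypothetical stand-in for contextHolder; stores a String instead of a Context
    private static final ThreadLocal<String> HOLDER = new ThreadLocal<>();

    /** Sets the holder on the calling thread and reads it back on the same thread. */
    static String readFromSameThread(String value) {
        HOLDER.set(value);
        try {
            return HOLDER.get();
        } finally {
            HOLDER.remove();
        }
    }

    /** Sets the holder on the calling thread, then reads it from a worker thread. */
    static String readFromOtherThread(String value) {
        HOLDER.set(value); // comparable to capturing the value during onSubscribe
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            // Comparable to onNext arriving on a different thread
            Callable<String> read = HOLDER::get;
            return worker.submit(read).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("worker thread failed", e);
        } finally {
            worker.shutdown();
            HOLDER.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println("same thread:  " + readFromSameThread("ctx"));  // prints "same thread:  ctx"
        System.out.println("other thread: " + readFromOtherThread("ctx")); // prints "other thread: null"
    }
}
```

The worker thread sees null because each thread has its own copy of the ThreadLocal, which is why setting it once in onSubscribe would not be enough when onNext runs elsewhere.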