When executing async CompletableFuture
, the parent threadcontext and moreover the org.slf4j.MDC
context is lost.
This is bad as I'm using some kind of "fish tagging" to track logs from one request among multiple logfiles.
MDC.put("fishid", randomId())
Question: how can I retain that id during the tasks of CompletableFutures
in general?
List<CompletableFuture<UpdateHotelAllotmentsRsp>> futures =
tasks.stream()
.map(task -> CompletableFuture.supplyAsync(
() -> businesslogic(task))
.collect(Collectors.toList());
List results = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
public void businesslogic(Task task) {
LOGGER.info("mdc fishtag context is lost here");
}
SLF4J supports the Mapped Diagnostic Context which is a map where the application code provides key-value pairs which can be inserted by the logging framework in the log messages. If the underlying logging framework supports the MDC then SLF4J facade will pass the maintained key-value pairs to the used logging framework.
To use SLF4J with Java Common Logging you will need to include the slf4j-jdk1.4-SLF4J_VERSION.jar library. Starting with the 1.6 version of the SLF4J library, if no bindings are found on the classpath SLF4J API will start to discard all log messages silently.
When using SLF4J you probably won’t be using the slf4j-simple library and you’ll want to use it with a dedicated logging framework so that your logs can be put in the destination of your choice or even multiple destinations of choice.
MDC has lots of applications, mainly in scenarios in which the execution of several different threads causes interleaved log messages that would be otherwise hard to read. And as we've seen, it's supported by three of the most widely used logging frameworks in Java.
The most readable way I solved this problem was as below -
---------------Thread utils class--------------------
public static Runnable withMdc(Runnable runnable) {
Map<String, String> mdc = MDC.getCopyOfContextMap();
return () -> {
MDC.setContextMap(mdc);
runnable.run();
};
}
public static <U> Supplier<U> withMdc(Supplier<U> supplier) {
Map<String, String> mdc = MDC.getCopyOfContextMap();
return (Supplier) () -> {
MDC.setContextMap(mdc);
return supplier.get();
};
}
---------------Usage--------------
CompletableFuture.supplyAsync(withMdc(() -> someSupplier()))
.thenRunAsync(withMdc(() -> someRunnable())
....
WithMdc in ThreadUtils would have to be overloaded to include other functional interfaces which are accepted by CompletableFuture
Please note that the withMdc() method is statically imported to improve readability.
At the end I created a Supplier
wrapper retaining the MDC
. If anyone has a better idea feel free to comment.
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) {
return CompletableFuture.supplyAsync(new SupplierMDC(supplier), executor);
}
private static class SupplierMDC<T> implements Supplier<T> {
private final Supplier<T> delegate;
private final Map<String, String> mdc;
public SupplierMDC(Supplier<T> delegate) {
this.delegate = delegate;
this.mdc = MDC.getCopyOfContextMap();
}
@Override
public T get() {
MDC.setContextMap(mdc);
return delegate.get();
}
}
My solution theme would be to (It would work with JDK 9+ as a couple of overridable methods are exposed since that version)
Make the complete ecosystem aware of MDC
And for that, we need to address the following scenarios:
For that, let's create a MDC aware version class of CompletableFuture
by extending it. My version of that would look like below
import org.slf4j.MDC;
import java.util.Map;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.function.Supplier;
public class MDCAwareCompletableFuture<T> extends CompletableFuture<T> {
public static final ExecutorService MDC_AWARE_ASYNC_POOL = new MDCAwareForkJoinPool();
@Override
public CompletableFuture newIncompleteFuture() {
return new MDCAwareCompletableFuture();
}
@Override
public Executor defaultExecutor() {
return MDC_AWARE_ASYNC_POOL;
}
public static <T> CompletionStage<T> getMDCAwareCompletionStage(CompletableFuture<T> future) {
return new MDCAwareCompletableFuture<>()
.completeAsync(() -> null)
.thenCombineAsync(future, (aVoid, value) -> value);
}
public static <T> CompletionStage<T> getMDCHandledCompletionStage(CompletableFuture<T> future,
Function<Throwable, T> throwableFunction) {
Map<String, String> contextMap = MDC.getCopyOfContextMap();
return getMDCAwareCompletionStage(future)
.handle((value, throwable) -> {
setMDCContext(contextMap);
if (throwable != null) {
return throwableFunction.apply(throwable);
}
return value;
});
}
}
The MDCAwareForkJoinPool
class would look like (have skipped the methods with ForkJoinTask
parameters for simplicity)
public class MDCAwareForkJoinPool extends ForkJoinPool {
//Override constructors which you need
@Override
public <T> ForkJoinTask<T> submit(Callable<T> task) {
return super.submit(MDCUtility.wrapWithMdcContext(task));
}
@Override
public <T> ForkJoinTask<T> submit(Runnable task, T result) {
return super.submit(wrapWithMdcContext(task), result);
}
@Override
public ForkJoinTask<?> submit(Runnable task) {
return super.submit(wrapWithMdcContext(task));
}
@Override
public void execute(Runnable task) {
super.execute(wrapWithMdcContext(task));
}
}
The utility methods to wrap would be such as
public static <T> Callable<T> wrapWithMdcContext(Callable<T> task) {
//save the current MDC context
Map<String, String> contextMap = MDC.getCopyOfContextMap();
return () -> {
setMDCContext(contextMap);
try {
return task.call();
} finally {
// once the task is complete, clear MDC
MDC.clear();
}
};
}
public static Runnable wrapWithMdcContext(Runnable task) {
//save the current MDC context
Map<String, String> contextMap = MDC.getCopyOfContextMap();
return () -> {
setMDCContext(contextMap);
try {
return task.run();
} finally {
// once the task is complete, clear MDC
MDC.clear();
}
};
}
public static void setMDCContext(Map<String, String> contextMap) {
MDC.clear();
if (contextMap != null) {
MDC.setContextMap(contextMap);
}
}
Below are some guidelines for usage:
MDCAwareCompletableFuture
rather than the class CompletableFuture
.CompletableFuture
instantiates the self version such as new CompletableFuture...
. For such methods (most of the public static methods), use an alternative method to get an instance of MDCAwareCompletableFuture
. An example of using an alternative could be rather than using CompletableFuture.supplyAsync(...)
, you can choose new MDCAwareCompletableFuture<>().completeAsync(...)
CompletableFuture
to MDCAwareCompletableFuture
by using the method getMDCAwareCompletionStage
when you get stuck with one because of say some external library which returns you an instance of CompletableFuture
. Obviously, you can't retain the context within that library but this method would still retain the context after your code hits the application code.MDCAwareForkJoinPool
. You could create MDCAwareThreadPoolExecutor
by overriding execute
method as well to serve your use case. You get the idea!With that, your code would look like
List<CompletableFuture<UpdateHotelAllotmentsRsp>> futures =
tasks.stream()
new MDCAwareCompletableFuture<UpdateHotelAllotmentsRsp>().completeAsync(
() -> businesslogic(task))
.collect(Collectors.toList());
List results = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
public UpdateHotelAllotmentsRsp businesslogic(Task task) {
LOGGER.info("mdc fishtag context is not lost here");
}
You can find a detailed explanation of all of the above here in a post about the same.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With