A Mapped Diagnostic Context, or MDC for short, is an instrument for distinguishing interleaved log output from different sources. Log output is typically interleaved when a server handles multiple clients near-simultaneously. The MDC is managed on a per-thread basis.
MDC in Log4j allows us to fill a map-like structure with pieces of information that are accessible to the appender when the log message is actually written. The MDC structure is internally attached to the executing thread in the same way a ThreadLocal variable would be.
To use thread pools, we first create an ExecutorService object and pass a set of tasks to it. The ThreadPoolExecutor class allows setting the core and maximum pool size. The runnables that are run by a particular thread are executed sequentially.
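As a minimal sketch of the point above, the following builds a ThreadPoolExecutor whose core and maximum pool size are both 1 and feeds it three tasks. The class name PoolBasics and the method runSequentially are hypothetical names chosen for illustration; only JDK classes are used.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolBasics {

    /** Runs three tasks on a one-thread pool and returns the order in which they ran. */
    static List<Integer> runSequentially() {
        // corePoolSize = 1 and maximumPoolSize = 1: a single worker handles every task.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        List<Integer> order = new CopyOnWriteArrayList<>();
        try {
            for (int i = 1; i <= 3; i++) {
                final int id = i;
                pool.execute(() -> order.add(id));
            }
        } finally {
            pool.shutdown(); // already queued tasks still run
        }
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(runSequentially()); // prints [1, 2, 3]
    }
}
```

With one worker and a FIFO queue, the tasks run one after another in submission order. With more workers, only the tasks handled by the same thread are sequential relative to each other.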
MDC stands for Mapped Diagnostic Context, and is a feature that is offered by both plain Log4J and SLF4J with Logback or Log4J as the backing logging framework. It is a map that holds String values keyed by Strings, which is associated with the current thread (using a ThreadLocal).
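Because the map lives in a ThreadLocal, a pooled thread keeps whatever an earlier task left in it. The sketch below illustrates this with a plain ThreadLocal map standing in for MDC, so it runs without SLF4J on the classpath. CONTEXT, StaleContextDemo, and the "user" key are assumptions made for illustration, not part of any logging API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class StaleContextDemo {

    // Hypothetical stand-in for MDC: one String-to-String map per thread.
    static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    /** Returns what a second task sees after a first task set "user" on the same pooled thread. */
    static String valueSeenBySecondTask() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Runnable setUser = () -> CONTEXT.get().put("user", "alice");
            Callable<String> readUser = () -> CONTEXT.get().get("user");
            pool.submit(setUser).get();         // first task writes into the worker's map
            return pool.submit(readUser).get(); // second task reuses the same worker thread
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(valueSeenBySecondTask());     // prints alice (stale value)
        System.out.println(CONTEXT.get().get("user"));   // prints null (main thread has its own map)
    }
}
```

The second task never set "user", yet it sees the first task's value, while the submitting thread sees nothing. The same thing happens with real MDC data in a thread pool.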
Yes, this is a common problem I've run into as well. There are a few workarounds (like manually setting it, as described), but ideally you want a solution that minimizes changes to how you use thread pools (e.g. replacing Callable with MyCallable everywhere, or similar ugliness). Here's a solution that I use that meets these needs. Code should be self-explanatory.
(As a side note, this executor can be created and fed to Guava's MoreExecutors.listeningDecorator(), if you use Guava's ListenableFuture.)
import org.slf4j.MDC;
import java.util.Map;
import java.util.concurrent.*;
/**
* A SLF4J MDC-compatible {@link ThreadPoolExecutor}.
* <p/>
* In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
* logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a
* thread pool. This is a drop-in replacement for {@link ThreadPoolExecutor} that sets MDC data before each task appropriately.
* <p/>
* Created by jlevy.
* Date: 6/14/13
*/
public class MdcThreadPoolExecutor extends ThreadPoolExecutor {

    private final boolean useFixedContext;
    // SLF4J's MDC API works with Map<String, String>.
    private final Map<String, String> fixedContext;

    /**
     * Pool where task threads take MDC from the submitting thread.
     */
    public static MdcThreadPoolExecutor newWithInheritedMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                                            TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(null, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    /**
     * Pool where task threads take fixed MDC from the thread that creates the pool.
     */
    public static MdcThreadPoolExecutor newWithCurrentMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                                          TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(MDC.getCopyOfContextMap(), corePoolSize, maximumPoolSize, keepAliveTime, unit,
                workQueue);
    }

    /**
     * Pool where task threads always have a specified, fixed MDC.
     */
    public static MdcThreadPoolExecutor newWithFixedMdc(Map<String, String> fixedContext, int corePoolSize,
                                                        int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                                        BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(fixedContext, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    private MdcThreadPoolExecutor(Map<String, String> fixedContext, int corePoolSize, int maximumPoolSize,
                                  long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.fixedContext = fixedContext;
        // A null context means: copy the submitting thread's MDC for each task.
        useFixedContext = (fixedContext != null);
    }

    private Map<String, String> getContextForTask() {
        return useFixedContext ? fixedContext : MDC.getCopyOfContextMap();
    }

    /**
     * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
     * all delegate to this.
     */
    @Override
    public void execute(Runnable command) {
        super.execute(wrap(command, getContextForTask()));
    }

    public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
        return new Runnable() {
            @Override
            public void run() {
                // Remember what the worker thread had, install the task's context, then restore.
                Map<String, String> previous = MDC.getCopyOfContextMap();
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    runnable.run();
                } finally {
                    if (previous == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(previous);
                    }
                }
            }
        };
    }
}
We have run into a similar problem. You might want to extend ThreadPoolExecutor and override the beforeExecute/afterExecute methods to make the MDC calls you need before and after each task runs. Note that both hooks run on the worker thread, not on the thread that submitted the task.
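A minimal sketch of that hook-based approach follows. It uses a plain ThreadLocal map (CONTEXT) as a hypothetical stand-in for MDC so it runs without SLF4J; with the real API the hook bodies would call MDC.put(...) and MDC.clear() instead. HookedExecutor and its helper method are illustrative names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class HookedExecutor extends ThreadPoolExecutor {

    // Hypothetical stand-in for MDC: one String-to-String map per thread.
    static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    HookedExecutor(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    // Runs on the worker thread just before each task.
    @Override
    protected void beforeExecute(Thread worker, Runnable task) {
        super.beforeExecute(worker, task);
        CONTEXT.get().put("worker", worker.getName());
    }

    // Runs on the worker thread after each task, so nothing leaks into the next one.
    @Override
    protected void afterExecute(Runnable task, Throwable thrown) {
        CONTEXT.get().clear();
        super.afterExecute(task, thrown);
    }

    /** Returns what a second task sees for "user" after a first task set it. */
    static String userSeenBySecondTask() {
        HookedExecutor pool = new HookedExecutor(1);
        try {
            Runnable setUser = () -> CONTEXT.get().put("user", "alice");
            Callable<String> readUser = () -> CONTEXT.get().get("user");
            pool.submit(setUser).get();
            return pool.submit(readUser).get(); // null: afterExecute cleared the map
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

The hooks can reset or seed the context, but they cannot see the submitting thread's values on their own. Propagating those still requires capturing the map at submission time, as the wrapping solutions do.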
IMHO the best solution is to use Spring's ThreadPoolTaskExecutor, implement your own TaskDecorator, and register it with executor.setTaskDecorator(new LoggingTaskDecorator());
The decorator can look like this:
private final class LoggingTaskDecorator implements TaskDecorator {

    @Override
    public Runnable decorate(Runnable task) {
        // Runs on the submitting (web) thread: capture its MDC.
        Map<String, String> webThreadContext = MDC.getCopyOfContextMap();
        return () -> {
            // Runs on the worker thread.
            try {
                // TODO: is this thread safe?
                // getCopyOfContextMap() may return null when nothing has been set.
                if (webThreadContext != null) {
                    MDC.setContextMap(webThreadContext);
                }
                task.run();
            } finally {
                MDC.clear();
            }
        };
    }
}
This is how I do it with fixed thread pools and executors:
ExecutorService executor = Executors.newFixedThreadPool(4);
Map<String, String> mdcContextMap = MDC.getCopyOfContextMap();
In the threading part:
executor.submit(() -> {
MDC.setContextMap(mdcContextMap);
// my stuff
});
If you face this problem in a Spring environment where you run tasks using the @Async annotation, you can decorate the tasks using the TaskDecorator approach.
A sample of how to do it is provided here:
I faced this issue and the article above helped me to tackle it so that's why I am sharing it here.
Similar to the previously posted solutions, the newTaskFor methods for Runnable and Callable can be overridden in order to wrap the argument (see accepted solution) when creating the RunnableFuture.
Note: Consequently, the executorService's submit method must be called instead of the execute method.
For the ScheduledThreadPoolExecutor, the decorateTask methods would be overridden instead.
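The newTaskFor variant could be sketched roughly as follows. To keep it runnable without SLF4J, a plain ThreadLocal map (CONTEXT) stands in for MDC; ContextAwareExecutor, wrap, and submitWithUser are hypothetical names used only for this illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ContextAwareExecutor extends ThreadPoolExecutor {

    // Hypothetical stand-in for MDC: one String-to-String map per thread.
    static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    ContextAwareExecutor(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    // submit(Callable) calls this on the submitting thread.
    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return super.newTaskFor(wrap(callable));
    }

    // submit(Runnable) and submit(Runnable, T) call this on the submitting thread.
    @Override
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return super.newTaskFor(wrap(Executors.callable(runnable, value)));
    }

    private static <T> Callable<T> wrap(Callable<T> task) {
        // Copy the submitter's context now; install it later on the worker thread.
        Map<String, String> captured = new HashMap<>(CONTEXT.get());
        return () -> {
            Map<String, String> previous = CONTEXT.get();
            CONTEXT.set(captured);
            try {
                return task.call();
            } finally {
                CONTEXT.set(previous); // restore whatever the worker had before
            }
        };
    }

    /** Sets "user" on the calling thread and returns what a submitted task sees. */
    static String submitWithUser(String user) {
        ContextAwareExecutor pool = new ContextAwareExecutor(1);
        try {
            CONTEXT.get().put("user", user);
            Callable<String> readUser = () -> CONTEXT.get().get("user");
            return pool.submit(readUser).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            CONTEXT.get().remove("user");
            pool.shutdown();
        }
    }
}
```

Tasks passed to execute() bypass newTaskFor and therefore run without the captured context, which is why submit() has to be used with this variant.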
Another variation similar to existing answers here is to implement ExecutorService and allow a delegate to be passed to it. Then, using generics, it can still expose the actual delegate in case one wants to get some stats (as long as no other modification methods are used).
Reference code:
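import org.slf4j.MDC;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
public class MDCExecutorService<D extends ExecutorService> implements ExecutorService {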
private final D delegate;
public MDCExecutorService(D delegate) {
this.delegate = delegate;
}
@Override
public void shutdown() {
delegate.shutdown();
}
@Override
public List<Runnable> shutdownNow() {
return delegate.shutdownNow();
}
@Override
public boolean isShutdown() {
return delegate.isShutdown();
}
@Override
public boolean isTerminated() {
return delegate.isTerminated();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return delegate.awaitTermination(timeout, unit);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return delegate.submit(wrap(task));
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return delegate.submit(wrap(task), result);
}
@Override
public Future<?> submit(Runnable task) {
return delegate.submit(wrap(task));
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return delegate.invokeAll(wrapCollection(tasks));
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
return delegate.invokeAll(wrapCollection(tasks), timeout, unit);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return delegate.invokeAny(wrapCollection(tasks));
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return delegate.invokeAny(wrapCollection(tasks), timeout, unit);
}
@Override
public void execute(Runnable command) {
delegate.execute(wrap(command));
}
public D getDelegate() {
return delegate;
}
/* Copied from https://github.com/project-ncl/pnc/blob/master/common/src/main/java/org/jboss/pnc/common/concurrent/MDCWrappers.java */
private static Runnable wrap(final Runnable runnable) {
final Map<String, String> context = MDC.getCopyOfContextMap();
return () -> {
Map<String, String> previous = MDC.getCopyOfContextMap();
if (context == null) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
try {
runnable.run();
} finally {
if (previous == null) {
MDC.clear();
} else {
MDC.setContextMap(previous);
}
}
};
}
private static <T> Callable<T> wrap(final Callable<T> callable) {
final Map<String, String> context = MDC.getCopyOfContextMap();
return () -> {
Map<String, String> previous = MDC.getCopyOfContextMap();
if (context == null) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
try {
return callable.call();
} finally {
if (previous == null) {
MDC.clear();
} else {
MDC.setContextMap(previous);
}
}
};
}
private static <T> Consumer<T> wrap(final Consumer<T> consumer) {
final Map<String, String> context = MDC.getCopyOfContextMap();
return (t) -> {
Map<String, String> previous = MDC.getCopyOfContextMap();
if (context == null) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
try {
consumer.accept(t);
} finally {
if (previous == null) {
MDC.clear();
} else {
MDC.setContextMap(previous);
}
}
};
}
private static <T> Collection<Callable<T>> wrapCollection(Collection<? extends Callable<T>> tasks) {
Collection<Callable<T>> wrapped = new ArrayList<>();
for (Callable<T> task : tasks) {
wrapped.add(wrap(task));
}
return wrapped;
}
}