Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to use MDC with thread pools?

People also ask

Is MDC per thread?

A Mapped Diagnostic Context, or MDC in short, is an instrument for distinguishing interleaved log output from different sources. Log output is typically interleaved when a server handles multiple clients near-simultaneously. The MDC is managed on a per thread basis.

How does Log4J MDC work?

MDC in Log4j MDC in Log4j allows us to fill a map-like structure with pieces of information that are accessible to the appender when the log message is actually written. The MDC structure is internally attached to the executing thread in the same way a ThreadLocal variable would be.

How do you use thread pool?

To use thread pools, we first create a object of ExecutorService and pass a set of tasks to it. ThreadPoolExecutor class allows to set the core and maximum pool size. The runnables that are run by a particular thread are executed sequentially.

What is MDC in SLF4J?

MDC stands for Mapped Diagnostic Context, and is a feature that is offered by both plain Log4J and SLF4J with Logback or Log4J as the backing logging framework. It is a map that holds String values keyed by Strings, which is associated with the current thread (using a ThreadLocal).


Yes, this is a common problem I've run into as well. There are a few workarounds (like manually setting it, as described), but ideally you want a solution that

  • Sets the MDC consistently;
  • Avoids tacit bugs where the MDC is incorrect but you don't know it; and
  • Minimizes changes to how you use thread pools (e.g. subclassing Callable with MyCallable everywhere, or similar ugliness).

Here's a solution that I use that meets these three needs. Code should be self-explanatory.

(As a side note, this executor can be created and fed to Guava's MoreExecutors.listeningDecorator(), if you use Guava's ListanableFuture.)

import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;

/**
 * A SLF4J MDC-compatible {@link ThreadPoolExecutor}.
 * <p/>
 * In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
 * logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a
 * thread pool. This is a drop-in replacement for {@link ThreadPoolExecutor} sets MDC data before each task appropriately.
 * <p/>
 * Created by jlevy.
 * Date: 6/14/13
 */
public class MdcThreadPoolExecutor extends ThreadPoolExecutor {

    final private boolean useFixedContext;
    final private Map<String, Object> fixedContext;

    /**
     * Pool where task threads take MDC from the submitting thread.
     */
    public static MdcThreadPoolExecutor newWithInheritedMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                                            TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(null, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    /**
     * Pool where task threads take fixed MDC from the thread that creates the pool.
     */
    @SuppressWarnings("unchecked")
    public static MdcThreadPoolExecutor newWithCurrentMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                                          TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(MDC.getCopyOfContextMap(), corePoolSize, maximumPoolSize, keepAliveTime, unit,
                workQueue);
    }

    /**
     * Pool where task threads always have a specified, fixed MDC.
     */
    public static MdcThreadPoolExecutor newWithFixedMdc(Map<String, Object> fixedContext, int corePoolSize,
                                                        int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                                        BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(fixedContext, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    private MdcThreadPoolExecutor(Map<String, Object> fixedContext, int corePoolSize, int maximumPoolSize,
                                  long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.fixedContext = fixedContext;
        useFixedContext = (fixedContext != null);
    }

    @SuppressWarnings("unchecked")
    private Map<String, Object> getContextForTask() {
        return useFixedContext ? fixedContext : MDC.getCopyOfContextMap();
    }

    /**
     * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
     * all delegate to this.
     */
    @Override
    public void execute(Runnable command) {
        super.execute(wrap(command, getContextForTask()));
    }

    public static Runnable wrap(final Runnable runnable, final Map<String, Object> context) {
        return new Runnable() {
            @Override
            public void run() {
                Map previous = MDC.getCopyOfContextMap();
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    runnable.run();
                } finally {
                    if (previous == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(previous);
                    }
                }
            }
        };
    }
}

We have run into a similar problem. You might want to extend ThreadPoolExecutor and override before/afterExecute methods to make the MDC calls you need before starting/stopping new threads.


IMHO the best solution is to:

  • use ThreadPoolTaskExecutor
  • implement your own TaskDecorator
  • use it: executor.setTaskDecorator(new LoggingTaskDecorator());

The decorator can look like this:

private final class LoggingTaskDecorator implements TaskDecorator {

    @Override
    public Runnable decorate(Runnable task) {
        // web thread
        Map<String, String> webThreadContext = MDC.getCopyOfContextMap();
        return () -> {
            // work thread
            try {
                // TODO: is this thread safe?
                MDC.setContextMap(webThreadContext);
                task.run();
            } finally {
                MDC.clear();
            }
        };
    }

}

This is how I do it with fixed thread pools and executors:

ExecutorService executor = Executors.newFixedThreadPool(4);
Map<String, String> mdcContextMap = MDC.getCopyOfContextMap();

In the threading part:

executor.submit(() -> {
    MDC.setContextMap(mdcContextMap);
    // my stuff
});

In case you face this problem in a spring framework related environment where you run tasks by using @Async annotation you are able to decorate the tasks by using the TaskDecorator approach.

A sample of how to do it is provided here:

I faced this issue and the article above helped me to tackle it so that's why I am sharing it here.


Similar to the previously posted solutions, the newTaskFor methods for Runnable and Callable can be overwritten in order to wrap the argument (see accepted solution) when creating the RunnableFuture.

Note: Consequently, the executorService's submit method must be called instead of the execute method.

For the ScheduledThreadPoolExecutor, the decorateTask methods would be overwritten instead.


Another variation similar to existing answers here is to implement ExecutorService and allow a delegate to be passed to it. Then using generics, it can still expose the actual delegate in case one wants to get some stats (as long no other modification methods are used).

Reference code:

  • https://github.com/project-ncl/pnc/blob/master/common/src/main/java/org/jboss/pnc/common/concurrent/MDCThreadPoolExecutor.java
  • https://github.com/project-ncl/pnc/blob/master/common/src/main/java/org/jboss/pnc/common/concurrent/MDCWrappers.java
public class MDCExecutorService<D extends ExecutorService> implements ExecutorService {

    private final D delegate;

    public MDCExecutorService(D delegate) {
        this.delegate = delegate;
    }

    @Override
    public void shutdown() {
        delegate.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        return delegate.shutdownNow();
    }

    @Override
    public boolean isShutdown() {
        return delegate.isShutdown();
    }

    @Override
    public boolean isTerminated() {
        return delegate.isTerminated();
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.awaitTermination(timeout, unit);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return delegate.submit(wrap(task));
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return delegate.submit(wrap(task), result);
    }

    @Override
    public Future<?> submit(Runnable task) {
        return delegate.submit(wrap(task));
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        return delegate.invokeAll(wrapCollection(tasks));
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.invokeAll(wrapCollection(tasks), timeout, unit);
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        return delegate.invokeAny(wrapCollection(tasks));
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return delegate.invokeAny(wrapCollection(tasks), timeout, unit);
    }

    @Override
    public void execute(Runnable command) {
        delegate.execute(wrap(command));
    }

    public D getDelegate() {
        return delegate;
    }

    /* Copied from https://github.com/project-ncl/pnc/blob/master/common/src/main/java/org/jboss/pnc/common
    /concurrent/MDCWrappers.java */

    private static Runnable wrap(final Runnable runnable) {
        final Map<String, String> context = MDC.getCopyOfContextMap();
        return () -> {
            Map previous = MDC.getCopyOfContextMap();
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                runnable.run();
            } finally {
                if (previous == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previous);
                }
            }
        };
    }

    private static <T> Callable<T> wrap(final Callable<T> callable) {
        final Map<String, String> context = MDC.getCopyOfContextMap();
        return () -> {
            Map previous = MDC.getCopyOfContextMap();
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                return callable.call();
            } finally {
                if (previous == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previous);
                }
            }
        };
    }

    private static <T> Consumer<T> wrap(final Consumer<T> consumer) {
        final Map<String, String> context = MDC.getCopyOfContextMap();
        return (t) -> {
            Map previous = MDC.getCopyOfContextMap();
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                consumer.accept(t);
            } finally {
                if (previous == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(previous);
                }
            }
        };
    }

    private static <T> Collection<Callable<T>> wrapCollection(Collection<? extends Callable<T>> tasks) {
        Collection<Callable<T>> wrapped = new ArrayList<>();
        for (Callable<T> task : tasks) {
            wrapped.add(wrap(task));
        }
        return wrapped;
    }
}