
How to use MDC with parallelStream in Java and logback

I need to log a few attributes of the request, such as the request id and the locale, but when using parallelStream it seems that the ThreadLocal behind the MDC loses the information.

I've looked at the solution of passing the MDC context between threads when creating the parallelStream, but it seems dirty, and I also have a lot of usages of parallelStream.

Is there any other way of doing this?

Thank you

Guido Celada asked Apr 06 '26 23:04


2 Answers

The only solution I found is to copy the context into a final variable outside of the stream and apply it on every single iteration:

Map<String, String> contextMap = MDC.getCopyOfContextMap();
Stream.iterate(0, i -> i + 1).parallel()
    .peek(i -> MDC.setContextMap(contextMap))
    // ...logic...
    // in case you're using a filter, you need to use a predicate and combine it with a clear step:
    .filter(yourPredicate.or(i -> {
                MDC.clear();
                return false;
            }))
    // clear right before terminal operation
    .peek(i -> MDC.clear())
    .findFirst();

// since the initial thread is also used within the stream and the context is cleared there, 
// we need to set it again to its initial state
MDC.setContextMap(contextMap);    
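
Why both the per-element set and the final restore are needed can be seen in a small sketch. It deliberately uses a plain ThreadLocal as a stand-in for the MDC so that it runs without logback on the classpath; the class name ThreadLocalVisibilityDemo and the REQUEST_ID holder are hypothetical and not part of any library. It records which value the calling thread and the pool's worker threads see inside a parallel stream:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

class ThreadLocalVisibilityDemo {
    // Stand-in for the MDC (assumption): a plain per-thread value.
    static final ThreadLocal<String> REQUEST_ID = new ThreadLocal<>();

    /** Runs a parallel stream and records the value seen by caller and worker threads. */
    static Map<String, String> observe() {
        Thread caller = Thread.currentThread();
        REQUEST_ID.set("req-42");
        Map<String, String> seen = new ConcurrentHashMap<>();
        try {
            IntStream.range(0, 1_000).parallel().forEach(i -> {
                // The calling thread takes part in the work alongside the pool threads.
                String label = Thread.currentThread() == caller ? "caller" : "worker";
                // String.valueOf turns a missing value into the text "null".
                seen.put(label, String.valueOf(REQUEST_ID.get()));
            });
        } finally {
            REQUEST_ID.remove();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observe());
    }
}
```

The value set before the stream is only visible on the calling thread; worker threads start without it. Because the calling thread also processes elements, a clear step inside the stream wipes its context too, which is why the context is set again after the terminal operation.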

This solution costs 1) roughly 12 µs per 100 iterations in the benchmark below and 2) some readability, but both are acceptable:

  1. This benchmark compares IntStream.range(0, 100).parallel().sum() (the baseline) with the same stream using the MDC copy logic:
Benchmark               Mode  Cnt   Score   Error   Units
MDC_CopyTest.baseline  thrpt    5   0,038 ± 0,005  ops/us
MDC_CopyTest.withMdc   thrpt    5   0,024 ± 0,001  ops/us
MDC_CopyTest.baseline   avgt    5  28,239 ± 1,308   us/op
MDC_CopyTest.withMdc    avgt    5  40,178 ± 0,761   us/op
  2. To improve readability, the logic can be wrapped in a small helper class:
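import java.util.Map;
import org.slf4j.MDC;

public class MDCCopyHelper {
    // Snapshot taken on the thread that creates the helper; may be null if the MDC is empty.
    private final Map<String, String> contextMap = MDC.getCopyOfContextMap();

    // The ignored varargs parameter lets this method be used in peek() or called without arguments.
    public void set(Object... any) {
        if (contextMap != null) {
            MDC.setContextMap(contextMap);
        } else {
            MDC.clear();
        }
    }

    public void clear(Object... any) {
        MDC.clear();
    }

    // Accepts (and ignores) the stream element so that it can be used as a Predicate in filter().
    public boolean clearAndFail(Object... any) {
        MDC.clear();
        return false;
    }
}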

The streaming code then looks a bit nicer:

MDCCopyHelper mdcHelper = new MDCCopyHelper();
try {
    Optional<Integer> findFirst = Stream.iterate(0, i -> i + 1)
        .parallel()
        .peek(mdcHelper::set)
        // ...logic...
        // filter predicates should be combined with the clear step
        .filter(yourPredicate.or(mdcHelper::clearAndFail))
        // before terminal call:
        .peek(mdcHelper::clear)
        .findFirst();
} finally {
    // set the correct MDC at the main thread again
    mdcHelper.set();
}
rudi answered Apr 09 '26 13:04


My solution is to wrap the functional interfaces passed to the stream, similar to the static proxy pattern.
For example:

public static void main(String[] args) {
    System.err.println(Thread.currentThread().getName());
    String traceId = "100";
    MDC.put("id", traceId);
    System.err.println("------------------------");
    Stream.of(1, 2, 3, 4)
          .parallel()
          .forEach((num -> {
              System.err.println(Thread.currentThread().getName()+" "+ traceId.equals(MDC.get("id")));
          }));
    System.err.println("------------------------");
    Stream.of(1, 2, 3, 4)
          .parallel()
          // the key is the TraceableConsumer
          .forEach(new TraceableConsumer<>(num -> {
              System.err.println(Thread.currentThread().getName() + " " + traceId.equals(MDC.get("id")));
          }));
}

public class TraceableConsumer<T> extends MDCTraceable implements Consumer<T> {

    private final Consumer<T> target;

    public TraceableConsumer(Consumer<T> target) {
        this.target = target;
    }

    @Override
    public void accept(T t) {
        setMDC();
        target.accept(t);
    }
}

public abstract class MDCTraceable {

    private final Long id;

    private final Long userId;

    public MDCTraceable() {
        id = Optional.ofNullable(MDC.get("id")).map(Long::parseLong).orElse(0L);
        userId = Optional.ofNullable(MDC.get("userId")).map(Long::parseLong).orElse(0L);
    }

    public void setMDC(){
        MDC.put("id", id.toString());
        MDC.put("userId", userId.toString());
    }

    public void cleanMDC(){
        MDC.clear();
    }
}
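
A possible generalization of this wrapper, sketched under a few assumptions: it copies the whole context map instead of two fixed keys, and it restores whatever the executing thread had before instead of leaving the values behind, so pool threads do not keep stale data and the calling thread keeps its own context. To stay runnable without a logging library, the sketch uses a ThreadLocal-backed stand-in named CONTEXT; the class ContextPropagation and the method traceable are hypothetical names, not part of slf4j or logback.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.Stream;

class ContextPropagation {
    // Stand-in for the logging MDC (assumption): one context map per thread.
    static final ThreadLocal<Map<String, String>> CONTEXT = new ThreadLocal<>();

    /** Wraps a consumer so that it runs with the context captured at wrap time. */
    static <T> Consumer<T> traceable(Consumer<T> target) {
        Map<String, String> current = CONTEXT.get();
        // Defensive copy, taken on the thread that builds the stream.
        Map<String, String> captured = current == null ? null : new HashMap<>(current);
        return value -> {
            Map<String, String> previous = CONTEXT.get(); // what this thread had before
            CONTEXT.set(captured);
            try {
                target.accept(value);
            } finally {
                // Restore rather than clear, so the calling thread keeps its context.
                if (previous == null) {
                    CONTEXT.remove();
                } else {
                    CONTEXT.set(previous);
                }
            }
        };
    }

    /** Collects the "id" value that each element's consumer observed. */
    static Set<String> idsSeen() {
        CONTEXT.set(Map.of("id", "100")); // Map.of requires Java 9+
        Set<String> seen = ConcurrentHashMap.newKeySet();
        Stream.of(1, 2, 3, 4)
              .parallel()
              .forEach(traceable(n -> seen.add(CONTEXT.get().get("id"))));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(idsSeen());
    }
}
```

With slf4j, the same shape should work by capturing with MDC.getCopyOfContextMap() and restoring with MDC.setContextMap(previous), or MDC.clear() when the previous map was null. The same idea applies to Function, Predicate and the other functional interfaces used in the stream.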
eason huang answered Apr 09 '26 13:04


