Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Mapped Diagnostic Context logging with play framewok and akka in java

i am trying mdc logging in play filter in java for all requests i followed this tutorial in scala and tried converting to java http://yanns.github.io/blog/2014/05/04/slf4j-mapped-diagnostic-context-mdc-with-play-framework/

but still the mdc is not propagated to all execution contexts. i am using this dispathcher as default dispatcher but there are many execution contexts for it. i need the mdc propagated to all execution contexts

below is my java code

import java.util.Map;

import org.slf4j.MDC;

import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import akka.dispatch.Dispatcher;
import akka.dispatch.ExecutorServiceFactoryProvider;
import akka.dispatch.MessageDispatcherConfigurator;

public class MDCPropagatingDispatcher extends Dispatcher {
    public MDCPropagatingDispatcher(
            MessageDispatcherConfigurator _configurator, String id,
            int throughput, Duration throughputDeadlineTime,
            ExecutorServiceFactoryProvider executorServiceFactoryProvider,
            FiniteDuration shutdownTimeout) {
        super(_configurator, id, throughput, throughputDeadlineTime,
                executorServiceFactoryProvider, shutdownTimeout);

    }

    @Override
    public ExecutionContext prepare() {
        final Map<String, String> mdcContext = MDC.getCopyOfContextMap();
        return new ExecutionContext() {

            @Override
            public void execute(Runnable r) {
                Map<String, String> oldMDCContext = MDC.getCopyOfContextMap();
                setContextMap(mdcContext);
                try {
                    r.run();
                } finally {
                    setContextMap(oldMDCContext);
                }
            }

            @Override
            public ExecutionContext prepare() {
                return this;
            }

            @Override
            public void reportFailure(Throwable t) {
                play.Logger.info("error occured in dispatcher");
            }

        };
    }

    private void setContextMap(Map<String, String> context) {
        if (context == null) {
            MDC.clear();
        } else {
            play.Logger.info("set context "+ context.toString());
            MDC.setContextMap(context);
        }
    }
}



import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

import com.typesafe.config.Config;

import akka.dispatch.DispatcherPrerequisites;
import akka.dispatch.MessageDispatcher;
import akka.dispatch.MessageDispatcherConfigurator;

public class MDCPropagatingDispatcherConfigurator extends
        MessageDispatcherConfigurator {
    private MessageDispatcher instance;

    public MDCPropagatingDispatcherConfigurator(Config config,
            DispatcherPrerequisites prerequisites) {
        super(config, prerequisites);
        Duration throughputDeadlineTime = new FiniteDuration(-1,
                TimeUnit.MILLISECONDS);
        FiniteDuration shutDownDuration = new FiniteDuration(1,
                TimeUnit.MILLISECONDS);
        instance = new MDCPropagatingDispatcher(this, "play.akka.actor.contexts.play-filter-context",
                100, throughputDeadlineTime,
                configureExecutor(), shutDownDuration);
    }

    public MessageDispatcher dispatcher() {
        return instance;
    }

}

filter interceptor

public class MdcLogFilter implements EssentialFilter {
@Override
public EssentialAction apply(final EssentialAction next) {
    return new MdcLogAction() {
        @Override
        public Iteratee<byte[], SimpleResult> apply(
                final RequestHeader requestHeader) {
            final String  uuid = Utils.generateRandomUUID();
            MDC.put("uuid", uuid);
            play.Logger.info("request started"+uuid);
            final ExecutionContext playFilterContext = Akka.system()
                    .dispatchers()
                    .lookup("play.akka.actor.contexts.play-custom-filter-context");
            return next.apply(requestHeader).map(
                    new AbstractFunction1<SimpleResult, SimpleResult>() {
                        @Override
                        public SimpleResult apply(SimpleResult simpleResult) {
                            play.Logger.info("request ended"+uuid);
                            MDC.remove("uuid");
                            return simpleResult;
                        }
                    }, playFilterContext);

        }

        @Override
        public EssentialAction apply() {
            return next.apply();
        }
    };
}

}

like image 368
surendar Avatar asked Jul 09 '15 16:07

surendar


1 Answers

Below is my solution, proven in real life. It's in Scala, and not for Play, but for Scalatra, but the underlying concept is the same. Hope you'll be able to figure out how to port this to Java.

import org.slf4j.MDC
import java.util.{Map => JMap}
import scala.concurrent.{ExecutionContextExecutor, ExecutionContext}

object MDCHttpExecutionContext {

  def fromExecutionContextWithCurrentMDC(delegate: ExecutionContext): ExecutionContextExecutor =
    new MDCHttpExecutionContext(MDC.getCopyOfContextMap(), delegate)
}

class MDCHttpExecutionContext(mdcContext: JMap[String, String], delegate: ExecutionContext)
  extends ExecutionContextExecutor {

  def execute(runnable: Runnable): Unit = {
    val callingThreadMDC = MDC.getCopyOfContextMap()
    delegate.execute(new Runnable {
      def run() {
        val currentThreadMDC = MDC.getCopyOfContextMap()
        setContextMap(callingThreadMDC)
        try {
          runnable.run()
        } finally {
          setContextMap(currentThreadMDC)
        }
      }
    })
  }

  private[this] def setContextMap(context: JMap[String, String]): Unit = {
    Option(context) match {
      case Some(ctx) => {
        MDC.setContextMap(context)
      }
      case None => {
        MDC.clear()
      }
    }
  }

  def reportFailure(t: Throwable): Unit = delegate.reportFailure(t)
}

You'll have to make sure that this ExecutionContext is used in all of your asynchronous calls. I achieve this through Dependency Injection, but there are different ways. That's how I do it with subcut:

bind[ExecutionContext] idBy BindingIds.GlobalExecutionContext toSingle {
    MDCHttpExecutionContext.fromExecutionContextWithCurrentMDC(
      ExecutionContext.fromExecutorService(
        Executors.newFixedThreadPool(globalThreadPoolSize)
      )
    )
  }

The idea behind this approach is as follows. MDC uses thread-local storage for the attributes and their values. If a single request of yours can run on a multiple threads, then you need to make sure the new thread you start uses the right MDC. For that, you create a custom executor that ensures the proper copying of the MDC values into the new thread before it starts executing the task you assign to it. You also must ensure that when the thread finishes your task and continues with something else, you put the old values into its MDC, because threads from a pool can switch between different requests.

like image 111
Haspemulator Avatar answered Nov 20 '22 22:11

Haspemulator