
Does CompletableFuture have a corresponding Local context?

In the olden days, we had ThreadLocal to carry data along the request path, since all processing for a request happened on a single thread. Libraries like Logback relied on this through MDC, e.g. MDC.put("requestId", getNewRequestId());

Then Scala and functional programming came along, and with them came Futures and Local.scala (at least, I know Twitter's Futures have this class). Future.scala knows about Local.scala and transfers the context through map, flatMap, and so on, so I can still call Local.set("requestId", getNewRequestId()); and then, downstream, after the work has travelled over many threads, read it back with Local.get(...)

So my question is: in Java, can I do the same thing with the new CompletableFuture, using a LocalContext or some similar object (I'm not sure of the name)? That way I could change Logback's MDC to store its data in that context instead of in a ThreadLocal, so that I don't lose the request id and all my logging across thenApply, thenAccept, etc. keeps working, including the %X{requestId} conversion word in the Logback configuration.

EDIT:

As an example: if a request comes in and you are using Log4j or Logback, you set MDC.put("requestId", requestId) in a filter, and then your app logs many statements like this:

log.info("request came in for url="+url);
log.info("request is complete");

Now, in the log output it will show this:

INFO {time}: requestId425 request came in for url=/mypath
INFO {time}: requestId425 request is complete

This relies on a ThreadLocal trick. At Twitter, we use Scala and Twitter Futures along with a Local.scala class. Local.scala and Future.scala are tied together so that the scenario above still works, which is very nice: every log statement carries the request id, the developer never has to remember to log it, and you can trace a single customer's request/response cycle with that id.
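To make the problem concrete, here is a minimal sketch of how a plain ThreadLocal loses its value once a CompletableFuture stage runs on another thread. It uses only the JDK: REQUEST_ID is a hypothetical stand-in for the per-thread storage behind MDC, and the class and method names are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ContextLossDemo {

    // Hypothetical stand-in for the per-thread storage that MDC relies on.
    static final ThreadLocal<String> REQUEST_ID = new ThreadLocal<>();

    // The future is already complete, so thenApply runs on the calling thread
    // and still sees the value.
    static String readOnCallingThread(String requestId) {
        REQUEST_ID.set(requestId);
        try {
            return CompletableFuture.completedFuture("ignored")
                    .thenApply(ignored -> String.valueOf(REQUEST_ID.get()))
                    .join();
        } finally {
            REQUEST_ID.remove();
        }
    }

    // supplyAsync runs on a pool thread, whose ThreadLocal slot was never set.
    static String readOnPoolThread(String requestId) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        REQUEST_ID.set(requestId);
        try {
            return CompletableFuture
                    .supplyAsync(() -> String.valueOf(REQUEST_ID.get()), pool)
                    .join();
        } finally {
            REQUEST_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("calling thread: " + readOnCallingThread("requestId425")); // requestId425
        System.out.println("pool thread:    " + readOnPoolThread("requestId425"));    // null
    }
}
```

The second call prints null because nothing copies the value from the request thread to the worker thread, which is exactly the gap the question is about.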

I don't see this in Java :( which is very unfortunate as there are many use cases for that. Perhaps there is something I am not seeing though?

asked Jun 21 '16 at 00:06 by Dean Hiller



2 Answers

My solution theme (it works with JDK 9+, since that version exposes a couple of overridable methods) would be to:

Make the complete ecosystem aware of MDC

And for that, we need to address the following scenarios:

  • Where do we get new instances of CompletableFuture from within this class? → We need to return an MDC-aware version instead.
  • Where do we get new instances of CompletableFuture from outside this class? → We need to return an MDC-aware version instead.
  • Which executor does the CompletableFuture class use, and when? → In all circumstances, we need to make sure that every executor is MDC-aware.

For that, let's create an MDC-aware subclass of CompletableFuture. My version looks like this:

import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.function.Supplier;

public class MDCAwareCompletableFuture<T> extends CompletableFuture<T> {

    public static final ExecutorService MDC_AWARE_ASYNC_POOL = new MDCAwareForkJoinPool();

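    // Called internally whenever a dependent stage is created (JDK 9+),
    // so thenApply, thenCompose, etc. return MDC-aware futures too.
    @Override
    public <U> CompletableFuture<U> newIncompleteFuture() {
        return new MDCAwareCompletableFuture<>();
    }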

    @Override
    public Executor defaultExecutor() {
        return MDC_AWARE_ASYNC_POOL;
    }

    public static <T> CompletionStage<T> getMDCAwareCompletionStage(CompletableFuture<T> future) {
        return new MDCAwareCompletableFuture<>()
                .completeAsync(() -> null)
                .thenCombineAsync(future, (aVoid, value) -> value);
    }

    public static <T> CompletionStage<T> getMDCHandledCompletionStage(CompletableFuture<T> future,
                                                                Function<Throwable, T> throwableFunction) {
        Map<String, String> contextMap = MDC.getCopyOfContextMap();
        return getMDCAwareCompletionStage(future)
                .handle((value, throwable) -> {
                    // setMDCContext is one of the MDCUtility helpers shown further down
                    MDCUtility.setMDCContext(contextMap);
                    if (throwable != null) {
                        return throwableFunction.apply(throwable);
                    }
                    return value;
                });
    }
}

The MDCAwareForkJoinPool class would look like this (I have skipped the methods with ForkJoinTask parameters for simplicity):

import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class MDCAwareForkJoinPool extends ForkJoinPool {
    //Override constructors which you need

    @Override
    public <T> ForkJoinTask<T> submit(Callable<T> task) {
        return super.submit(MDCUtility.wrapWithMdcContext(task));
    }

    @Override
    public <T> ForkJoinTask<T> submit(Runnable task, T result) {
        return super.submit(MDCUtility.wrapWithMdcContext(task), result);
    }

    @Override
    public ForkJoinTask<?> submit(Runnable task) {
        return super.submit(MDCUtility.wrapWithMdcContext(task));
    }

    // CompletableFuture's *Async methods hand their work to the executor through execute()
    @Override
    public void execute(Runnable task) {
        super.execute(MDCUtility.wrapWithMdcContext(task));
    }
}

The wrapping utility methods (in the MDCUtility class referenced above) would look like this:

public static <T> Callable<T> wrapWithMdcContext(Callable<T> task) {
    //save the current MDC context
    Map<String, String> contextMap = MDC.getCopyOfContextMap();
    return () -> {
        setMDCContext(contextMap);
        try {
            return task.call();
        } finally {
            // once the task is complete, clear MDC
            MDC.clear();
        }
    };
}

public static Runnable wrapWithMdcContext(Runnable task) {
    //save the current MDC context
    Map<String, String> contextMap = MDC.getCopyOfContextMap();
    return () -> {
        setMDCContext(contextMap);
        try {
            task.run();
        } finally {
            // once the task is complete, clear MDC
            MDC.clear();
        }
    };
}

public static void setMDCContext(Map<String, String> contextMap) {
    MDC.clear();
    if (contextMap != null) {
        MDC.setContextMap(contextMap);
    }
}

Below are some guidelines for usage:

  • Use the class MDCAwareCompletableFuture rather than the class CompletableFuture.
  • A number of methods in the class CompletableFuture (most of the public static ones) instantiate a plain CompletableFuture directly with new CompletableFuture.... For those, use an alternative that yields an instance of MDCAwareCompletableFuture. For example, rather than CompletableFuture.supplyAsync(...), you can use new MDCAwareCompletableFuture<>().completeAsync(...)
  • When you are stuck with a plain CompletableFuture, say because an external library returned one, convert it with the method getMDCAwareCompletionStage. Obviously, you can't retain the context inside that library, but the context is retained again from the point where control returns to your application code.
  • When supplying an executor as a parameter, make sure that it is MDC-aware, such as MDCAwareForkJoinPool. You could also create an MDCAwareThreadPoolExecutor by overriding the execute method to serve your use case. You get the idea!
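The last guideline can be sketched roughly as follows. To keep the example runnable with only the JDK, it uses a hypothetical CONTEXT ThreadLocal map as a stand-in for MDC; with SLF4J you would call MDC.getCopyOfContextMap() and MDC.setContextMap() in the same two places. The class names here are illustrative assumptions, not part of any library.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ContextAwareExecutorDemo {

    // Hypothetical stand-in for MDC: a per-thread key/value map.
    static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    // Hypothetical counterpart of the MDCAwareThreadPoolExecutor idea.
    static final class ContextAwareThreadPoolExecutor extends ThreadPoolExecutor {
        ContextAwareThreadPoolExecutor(int threads) {
            super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        }

        @Override
        public void execute(Runnable task) {
            // Runs on the submitting thread: snapshot its context.
            Map<String, String> snapshot = new HashMap<>(CONTEXT.get());
            super.execute(() -> {
                // Runs on the worker thread: install the snapshot, then clean up.
                CONTEXT.set(new HashMap<>(snapshot));
                try {
                    task.run();
                } finally {
                    CONTEXT.remove();
                }
            });
        }
    }

    // Sets a request id on the caller, then reads it from a stage run by the pool.
    static String requestIdSeenByAsyncStage(String requestId) {
        ContextAwareThreadPoolExecutor pool = new ContextAwareThreadPoolExecutor(2);
        CONTEXT.get().put("requestId", requestId);
        try {
            return CompletableFuture
                    .supplyAsync(() -> "step1", pool)
                    .thenApplyAsync(step -> CONTEXT.get().get("requestId"), pool)
                    .join();
        } finally {
            CONTEXT.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(requestIdSeenByAsyncStage("requestId425")); // requestId425
    }
}
```

Overriding execute alone is enough for this pool, because ThreadPoolExecutor's submit methods delegate to execute. The second stage sees the id whether it is scheduled from the caller or from the worker that finished the first stage, since in the latter case the worker still has the snapshot installed when it schedules the next task.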

You can find a detailed explanation of all of the above here in a post about the same.

answered Nov 13 '22 at 13:11 by Laks


If you come across this, please poke the thread at http://mail.openjdk.java.net/pipermail/core-libs-dev/2017-May/047867.html and ask for something like Twitter's Futures, which transfer Locals (much like a ThreadLocal, but the state is carried across threads).

See the def respond() method in Future.scala and how it calls Local.save() and Local.restore(): https://github.com/simonratner/twitter-util/blob/master/util-core/src/main/scala/com/twitter/util/Future.scala

If the Java authors fixed this, the MDC in Logback would work across all third-party libraries. Until then, IT WILL NOT WORK unless you can change the third-party library (doubtful you can do that).
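As a rough illustration of the save/restore idea in Java (a simplified sketch, not a port of Twitter's code): the context is captured when a callback is registered and reinstated around the callback on whichever thread runs it. LocalSketch, propagating and demo are hypothetical names, and the single String slot is an assumption made to keep the example short.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

final class LocalSketch {

    // Hypothetical, much-simplified analogue of a Local: one slot per thread.
    private static final ThreadLocal<String> SLOT = new ThreadLocal<>();

    static void set(String value) { SLOT.set(value); }
    static String get() { return SLOT.get(); }
    static String save() { return SLOT.get(); }
    static void restore(String saved) {
        if (saved == null) SLOT.remove(); else SLOT.set(saved);
    }

    // Captures the context at registration time and reinstates it around the call.
    static <A, B> Function<A, B> propagating(Function<A, B> callback) {
        String captured = save();
        return input -> {
            String previous = save();   // whatever the executing thread had
            restore(captured);
            try {
                return callback.apply(input);
            } finally {
                restore(previous);      // leave the executing thread as we found it
            }
        };
    }

    static String demo(String requestId) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        set(requestId);
        try {
            CompletableFuture<String> source = new CompletableFuture<>();
            CompletableFuture<String> result =
                    source.thenApply(propagating((String url) -> get() + " " + url));
            // Complete from a pool thread that has no context of its own.
            pool.execute(() -> source.complete("/mypath"));
            return result.join();
        } finally {
            restore(null);
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo("requestId425")); // requestId425 /mypath
    }
}
```

The catch is the one this answer describes: every callback has to be wrapped by hand, so a third-party library that registers its own callbacks on a plain CompletableFuture will not take part.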

answered Nov 13 '22 at 13:11 by Dean Hiller