In my project I frequently work with concurrent tasks using Java Futures. In one application, each concurrent task requires quite a big chunk of memory while it runs. Due to some other design choices, that memory is created and referenced in an object created outside the thread (see the more detailed example below).
To my surprise, the Future holds a reference to this object even after the future task (i.e., its calculation thread) has completed. That is: if no other reference to this object is held elsewhere, the object will not be freed unless the Future is freed, even though the task has completed.
My naive thinking was that limiting the number of concurrent threads would automatically limit the amount of resources (memory) held by the tasks. THIS IS NOT TRUE!
Consider the code below. In this example I create some tasks. During their calculation an ArrayList (which is an outer variable) grows in size. The method returns a Vector<Future>. Even if the task has completed and even if the scope of the ArrayList has been left, the Future still holds a reference to the ArrayList (via FutureTask.sync.callable).
To summarize:
Question: What is the best way to free resources held via the Future? (Of course, I know that local variables of the callable are released upon thread completion - this is not what I am asking for).
```java
/*
 * (c) Copyright Christian P. Fries, Germany. All rights reserved.
 *
 * Created on 17.08.2013
 */
package net.finmath.experiments.concurrency;

import java.util.ArrayList;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * @author Christian Fries
 *
 */
public class ConcurrencyTest {

    private ExecutorService executor = Executors.newFixedThreadPool(10);

    private int numberOfDoubles = 1024*1024/8; // 1 MB
    private int numberOfModels = 100;          // 100 * 1 MB

    /**
     * @param args
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ConcurrencyTest ct = new ConcurrencyTest();
        ct.concurrencyTest();
    }

    /**
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public void concurrencyTest() throws InterruptedException, ExecutionException {
        Vector<Double> results = getResults();

        Runtime.getRuntime().gc();
        System.out.println("Allocated memory (only results): " + (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory()));
    }

    private Vector<Double> getResults() throws InterruptedException, ExecutionException {
        Vector<Future<Double>> resultsFutures = getResultsConcurrently();

        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.HOURS);

        /*
         * At this point, we expect that no reference to the models is held
         * and the memory is freed.
         * However, the Futures still reference each "model".
         */

        Runtime.getRuntime().gc();
        System.out.println("Allocated memory (only futures): " + (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory()));

        Vector<Double> results = new Vector<Double>(resultsFutures.size());
        for(int i=0; i<resultsFutures.size(); i++) {
            results.add(i, resultsFutures.get(i).get());
        }

        return results;
    }

    private Vector<Future<Double>> getResultsConcurrently() {

        /*
         * At this point we allocate some model, which represents
         * something our workers work on.
         */
        Vector<ArrayList<Double>> models = new Vector<ArrayList<Double>>(numberOfModels);
        for(int i=0; i<numberOfModels; i++) {
            models.add(i, new ArrayList<Double>());
        }

        /*
         * Work on the models concurrently
         */
        Vector<Future<Double>> results = calculateResults(models);

        /*
         * Return the futures.
         * Note: We expect that no more reference is held to a model
         * once we are running out of the scope of this function AND the respective worker
         * has completed.
         */
        return results;
    }

    private Vector<Future<Double>> calculateResults(Vector<ArrayList<Double>> models) {

        Vector<Future<Double>> results = new Vector<Future<Double>>(models.size());
        for(int i=0; i<models.size(); i++) {
            final ArrayList<Double> model = models.get(i);
            final int modelNumber = i;

            Callable<Double> worker = new Callable<Double>() {
                public Double call() throws InterruptedException {

                    /*
                     * The models will perform some thread safe lazy init,
                     * which we simulate here, via the following line
                     */
                    for(int j=0; j<numberOfDoubles; j++) model.add(Math.random());

                    /*
                     * Now the worker starts working on the model
                     */
                    double sum = 0.0;
                    for(Double value : model) sum += value.doubleValue();

                    Thread.sleep(1000);

                    Runtime.getRuntime().gc();
                    System.out.println("Model " + modelNumber + " completed. Allocated memory: " + (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory()));

                    return sum;
                }
            };

            // The following line will add the future result of the calculation to the vector results
            results.add(i, executor.submit(worker));
        }

        return results;
    }
}
```
Here is a screenshot of a debugger/profiler (taken from a different example). The FutureTask has completed (as is obvious from the result). However, the FutureTask holds a reference to the Callable. In this case the Callable holds a reference to the outer final variable arguments, which contains some "big" object.
(This example is closer to real life: here Obba Server works on a spreadsheet using concurrent creation and processing of data, taken from a project of mine.)
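One mitigation that does not depend on how a particular JDK implements FutureTask is to copy each result out as soon as it is available and drop the Future right away, instead of keeping the whole Vector<Future> alive. The sketch below uses only java.util.concurrent; DrainFutures, drain and runDemo are hypothetical names chosen for illustration. As a side note, if I read the OpenJDK sources correctly, the FutureTask.sync.callable field seen above belongs to the JDK 6 implementation, while the FutureTask rewritten for JDK 7 sets its callable field to null once the task completes, so the retention may not reproduce on newer JDKs. Dropping completed Futures early does no harm either way.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class DrainFutures {

    /**
     * Copies each result out and immediately drops the corresponding Future, so a
     * completed FutureTask (and whatever its Callable references) becomes
     * unreachable as soon as its value has been read. Note: clears the given list.
     */
    static <T> List<T> drain(List<Future<T>> futures)
            throws InterruptedException, ExecutionException {
        List<T> results = new ArrayList<T>(futures.size());
        for (int i = 0; i < futures.size(); i++) {
            results.add(futures.get(i).get()); // blocks until task i has completed
            futures.set(i, null);              // drop the reference to the completed Future
        }
        futures.clear();
        return results;
    }

    /** Hypothetical driver: submits n tasks that return i * i and drains their Futures. */
    static List<Integer> runDemo(int n) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
            for (int i = 0; i < n; i++) {
                final int index = i;
                futures.add(executor.submit(new Callable<Integer>() {
                    public Integer call() {
                        return index * index;
                    }
                }));
            }
            return drain(futures);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo(5)); // [0, 1, 4, 9, 16]
    }
}
```

This only helps if the caller really lets go of the list it passes in; any other reference to the Futures keeps them (and, on JDK 6, their Callables) reachable.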
UPDATE:
Given the answers of allprog and sbat, I would like to add a few comments:
I accepted the answer of allprog, because it answers the original question of how to free the resources held by the Future. What I don't like is the dependency on an external library in this solution, but in this case it is a good hint.
That said, my preferred solution is that of sbat and of my own answer below: avoid referencing "bigger" objects in the Callable after call() has completed. Specifically, I prefer to avoid an anonymous class implementing Callable. Instead, I define an inner class implementing Callable that has a constructor, receives all references to other objects via that constructor, and frees them at the end of its call() implementation.
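The approach described in this update (a named class instead of an anonymous Callable, references passed in through the constructor and released at the end of call()) might look roughly like the following sketch. ModelWorker, runOnce and holdsModel are hypothetical names; the model is a plain List<Double> standing in for the "bigger" object.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ReleasingWorkerDemo {

    /**
     * Named Callable that receives its potentially large model through the
     * constructor and drops that reference when call() finishes, so a Future
     * still holding this Callable no longer keeps the model reachable.
     */
    static class ModelWorker implements Callable<Double> {
        private List<Double> model; // deliberately not final, so it can be cleared

        ModelWorker(List<Double> model) {
            this.model = model;
        }

        @Override
        public Double call() {
            try {
                double sum = 0.0;
                for (double value : model) {
                    sum += value;
                }
                return sum;
            } finally {
                model = null; // release the model, also if the computation throws
            }
        }

        /** Reliable after Future.get() has returned (get() gives a happens-before edge). */
        boolean holdsModel() {
            return model != null;
        }
    }

    /** Runs one worker on a single-threaded pool and waits for its result. */
    static double runOnce(ModelWorker worker) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            return executor.submit(worker).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Double> model = new ArrayList<Double>();
        for (int i = 1; i <= 4; i++) {
            model.add((double) i);
        }
        ModelWorker worker = new ModelWorker(model);
        System.out.println("sum = " + runOnce(worker));            // sum = 10.0
        System.out.println("holds model: " + worker.holdsModel()); // holds model: false
    }
}
```

Clearing the field in a finally block means the model is released even when the computation fails; the trade-off is that the worker can only be run once.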
Observe that Callable and Future do two different things: a Callable is similar to a Runnable in that it encapsulates a task meant to run on another thread, whereas a Future is used to hold a result obtained from another thread.
Future is an interface that defines the abstraction of an object promising a result that will become available in the future, while FutureTask is an implementation of the Future interface.
The Future interface provides methods to check whether the computation is complete, to wait for its completion, and to retrieve its result. The result is retrieved with Future's get() method, which blocks until the computation has completed.
Once we have an ExecutorService object, we just need to call submit(), passing our Callable as an argument. submit() schedules the task for execution and returns a Future; with the JDK's ThreadPoolExecutor, this Future is a FutureTask.
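A minimal sketch of those operations (isDone(), and the timed variant get(timeout, unit) of get()) on a single-threaded executor; FutureBasics and computeWithFuture are hypothetical names, and the 100 ms sleep only simulates work. The first isDone() line will usually print false, but that depends on timing.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureBasics {

    /** Submits a small computation and inspects it through the Future interface. */
    static int computeWithFuture() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = executor.submit(new Callable<Integer>() {
                public Integer call() throws InterruptedException {
                    Thread.sleep(100); // simulate some work
                    return 6 * 7;
                }
            });
            // isDone() only polls the state; it never blocks
            System.out.println("done right after submit? " + future.isDone());
            // get(timeout, unit) blocks at most this long, then throws TimeoutException
            int value = future.get(5, TimeUnit.SECONDS);
            // once get() has returned normally, the computation is complete
            System.out.println("done after get? " + future.isDone());
            return value;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("result: " + computeWithFuture()); // result: 42
    }
}
```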
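To make the relationship concrete, this sketch checks that the Future returned by a fixed thread pool is a FutureTask, and then does roughly what submit() does by hand: wrap the Callable in a FutureTask and hand it to execute(). That submit() returns a FutureTask is an implementation detail of the JDK's ThreadPoolExecutor, not part of the ExecutorService contract; the class and method names are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class SubmitReturnsFutureTask {

    static final Callable<String> TASK = new Callable<String>() {
        public String call() {
            return "hello";
        }
    };

    /** Checks whether submit() on a fixed thread pool hands back a FutureTask. */
    static boolean submitReturnsFutureTask() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Future<String> future = executor.submit(TASK);
            return future instanceof FutureTask;
        } finally {
            executor.shutdown();
        }
    }

    /** Roughly what submit() does: wrap the Callable in a FutureTask and execute it. */
    static String submitByHand() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            FutureTask<String> task = new FutureTask<String>(TASK);
            executor.execute(task); // a FutureTask is also a Runnable
            return task.get();      // ... and it doubles as the Future
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitReturnsFutureTask()); // true with the JDK's ThreadPoolExecutor
        System.out.println(submitByHand());            // hello
    }
}
```

Because the FutureTask is both the thing that runs and the thing the caller keeps, whatever it references stays reachable for as long as the caller holds the Future.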
If you're concerned about the future task holding onto references to your models, you might try replacing

```java
final ArrayList<Double> model = models.get(i);
```

with

```java
final ArrayList<ArrayList<Double>> modelList =
        new ArrayList<ArrayList<Double>>();
modelList.add(models.get(i));
```

Then at the beginning of your task's call method, do

```java
ArrayList<Double> model = modelList.get(0);
```

and at the end of it write

```java
model = null;
modelList.clear();
```
At least that made an improvement in your "toy example" without forcibly clearing out the user-supplied model. Whether or not this can be applied to your actual work, I cannot say.
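The holder-list idea above can be packaged into a small, testable sketch. Here makeWorker and runOnce are hypothetical names, the holder is passed in (rather than created locally as in the answer) only so that it can be inspected afterwards, and List is used instead of ArrayList for brevity. The clear() call sits in a finally block so the model is dropped even if the computation fails.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class HolderListDemo {

    /**
     * Builds an anonymous Callable that reaches its model only through a
     * one-element holder list and empties that list when it finishes, so the
     * Callable (and any Future holding it) ends up referencing an empty list.
     */
    static Callable<Double> makeWorker(List<Double> model, final List<List<Double>> holder) {
        holder.add(model);
        return new Callable<Double>() {
            public Double call() {
                try {
                    double sum = 0.0;
                    for (double value : holder.get(0)) {
                        sum += value;
                    }
                    return sum;
                } finally {
                    holder.clear(); // drop the model reference
                }
            }
        };
    }

    /** Runs one worker on a single-threaded pool and waits for its result. */
    static double runOnce(Callable<Double> worker) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            return executor.submit(worker).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        List<List<Double>> holder = new ArrayList<List<Double>>();
        double sum = runOnce(makeWorker(Arrays.asList(1.5, 2.5), holder));
        System.out.println("sum = " + sum);                      // sum = 4.0
        System.out.println("holder empty: " + holder.isEmpty()); // holder empty: true
    }
}
```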
The Future itself is a FutureTask if you use the default ExecutorService implementations. The FutureTask keeps a reference to the Runnable or Callable that you submitted; consequently, any resources these reference are kept reachable until the Future itself is freed.
The best approach to these kinds of issues is to transform the Future into another Future that keeps only the result, and return that one to the caller. This way you won't need a completion service or any additional hacking.
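```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

public class TransformExample {
    public static void main(String[] args) throws Exception {
        // Decorate the executor service with listening
        ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
        // Submit the task
        ListenableFuture<Integer> future = service.submit(new Callable<Integer>() {
            public Integer call() throws Exception {
                return 1234;
            }
        });
        // Transform the future to get rid of the memory consumption: hand out only
        // resultOnly, which keeps the result but not the submitted Callable.
        // The Executor parameter determines which thread runs the transformation.
        ListenableFuture<Integer> resultOnly = Futures.transform(future, new Function<Integer, Integer>() {
            public Integer apply(Integer input) {
                return input;
            }
        }, service);
        System.out.println(resultOnly.get());
        service.shutdown();
    }
}
```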
This effectively implements the scheme you described, but lifts the hacking off your shoulders and allows for much more flexibility later. It brings in Guava as a dependency, but I think it is worth the cost. I always use this scheme.
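If adding Guava is not an option (the concern raised in the update above), a similar "result-only Future" can be sketched with the JDK alone on Java 8 or later. This is a swapped-in technique, not the answer's Guava code: the task is run with execute() rather than submit(), so no FutureTask is created, and the caller receives a CompletableFuture that is completed with the result or the failure only. As far as I can tell, the pool drops the Runnable after running it, so nothing the caller holds references the Callable. ResultOnlyFuture, submitResultOnly and demo are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ResultOnlyFuture {

    /**
     * Runs the Callable on the executor but returns a CompletableFuture that only
     * ever holds the result (or the exception). The Runnable references the
     * CompletableFuture, not the other way round.
     */
    static <T> CompletableFuture<T> submitResultOnly(ExecutorService executor, Callable<T> task) {
        CompletableFuture<T> result = new CompletableFuture<>();
        executor.execute(() -> {
            try {
                result.complete(task.call());
            } catch (Throwable t) {
                result.completeExceptionally(t);
            }
        });
        return result;
    }

    /** Hypothetical driver: computes 1234 through a result-only future. */
    static int demo() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<Integer> future = submitResultOnly(executor, () -> 1234);
            return future.join(); // join() rethrows failures as unchecked CompletionException
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 1234
    }
}
```

Unlike a plain FutureTask, cancelling the returned CompletableFuture does not interrupt the running task; that is the price of not keeping a handle to it.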