 

Freeing up Resources on Completion of a Java Future: References to the Callable and its Outer Variables

In my project I frequently work with concurrent tasks using Java Futures. In one application, each concurrent task requires quite a big chunk of memory during its execution. Due to some other design choices, that memory is allocated and referenced by an object created outside the thread (see the more detailed example below).

To my surprise, the Future holds a reference to this object even after the task (i.e., its calculation thread) has completed. That is: if no other reference to this object is held elsewhere, the object will not be freed until the Future itself is freed, even though the task has completed.

My naive assumption was that limiting the number of concurrent threads would automatically limit the amount of resources (memory) held by the tasks. THIS IS NOT TRUE! Completed tasks keep their memory reachable for as long as their Futures are referenced.

Consider the code below. In this example I create some tasks. During their calculation, an ArrayList (which is an outer variable) grows in size. The method getResultsConcurrently() returns a Vector<Future<Double>>. Even after a task has completed and even after the scope of the ArrayList has been left, the Future still holds a reference to the ArrayList (via FutureTask.sync.callable).

To summarize:

  • The FutureTask holds a reference to the Callable, even if the Callable has completed.
  • The Callable holds references to the final outer variables used during calculation, even if the calculation has completed.
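To make the two points above concrete, here is a hand-written approximation of what the compiler generates for an anonymous Callable that uses a final outer variable. It is only a sketch: DesugaredWorker and capturedModel are made-up names, since the real synthetic fields are compiler-specific. (In the question's code the anonymous class additionally references the enclosing ConcurrencyTest instance, because call() reads the field numberOfDoubles.)

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

public class CapturedVariableSketch {

    // Roughly what javac produces for `new Callable<Double>() { ... model ... }`:
    // the captured outer variable becomes a field of the Callable object.
    static final class DesugaredWorker implements Callable<Double> {
        final List<Double> capturedModel;  // stands in for the synthetic field

        DesugaredWorker(List<Double> model) {
            this.capturedModel = model;
        }

        @Override
        public Double call() {
            double sum = 0.0;
            for (double value : capturedModel) sum += value;
            // Nothing clears capturedModel here, so after call() returns the model
            // is still reachable from this worker (and from anything holding it).
            return sum;
        }
    }

    public static void main(String[] args) {
        List<Double> model = new ArrayList<>();
        model.add(1.0);
        model.add(2.0);
        DesugaredWorker worker = new DesugaredWorker(model);
        System.out.println("sum = " + worker.call());
        System.out.println("model still referenced: " + (worker.capturedModel == model));
    }
}
```

Whatever holds the worker, for example a FutureTask, therefore also keeps the model alive.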

Question: What is the best way to free resources held via the Future? (Of course, I know that the local variables of the callable are released when call() returns; this is not what I am asking about.)

/*
 * (c) Copyright Christian P. Fries, Germany. All rights reserved. Contact: [email protected].
 *
 * Created on 17.08.2013
 */

package net.finmath.experiments.concurrency;

import java.util.ArrayList;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * @author Christian Fries
 *
 */
public class ConcurrencyTest {

    private ExecutorService executor = Executors.newFixedThreadPool(10);
    private int numberOfDoubles = 1024*1024/8;      // 1 MB
    private int numberOfModels  = 100;              // 100 * 1 MB

    /**
     * @param args
     * @throws ExecutionException 
     * @throws InterruptedException 
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ConcurrencyTest ct = new ConcurrencyTest();
        ct.concurrencyTest();
    }

    /**
     * @throws ExecutionException 
     * @throws InterruptedException 
     */
    public void concurrencyTest() throws InterruptedException, ExecutionException {
        Vector<Double> results = getResults();

        Runtime.getRuntime().gc();
        System.out.println("Allocated memory (only results): " + (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory()));
    }


    private Vector<Double> getResults() throws InterruptedException, ExecutionException {
        Vector<Future<Double>> resultsFutures = getResultsConcurrently();


        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.HOURS);

        /*
         * At this point, we expect that no reference to the models is held
         * and the memory is freed.
         * However, the Futures still reference each "model".
         */

        Runtime.getRuntime().gc();
        System.out.println("Allocated memory (only futures): " + (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory()));

        Vector<Double> results = new Vector<Double>(resultsFutures.size());
        for(int i=0; i<resultsFutures.size(); i++) {
            results.add(i, resultsFutures.get(i).get());
        }       

        return results;
    }

    private Vector<Future<Double>> getResultsConcurrently() {

        /*
         * At this point we allocate some model, which represents
         * something our workers work on.
         */
        Vector<ArrayList<Double>> models = new Vector<ArrayList<Double>>(numberOfModels);
        for(int i=0; i<numberOfModels; i++) {
            models.add(i, new ArrayList<Double>());
        }

        /*
         * Work on the models concurrently
         */
        Vector<Future<Double>> results = calculateResults(models);


        /*
         * Return the futures.
         * Note: We expect that no reference to a model is held any more
         * once we are out of the scope of this function AND the respective worker
         * has completed.
         */
        return results;
    }

    private Vector<Future<Double>> calculateResults(Vector<ArrayList<Double>> models) {
        Vector<Future<Double>> results = new Vector<Future<Double>>(models.size());
        for(int i=0; i<models.size(); i++) {
            final ArrayList<Double> model       = models.get(i);
            final int               modelNumber = i;

            Callable<Double> worker = new  Callable<Double>() {
                public Double call() throws InterruptedException {

                    /*
                     * The models will perform some thread safe lazy init,
                     * which we simulate here, via the following line
                     */
                    for(int j=0; j<numberOfDoubles; j++) model.add(Math.random());

                    /*
                     * Now the worker starts working on the model
                     */
                    double sum = 0.0;
                    for(Double value : model) sum += value.doubleValue();

                    Thread.sleep(1000);

                    Runtime.getRuntime().gc();
                    System.out.println("Model " + modelNumber + " completed. Allocated memory: " + (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory()));

                    return sum;
                }
            };

            // The following line will add the future result of the calculation to the vector results               
            results.add(i, executor.submit(worker));
        }

        return results;
    }
}

Here is a screenshot of a debugger/profiler (taken from another example). The FutureTask has completed (as is obvious from the result). However, the FutureTask still holds a reference to the Callable. In this case the Callable holds a reference to the final outer variable arguments, which contains some "big" object.

[Screenshot: debugger view of a completed FutureTask still holding a reference to its Callable]

(This example is closer to real life: here Obba Server works on a spreadsheet, creating and processing data concurrently. It is taken from a project of mine.)

UPDATE:

Given the answers by allprog and sbat, I would like to add a few comments:

  • I accepted the answer by allprog, because it answers the original question of how to free the resources held by the Future. What I don't like is that this solution depends on an external library, but in this case it is a good hint.

  • That said, my preferred solution is that of sbat and in my own answer below: avoid referencing "bigger" objects in the callable after call() has completed. Actually, my preferred solution is to avoid an anonymous class implementing Callable. Instead I define an inner class implementing Callable, with a constructor that receives all references to other objects, and free those references at the end of the call() implementation.
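A minimal sketch of the approach in the last bullet, assuming the model is a List<Double> as in the question's example. The names ModelWorker and hasReleasedModel are illustrative, not taken from the original answer; a static nested class is used here so the worker does not capture an enclosing instance either, and the reference is cleared in a finally block so it is released even if the calculation throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SelfReleasingWorkerSketch {

    // A named Callable that receives its inputs via the constructor
    // and drops them once call() is finished.
    static final class ModelWorker implements Callable<Double> {
        private List<Double> model;  // deliberately not final, so it can be cleared
        private final int numberOfDoubles;

        ModelWorker(List<Double> model, int numberOfDoubles) {
            this.model = model;
            this.numberOfDoubles = numberOfDoubles;
        }

        @Override
        public Double call() {
            try {
                // Simulated lazy init of the model, as in the question
                for (int j = 0; j < numberOfDoubles; j++) model.add(Math.random());
                double sum = 0.0;
                for (double value : model) sum += value;
                return sum;
            } finally {
                // Drop the reference even if the calculation throws, so a Future
                // that still holds this Callable no longer keeps the model alive.
                model = null;
            }
        }

        boolean hasReleasedModel() {
            return model == null;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        ModelWorker worker = new ModelWorker(new ArrayList<Double>(), 1000);
        Future<Double> future = executor.submit(worker);
        System.out.println("sum = " + future.get());
        // Future.get() happens-after the task's actions, so the cleared field is visible here
        System.out.println("model released: " + worker.hasReleasedModel());
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
    }
}
```

The worker is meant to be run once: a second call() would fail with a NullPointerException, which is the price of making the field clearable.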

asked Aug 17 '13 at 20:08 by Christian Fries


People also ask

What is Callable and Future in Java?

Observe that Callable and Future do two different things: Callable is similar to Runnable, in that it encapsulates a task that is meant to run on another thread (unlike a Runnable, it returns a value and may throw a checked exception), whereas a Future is used to store a result obtained from a different thread.
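A short sketch (Java 8+, using lambdas; class and method names are illustrative) contrasting the two: a Runnable submitted to an ExecutorService yields a Future<?> whose get() returns null, while a Callable's value travels back to the caller through its Future.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableVsRunnable {

    public static int compute() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Runnable: runs on another thread but produces no value
            Runnable logTask = () -> System.out.println("running on " + Thread.currentThread().getName());
            Future<?> logged = executor.submit(logTask);
            Object nothing = logged.get();  // null once the Runnable has finished
            System.out.println("Runnable result: " + nothing);

            // Callable: runs on another thread and returns a value (it may also throw)
            Callable<Integer> answerTask = () -> 6 * 7;
            Future<Integer> answer = executor.submit(answerTask);
            return answer.get();  // the Future carries the result back to this thread
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(compute());
    }
}
```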

What is Future and FutureTask in Java?

Future is a base interface that defines the abstraction of an object promising a result that will be available in the future, while FutureTask is an implementation of the Future interface (it also implements Runnable, so a thread can execute it directly).
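Because FutureTask is both a Runnable and a Future, it can even be run on a plain Thread without any executor. A minimal sketch; the class name FutureTaskDemo and the thread name worker-1 are arbitrary choices:

```java
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class FutureTaskDemo {

    public static String runOnOwnThread() throws Exception {
        // FutureTask implements Runnable, so a plain Thread can execute it...
        FutureTask<String> task = new FutureTask<String>(
                () -> "computed on " + Thread.currentThread().getName());
        Thread worker = new Thread(task, "worker-1");
        worker.start();

        // ...and Future, so the caller can wait for and read the result
        Future<String> future = task;
        return future.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runOnOwnThread());
    }
}
```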

Does Future.get() block?

The Future interface provides methods to check whether the computation is complete, to wait for its completion, and to retrieve its result. The result is retrieved with Future's get() method, which blocks until the computation has completed.
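Besides the blocking get(), Future also offers a timed get(timeout, unit), which throws TimeoutException if the result is not ready in time and leaves the task running. A small sketch, assuming a task that sleeps for 500 ms stands in for a slow computation (the class and method names are illustrative):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BlockingGetDemo {

    public static boolean timesOutThenSucceeds() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<String> slowTask = () -> {
                Thread.sleep(500);  // simulate a long computation
                return "done";
            };
            Future<String> slow = executor.submit(slowTask);

            boolean timedOut = false;
            try {
                // Timed variant: wait at most 50 ms instead of blocking indefinitely
                slow.get(50, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                timedOut = true;  // the task keeps running; the Future is unaffected
            }

            String result = slow.get();  // untimed get() blocks until the task completes
            return timedOut && "done".equals(result) && slow.isDone();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(timesOutThenSucceeds());
    }
}
```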

How do you use Future with ExecutorService?

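Once we have an ExecutorService object, we just need to call submit(), passing our Callable as an argument. submit() then schedules the task for execution and returns a Future; for the thread pools created by Executors.newFixedThreadPool() or Executors.newCachedThreadPool(), this Future is a FutureTask, an implementation of the Future interface.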
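The following sketch checks this for a pool created by Executors.newFixedThreadPool(): ThreadPoolExecutor inherits newTaskFor() from AbstractExecutorService, which wraps the submitted Callable in a FutureTask. That FutureTask is the object which, in the question, was observed to keep the Callable referenced. The class name is illustrative.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class SubmitReturnsFutureTask {

    public static boolean isFutureTask() throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Callable<Integer> task = () -> 1 + 1;
            Future<Integer> future = executor.submit(task);
            future.get();  // wait for completion
            // The Future handed back by submit() is the FutureTask wrapping our Callable
            return future instanceof FutureTask;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(isFutureTask());
    }
}
```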


2 Answers

If you're concerned about the future task holding onto references to your models, you might try replacing

final ArrayList<Double> model = models.get(i);

with

final ArrayList<ArrayList<Double>> modelList = 
            new ArrayList<ArrayList<Double>>();
modelList.add(models.get(i));

Then at the beginning of your task's call method, do

ArrayList<Double> model = modelList.get(0);

and at the end of it write

model = null;
modelList.clear();

At least that made an improvement in your "toy example" without forcibly clearing out the user-supplied model. Whether this can be applied to your actual work, I cannot say.
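A self-contained version of this suggestion, as a sketch: createWorker is a hypothetical helper, the one-element holder list is built by the caller as in the snippet above, and clear() is moved into a finally block so the holder is emptied even if the calculation throws. Setting the local model to null is omitted, since call()'s local variables go away when it returns anyway.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

public class HolderListSketch {

    // Hypothetical helper: the anonymous Callable captures only the holder list,
    // never the model itself (and, created in a static method, no outer instance).
    static Callable<Double> createWorker(final List<List<Double>> modelList, final int numberOfDoubles) {
        return new Callable<Double>() {
            public Double call() {
                try {
                    List<Double> model = modelList.get(0);
                    for (int j = 0; j < numberOfDoubles; j++) model.add(Math.random());
                    double sum = 0.0;
                    for (Double value : model) sum += value.doubleValue();
                    return sum;
                } finally {
                    // The Callable still references modelList, but the list is now empty,
                    // so the model is no longer reachable through the Callable.
                    modelList.clear();
                }
            }
        };
    }

    public static void main(String[] args) throws Exception {
        List<Double> model = new ArrayList<Double>();
        List<List<Double>> modelList = new ArrayList<List<Double>>();
        modelList.add(model);

        Callable<Double> worker = createWorker(modelList, 1000);
        double sum = worker.call();
        System.out.println("sum = " + sum + ", model size = " + model.size()
                + ", holder empty = " + modelList.isEmpty());
    }
}
```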

answered Sep 28 '22 at 00:09 by sgbj


The Future itself is a FutureTask if you use the default ExecutorService implementations. The FutureTask keeps a reference to the Runnable or Callable that you submitted; consequently, any resources that these reference will be kept until the Future itself is freed.

The best approach to this kind of issue is to transform the Future into another Future that keeps only the result, and return that one to the caller. This way you won't need a completion service or any additional hacking.

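import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

// Decorate the executor service with listening
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

// Submit the task
ListenableFuture<Integer> future = service.submit(new Callable<Integer>() {
    public Integer call() throws Exception {
        return Integer.valueOf(1234);
    }
});

// Transform the future to get rid of the memory consumption.
// The transformation must happen asynchronously, thus the
// Executor parameter is vital. Hand resultOnly to the caller
// and drop the reference to future.
// (Guava 19+ names this overload transformAsync; older versions called it transform.)
ListenableFuture<Integer> resultOnly = Futures.transformAsync(future, new AsyncFunction<Integer, Integer>() {
    public ListenableFuture<Integer> apply(Integer input) {
        return Futures.immediateFuture(input);
    }
}, service);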

This effectively implements the scheme you described, but lifts the hacking off your shoulders and allows for much more flexibility later. It brings in Guava as a dependency, but I think it is worth the cost. I always use this scheme.
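If adding Guava is not an option, the same "keep only the result" idea can be sketched with the JDK alone (Java 8+), swapping in CompletableFuture for Guava's transformation: run the Callable via execute() and complete a separate CompletableFuture with its outcome. The helper submitResultOnly is a hypothetical name, not a JDK API. Since execute() does not wrap the task in a FutureTask, and the pool stops referencing the Runnable once it has run, the future returned to the caller ends up referencing only the result.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ResultOnlyFutureSketch {

    // Hypothetical helper (not a JDK API): runs the callable and returns a future
    // that references only the outcome, never the callable itself.
    static <T> CompletableFuture<T> submitResultOnly(ExecutorService executor, Callable<T> callable) {
        CompletableFuture<T> resultOnly = new CompletableFuture<>();
        // execute() runs the Runnable as-is (no FutureTask wrapper); after it has run,
        // the pool no longer references it or the callable it captured.
        executor.execute(() -> {
            try {
                resultOnly.complete(callable.call());
            } catch (Throwable t) {
                // Forward any failure so callers of get() see an ExecutionException
                resultOnly.completeExceptionally(t);
            }
        });
        return resultOnly;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Callable<Double> task = () -> 2.0 * 21;
        Future<Double> future = submitResultOnly(executor, task);
        System.out.println(future.get());
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
    }
}
```

One trade-off of this design: CompletableFuture.cancel() does not interrupt the running task, unlike cancel(true) on a FutureTask.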

answered Sep 28 '22 at 01:09 by allprog