I ran into this problem in a real project and confirmed it with test code and a profiler. Instead of pasting "tl;dr" code, I'll show a picture and then describe it.
Simply put, I'm using Future.firstCompletedOf to get a result from 2 Futures that share nothing and don't depend on each other. Even so, and this is the question I want to address, the garbage collector cannot reclaim the first Result object until both Futures have finished.
So I'm really curious about the mechanism behind this. Could someone explain it at a lower level, or give me a hint about where to look?
Thanks!
PS: is it because they share the same ExecutionContext?
** Update ** Test code, pasted as requested:
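import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object Main extends App {
  println("Test start")

  val timeout = 30000

  trait Result {
    val id: Int
    val str = "I'm short"
  }

  class BigObject(val id: Int) extends Result {
    override val str = "really big str"
  }

  // Slow future: completes only after `timeout` milliseconds.
  def guardian = Future({
    Thread.sleep(timeout)
    new Result { val id = 99999 }
  })

  // Fast future: completes after 100 ms with the object we expect to be collected.
  def worker(i: Int) = Future({
    Thread.sleep(100)
    new BigObject(i)
  })

  for (i <- Range(1, 1000)) {
    println("round " + i)
    Thread.sleep(20)
    Future.firstCompletedOf(Seq(
      guardian,
      worker(i)
    )).map(r => println("result" + r.id))
  }

  // Keep the JVM alive so the heap can be inspected with a profiler.
  while (true) {
    Thread.sleep(2000)
  }
}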
Let's see how firstCompletedOf is implemented:
def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
  val p = Promise[T]()
  val completeFirst: Try[T] => Unit = p tryComplete _
  futures foreach { _ onComplete completeFirst }
  p.future
}
When futures foreach { _ onComplete completeFirst } runs, the function completeFirst is registered as a callback on every future in futures. Each future that has not completed yet has to keep a reference to it, so that the callback can later be handed to ExecutionContext.execute and run on a thread pool once the future completes and a thread becomes available. Exactly where the reference is stored is irrelevant; what matters is that a future stops needing its reference to completeFirst only once that future has completed.
Because completeFirst closes over p, as long as at least one future (from futures) is still waiting to be completed there is a reference to p that prevents it from being garbage collected (even though by that point chances are that firstCompletedOf has already returned, removing p from the stack).
When the first future completes, it saves the result into the promise (by calling p.tryComplete).
Because the promise p holds the result, the result is reachable for at least as long as p is reachable, and as we saw, p is reachable as long as at least one future from futures has not completed.
This is the reason why the result cannot be collected before all the futures have completed.
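The chain described above (pending future, then callback, then promise, then result) can be observed with a weak reference. What follows is only a minimal sketch under some assumptions: ReachabilityDemo, leakyFirstCompletedOf, round and leakedResultIsReachable are hypothetical names, the implementation is a local copy of the one quoted above (taking a Seq for simplicity) so that it does not depend on the library version on the classpath, and a Promise that is never completed stands in for the slow future.

```scala
import java.lang.ref.WeakReference
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object ReachabilityDemo {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Local copy of the implementation quoted above (hypothetical name).
  // The closure captures p, exactly like `p tryComplete _` does.
  def leakyFirstCompletedOf[T](futures: Seq[Future[T]]): Future[T] = {
    val p = Promise[T]()
    val completeFirst: Try[T] => Unit = result => { p.tryComplete(result); () }
    futures.foreach(_.onComplete(completeFirst))
    p.future
  }

  // Runs one round and hands back only a weak reference to the winning
  // result, so nothing in this method keeps the result alive afterwards.
  def round(pending: Future[Array[Byte]]): WeakReference[Array[Byte]] = {
    val fast = Promise[Array[Byte]]()
    val first = leakyFirstCompletedOf(Seq(pending, fast.future))
    fast.success(new Array[Byte](1 << 20))
    new WeakReference[Array[Byte]](Await.result(first, 5.seconds))
  }

  def leakedResultIsReachable(): Boolean = {
    val neverCompleted = Promise[Array[Byte]]() // stands in for the slow future
    val ref = round(neverCompleted.future)
    System.gc() // only a hint to the JVM
    val reachable = ref.get() != null
    // Touch the pending promise after the GC hint so it is certainly still live.
    reachable && !neverCompleted.isCompleted
  }

  def main(args: Array[String]): Unit =
    println("result still reachable: " + leakedResultIsReachable())
}
```

The result stays strongly reachable here because the pending promise still holds the callback, the callback holds p, and p holds the completed value, so no garbage collector is allowed to clear the weak reference.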
UPDATE: Now the question is: could it be fixed? I think it could. All we would have to do is ensure that the first future to complete "nulls out" the reference to p in a thread-safe way, which can be done, for example, with an AtomicReference. Something like this:
def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
  val p = Promise[T]()
  val pref = new java.util.concurrent.atomic.AtomicReference(p)
  val completeFirst: Try[T] => Unit = { result: Try[T] =>
    val promise = pref.getAndSet(null)
    if (promise != null) {
      promise.tryComplete(result)
    }
  }
  futures foreach { _ onComplete completeFirst }
  p.future
}
I have tested it and as expected it does allow the result to be garbage collected as soon as the first future completes. It should behave the same in all other respects.
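As a rough check of the "behaves the same" part, the sketch below completes two promises by hand and confirms that the first completion wins while the late one is ignored. It is an illustration under assumptions, not the library's code: FixedFirstCompletedOfCheck, fixedFirstCompletedOf and firstWins are hypothetical names, and the function takes a Seq and a fixed global ExecutionContext for brevity.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object FixedFirstCompletedOfCheck {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Same idea as the AtomicReference variant above (hypothetical name):
  // the callback captures pref, not p, and the first call clears pref.
  def fixedFirstCompletedOf[T](futures: Seq[Future[T]]): Future[T] = {
    val p = Promise[T]()
    val pref = new AtomicReference[Promise[T]](p)
    val completeFirst: Try[T] => Unit = { result =>
      val promise = pref.getAndSet(null)
      if (promise != null) promise.tryComplete(result)
      ()
    }
    futures.foreach(_.onComplete(completeFirst))
    p.future
  }

  def firstWins(): String = {
    val slow = Promise[String]()
    val fast = Promise[String]()
    val first = fixedFirstCompletedOf(Seq(slow.future, fast.future))
    fast.success("fast")
    val winner = Await.result(first, 5.seconds)
    slow.success("slow") // arrives late: its callback finds null and does nothing
    winner
  }

  def main(args: Array[String]): Unit =
    println(firstWins()) // prints "fast"
}
```

After the first callback runs, the still-pending future only reaches the AtomicReference, which by then holds null, so neither p nor its result is kept alive by it.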