 

Golang Memory Leak Concerning Goroutines

I have a Go program that runs continuously and relies entirely on goroutines plus one manager goroutine. The main goroutine simply starts goroutines and otherwise sleeps.

There is a memory leak. The program uses more and more memory until it exhausts all 16GB of RAM plus 32GB of swap, and then every goroutine panics. It is actually OS memory that triggers the panic; usually the panic is fork/exec ./anotherapp: cannot allocate memory when I try to execute anotherapp.

When this happens, all of the worker goroutines panic, are recovered, and are restarted... at which point memory usage does not decrease; it remains at 48GB even though virtually nothing is now allocated. This means every goroutine panics again, since there is never enough memory, until the entire executable is killed and restarted completely.

The entire thing is about 50,000 lines, but the actual problematic area is as follows:

type queue struct {
    identifier string
    kind       bool // renamed from "type", which is a reserved word and does not compile
}

func main() {

    // Set number of goroutines that can be run
    var xthreads int32 = 10
    var usedthreads int32
    runtime.GOMAXPROCS(14)
    ready := make(chan *queue, 5)

    // Start the manager goroutine, which prepares identifiers in the background ready for processing, always with 5 waiting to go
    go manager(ready)

    // Start creating goroutines to process as they are ready
    for obj := range ready { // loops through "ready" channel and waits when there is nothing

        // This section uses atomic instead of a blocking channel in an earlier attempt to stop the memory leak, but it didn't work
        for atomic.LoadInt32(&usedthreads) >= xthreads {
            time.Sleep(time.Second)
        }
        debug.FreeOSMemory() // Try to clean up the memory, also did not stop the leak
        atomic.AddInt32(&usedthreads, 1) // Mark goroutine as started

        // Unleak obj, probably unnecessary, but just to be safe
        cpy := new(queue) // "copy" would shadow the builtin copy function
        cpy.identifier = unleak.String(obj.identifier) // unleak is a 3rd party package that makes a copy of the string
        cpy.kind = obj.kind
        go runit(cpy, &usedthreads) // Start the processing thread

    }

    fmt.Println(`END`) // This should never happen as the channels are never closed
}

func manager(ready chan *queue) {
    // This thread communicates with another server and fills the "ready" channel
}

// This is the goroutine
func runit(obj *queue, threadcount *int32) {
    defer func() {
        if r := recover(); r != nil {
            // Panicked
            erstring := fmt.Sprint(r)
            reportFatal(obj.identifier, erstring)
        } else {
            // Completed successfully
            reportDone(obj.identifier)
        }
        atomic.AddInt32(threadcount, -1) // Mark goroutine as finished
    }()
    do(obj) // This function does the actual processing
}

As far as I can see, when the do function (last line) ends, either by finishing or by panicking, the runit function ends too, which ends the goroutine entirely, so all of the memory from that goroutine should be freed. This is not what happens. Instead, the app uses more and more memory until it becomes unable to function, all the runit goroutines panic, and yet the memory does not decrease.

Profiling does not reveal anything suspicious. The leak appears to be outside of the profiler's scope.

Alasdair asked Feb 04 '15 09:02

1 Answer

Please consider inverting the pattern, as shown below.

package main

import (
    "log"
    "math/rand"
    "sync"
    "time"
)

// I do work
func worker(id int, work chan int) {
    for i := range work {
        // Work simulation
        log.Printf("Worker %d, sleeping for %d seconds\n", id, i)
        time.Sleep(time.Duration(rand.Intn(i)) * time.Second)
    }
}

// Return some fake work
func getWork() int {
    return rand.Intn(2) + 1
}

func main() {
    wg := new(sync.WaitGroup)
    work := make(chan int)

    // run 10 workers
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(i int) {
            worker(i, work)
            wg.Done()
        }(i)
    }

    // main "thread"
    for i := 0; i < 100; i++ {
        work <- getWork()
    }

    // signal there is no more work to be done
    close(work)

    // Wait for the workers to exit
    wg.Wait()
}
freeformz answered Sep 18 '22 17:09