
Fire and forget goroutine golang

I have written an API that makes DB calls and runs some business logic. The handler starts a goroutine that performs an operation in the background. Since the API call should not wait for this background task to finish, I return 200 OK immediately after starting the goroutine (assume the background task never returns an error).

I read that a goroutine is terminated once it has completed its task. Is this fire-and-forget approach safe from goroutine leaks? Are goroutines terminated and cleaned up once they finish their job?

func DefaultHandler(w http.ResponseWriter, r *http.Request) {
    // Some DB calls
    // Some business logic
    go func() {
        // some Task taking 5 sec
    }()
    w.WriteHeader(http.StatusOK)
}
Anish asked Dec 17 '22 11:12


1 Answer

I would recommend always keeping your goroutines under control to avoid exhausting memory and other system resources. If you receive a spike of requests and spawn an unbounded number of goroutines, the system will probably go down sooner or later.
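One way to keep goroutines "under control" without any queue is to cap how many may run at once. The following is only a minimal sketch of that idea, using a buffered channel as a counting semaphore; the `limiter` type and its `tryGo` and `wait` methods are hypothetical names made up for this illustration, not part of the standard library.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// limiter caps how many background tasks may run at the same time.
// Hypothetical helper for illustration only.
type limiter struct {
	slots chan struct{} // one buffered element per running task
	wg    sync.WaitGroup
}

func newLimiter(max int) *limiter {
	return &limiter{slots: make(chan struct{}, max)}
}

// tryGo starts task in a new goroutine if a slot is free and reports
// whether it did so. It never blocks the caller.
func (l *limiter) tryGo(task func()) bool {
	select {
	case l.slots <- struct{}{}: // acquired a slot
		l.wg.Add(1)
		go func() {
			defer l.wg.Done()
			defer func() { <-l.slots }() // release the slot when the task returns
			task()
		}()
		return true
	default: // every slot is busy
		return false
	}
}

// wait blocks until every started task has returned.
func (l *limiter) wait() { l.wg.Wait() }

func main() {
	l := newLimiter(2)
	var done int32
	accepted := 0
	for i := 0; i < 5; i++ {
		started := l.tryGo(func() {
			time.Sleep(50 * time.Millisecond) // simulated background work
			atomic.AddInt32(&done, 1)
		})
		if started {
			accepted++
		}
	}
	l.wait()
	fmt.Printf("accepted=%d done=%d\n", accepted, atomic.LoadInt32(&done))
}
```

In an HTTP handler, a `false` result from `tryGo` would be the point at which to reject the request or fall back to some other strategy, instead of spawning yet another goroutine.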

When you need to return 200 OK immediately, the best approach is to use a message queue: the server only creates a job in the queue, returns the OK response, and forgets about it. A consumer handles the rest asynchronously.

Producer (HTTP server) >>> Queue >>> Consumer

Normally the queue is an external resource (RabbitMQ, AWS SQS...), but for teaching purposes you can achieve the same effect using a channel as the message queue.

In the example below, we create a channel so that two goroutines can communicate. We then start the worker goroutine, which reads from the channel, followed by the server, whose handler writes to the channel.

Try playing with the buffer size and the job duration while sending curl requests.

package main

import (
    "fmt"
    "log"
    "net/http"
    "time"
)

/*
$ go run .

curl "http://localhost:8080?user_id=1"
curl "http://localhost:8080?user_id=2"
curl "http://localhost:8080?user_id=3"
curl "http://localhost:8080?user_id=....."

*/

func main() {

    queueSize := 10
    // This is our queue: a channel that lets goroutines communicate. queueSize is the number of items the channel can buffer
    myJobQueue := make(chan string, queueSize) // Search for 'buffered channels'

    // Starts a worker that will read continuously from our queue
    go myBackgroundWorker(myJobQueue)

    // We start our server with a handler that is receiving the queue to write to it
    if err := http.ListenAndServe("localhost:8080", myAsyncHandler(myJobQueue)); err != nil {
        panic(err)
    }
}

func myAsyncHandler(myJobQueue chan<- string) http.HandlerFunc {
    return func(rw http.ResponseWriter, r *http.Request) {
        // We check that in the query string we have a 'user_id' query param
        if userID := r.URL.Query().Get("user_id"); userID != "" {
            select {
            case myJobQueue <- userID: // We try to put the item into the queue ...
                rw.WriteHeader(http.StatusOK)
                rw.Write([]byte(fmt.Sprintf("queuing user process: %s", userID)))
            default: // If we cannot write to the queue, it is because it is full!
                rw.WriteHeader(http.StatusInternalServerError)
                rw.Write([]byte(`our internal queue is full, try again later`))
            }
            return
        }
        rw.WriteHeader(http.StatusBadRequest)
        rw.Write([]byte(`missing 'user_id' in query params`))
    }
}

func myBackgroundWorker(myJobQueue <-chan string) {
    const (
        jobDuration = 10 * time.Second // simulation of a heavy background process
    )

    // We continuously read from our queue and process the jobs one by one.
    // In this loop we could spawn more goroutines in a controlled way to parallelize the work and increase the read throughput, but I don't want to overcomplicate the example.
    for userID := range myJobQueue {
        // rate limiter here ...
        // go func(u string){
        log.Printf("processing user: %s, started", userID)
        time.Sleep(jobDuration)
        log.Printf("processing user: %s, finished", userID)
        // }(userID)
    }
}
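The worker's comments mention spawning more goroutines "in a controlled way" to increase read throughput. One possible way to do that, shown here only as a sketch, is a fixed-size pool of workers that all read from the same channel. The `startWorkers` function and its parameters are hypothetical names introduced for this illustration; the job duration is shortened so the demo finishes quickly.

```go
package main

import (
	"log"
	"sync"
	"time"
)

// startWorkers launches n goroutines that all read from jobs and call
// process for each item. The returned WaitGroup is done once jobs has been
// closed and drained. Hypothetical helper for illustration only.
func startWorkers(n int, jobs <-chan string, process func(string)) *sync.WaitGroup {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// The loop ends when jobs is closed and empty, so each worker
			// goroutine returns and is cleaned up by the runtime.
			for userID := range jobs {
				process(userID)
			}
		}()
	}
	return &wg
}

func main() {
	jobs := make(chan string, 10)

	// At most 3 jobs are processed concurrently, however many are queued.
	wg := startWorkers(3, jobs, func(userID string) {
		log.Printf("processing user: %s, started", userID)
		time.Sleep(100 * time.Millisecond) // simulated work
		log.Printf("processing user: %s, finished", userID)
	})

	for _, id := range []string{"1", "2", "3", "4", "5"} {
		jobs <- id
	}
	close(jobs) // no more jobs: lets the workers exit
	wg.Wait()
}
```

The number of goroutines stays fixed at `n` regardless of the request rate, and the buffered channel still absorbs short bursts. Closing the channel and waiting on the WaitGroup gives the workers a chance to finish queued jobs before the program exits.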

wakumaku answered Jan 11 '23 18:01