Go: Passing functions through channels

I'm trying to rate limit the functions that I call by placing them through a queue to be accessed later. Below I have a slice of requests that I have created, and the requestHandler function processes each request at a certain rate.

I want it to accept all kinds of functions with different types of parameters, hence the interface{} type.

How would I be able to pass the functions through a channel and successfully call them?

type request struct {
    function interface{}
    channel  chan interface{}
}

var requestQueue []request

func pushQueue(f interface{}, ch chan interface{}) {
    req := request{
        f,
        ch,
    }

    //push
    requestQueue = append(requestQueue, req)
}

func requestHandler() {
    for {
        if len(requestQueue) > 0 {
            //pop
            req := requestQueue[len(requestQueue)-1]
            requestQueue = requestQueue[:len(requestQueue)-1]

            req.channel <- req.function
        }

        <-time.After(1200 * time.Millisecond)
    }
}

Here is an example of what I'm trying to achieve (GetLeagueEntries(string, string) and GetSummonerName(int, int) are functions):

ch := make(chan interface{})
pushQueue(l.GetLeagueEntries, ch)
pushQueue(l.GetSummonerName, ch)

leagues, _ := <-ch(string1, string2)
summoners, _ := <-ch(int1, int2)
tyuo9980 asked Jan 07 '16 06:01


2 Answers

Alright, here is the codez: https://play.golang.org/p/XZvb_4BaJF

Note that it's not perfect. The queue is drained by a ticker that fires once per second, so if the queue is empty when a new item is added, that item can wait almost a full second before it is executed.

But this should get you very close to what you need anyway :)

This code can be split into 3 sections:

  1. The rate-limited queue executor, which I call the server (I'm horrible at naming things). The server doesn't know anything about the functions. All it does is start a never-ending goroutine that pops the oldest function off the queue once every second and calls it. The issue I described above lives in this section of the code, and I could help you fix it if you want.
  2. The button-click functionality. This shows how each button click can make 3 different function calls (you could obviously make more or fewer) through the server, with each call 1 second apart from the last. You can even add a delay to any of the functions (to fake latency) and they should still be called 1 second apart. This is the only place that needs channels: you want to start the function calls as quickly as the rate limit allows (if the first function takes 5 seconds, you only want to wait 1 second before calling the second one) and then wait for all of them to finish, so you need to know when they are done.
  3. The button-click simulation (the main func). This just shows that 3 button clicks work as expected. You can also run them in goroutines to simulate 3 users clicking the button at the same time, and it would still work.

    package main
    
    import (
        "fmt"
        "sync"
        "time"
    )
    
    const (
        requestFreq = time.Second
    )
    
    type (
        // A single request
        request func()
    
        // The server that will hold a queue of requests and make them once a requestFreq
        server struct {
            // This will tick once per requestFreq
            ticker     *time.Ticker
    
            requests []request
            // Mutex for working with the request slice
            sync.RWMutex
        }
    )
    
    var (
        createServerOnce sync.Once
        s *server
    )
    
    func main() {
        // Multiple button clicks:
        ButtonClick()
        ButtonClick()
        ButtonClick()
    
        fmt.Println("Done!")
    }
    
    // BUTTON LOGIC:
    
    // Calls 3 functions and returns 3 different values.
    // Each function is called at least 1 second apart.
    func ButtonClick() (val1 int, val2 string, val3 bool) {
        iCh := make(chan int)
        sCh := make(chan string)
        bCh := make(chan bool)
    
        go func(){
            Server().AppendRequest(func() {
                t := time.Now()
                fmt.Println("Calling func1 (time: " + t.Format("15:04:05") + ")")
                // do some stuff
                iCh <- 1
            })
        }()
        go func(){
            Server().AppendRequest(func() {
                t := time.Now()
                fmt.Println("Calling func2 (time: " + t.Format("15:04:05") + ")")
                // do some stuff
                sCh <- "Yo"
            })
        }()
        go func(){
            Server().AppendRequest(func() {
                t := time.Now()
                fmt.Println("Calling func3 (time: " + t.Format("15:04:05") + ")")
                // do some stuff
                bCh <- true
            })
        }()
    
        // Wait for all 3 calls to come back
        for count := 0; count < 3; count++ {
            select {
            case val1 = <-iCh:
            case val2 = <-sCh:
            case val3 = <-bCh:
            }
        }
    
        return
    }
    
    // SERVER LOGIC
    
    // Factory function that will only create a single server
    func Server() *server {
        // Only one server for the entire application
        createServerOnce.Do(func() {
            s = &server{ticker: time.NewTicker(requestFreq), requests: []request{}}
    
            // Start a thread to make requests.
            go s.makeRequests()
        })
        return s
    }
    func (s *server) makeRequests() {
        if s == nil || s.ticker == nil {
            return
        }
    
        // This will keep going once per each requestFreq
        for range s.ticker.C {
    
            var r request
    
            // You can't just access s.requests because you are in a goroutine
            // here while someone could be adding new requests outside of the 
            // goroutine so you have to use locks.
            s.Lock()
            if len(s.requests) > 0 {
                // We have a lock here, which blocks all other operations 
                // so just shift the first request out, save it and give 
                // the lock back before doing any work.
                r = s.requests[0]
                s.requests = s.requests[1:]
            }
            s.Unlock()
    
            if r != nil {
                // Make the request in its own goroutine so that a slow
                // request cannot hold up the next tick.
                go r()
            }
        }
    }
    func (s *server) AppendRequest(r request) {
        if s == nil {
            return
        }
        s.Lock()
        s.requests = append(s.requests, r)
        s.Unlock()
    }
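
For comparison, here is a minimal sketch of the same idea with the queue itself being a channel of closures, which is one way to "pass functions through a channel". It is not the code from the playground link. The names limiter, newLimiter, submit, getSummonerName and getLeagueEntries are made up for illustration, and the two get* functions are stand-ins because the real signatures are not shown in the question. Because the worker sleeps after starting a call instead of waiting for a tick before it, a request that arrives on an idle queue starts right away.

```go
package main

import (
	"fmt"
	"time"
)

// limiter (hypothetical name) starts queued closures at most once per interval.
type limiter struct {
	queue chan func()
}

// newLimiter starts the worker goroutine. buffer is how many requests may
// wait in the queue before submit blocks.
func newLimiter(interval time.Duration, buffer int) *limiter {
	l := &limiter{queue: make(chan func(), buffer)}
	go func() {
		for f := range l.queue {
			go f()               // start the call without blocking the queue
			time.Sleep(interval) // enforce the minimum gap before the next start
		}
	}()
	return l
}

// submit takes any call wrapped in a closure, so the queue never needs
// interface{} or reflection.
func (l *limiter) submit(f func()) { l.queue <- f }

// Stand-ins for the asker's API calls (assumed signatures).
func getSummonerName(a, b int) string       { return fmt.Sprintf("summoner-%d-%d", a, b) }
func getLeagueEntries(a, b string) []string { return []string{a, b} }

func main() {
	l := newLimiter(200*time.Millisecond, 16)

	// Each caller owns typed result channels; the closures capture them
	// together with the arguments.
	names := make(chan string)
	leagues := make(chan []string)

	l.submit(func() { names <- getSummonerName(1, 2) })
	l.submit(func() { leagues <- getLeagueEntries("a", "b") })

	fmt.Println(<-names, <-leagues)
}
```

The closure is what makes differently typed functions fit through one channel: arguments and result channels are captured when the closure is created, so the worker only ever sees a plain func().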
    
Max Kerper answered Oct 19 '22 23:10


First, I would write it as:

leagues := server.GetLeagueEntries()
summoners := server.GetSummoners()

I would also put the rate limiting into the server itself, using one of the existing rate-limiting libraries.
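
As a rough illustration of keeping the limit inside the server, here is a sketch that uses only the standard library in place of a rate-limiting library. The Server type, NewServer, reserve, wait and the placeholder return values are all assumptions made for this example, not an existing API.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Server is a hypothetical API client whose methods are rate limited.
type Server struct {
	mu       sync.Mutex
	next     time.Time     // start of the next free slot
	interval time.Duration // minimum gap between two calls
}

func NewServer(interval time.Duration) *Server {
	return &Server{interval: interval}
}

// reserve claims the next free slot and returns the time at which it starts.
// Slots are handed out at least one interval apart.
func (s *Server) reserve() time.Time {
	s.mu.Lock()
	defer s.mu.Unlock()
	now := time.Now()
	if s.next.Before(now) {
		s.next = now // the server was idle, so the slot starts immediately
	}
	slot := s.next
	s.next = slot.Add(s.interval)
	return slot
}

// wait blocks the calling goroutine until its reserved slot begins.
func (s *Server) wait() { time.Sleep(time.Until(s.reserve())) }

func (s *Server) GetLeagueEntries() []string {
	s.wait()
	return []string{"gold", "silver"} // placeholder data
}

func (s *Server) GetSummoners() []string {
	s.wait()
	return []string{"alice", "bob"} // placeholder data
}

func main() {
	server := NewServer(200 * time.Millisecond)
	leagues := server.GetLeagueEntries()
	summoners := server.GetSummoners()
	fmt.Println(leagues, summoners)
}
```

Callers then use ordinary typed method calls and never see the queue; the mutex makes reserve safe to call from several goroutines at once.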

However, it is possible to use an interface to unify the requests, and use a func type to allow closures (as in http.HandleFunc):

type Command interface {
    Execute(server *Server)
}

type CommandFunc func(server *Server)
func (fn CommandFunc) Execute(server *Server) { fn(server) }

type GetLeagueEntries struct { Leagues []League }

func (entries *GetLeagueEntries) Execute(server *Server) {
    // ...
}

func GetSummonerName(id int, result *string) CommandFunc {
    return CommandFunc(func(server *Server){
        *result = "hello"
    })
}

get := GetLeagueEntries{}
requests <- &get

requests <- CommandFunc(func(server *Server){
    // ... handle stuff here
})

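Of course, this needs some synchronization: the caller must not read a result (such as Leagues or *result) until the command has finished executing.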
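
One possible way to add that synchronization, sketched under assumptions: each command carries a Done channel that is closed once its result is written, and a run function (a hypothetical name) executes one command per interval. The empty Server struct, the string-valued Leagues field and the placeholder data are stand-ins for the real types.

```go
package main

import (
	"fmt"
	"time"
)

// Server is a stand-in for the real API client.
type Server struct{}

type Command interface {
	Execute(server *Server)
}

type CommandFunc func(server *Server)

func (fn CommandFunc) Execute(server *Server) { fn(server) }

// GetLeagueEntries carries its own result and a Done channel that is closed
// once Leagues has been filled in.
type GetLeagueEntries struct {
	Leagues []string
	Done    chan struct{}
}

func (g *GetLeagueEntries) Execute(server *Server) {
	g.Leagues = []string{"gold", "silver"} // placeholder data
	close(g.Done)
}

// run executes one command per interval until requests is closed.
func run(server *Server, requests <-chan Command, interval time.Duration) {
	for cmd := range requests {
		cmd.Execute(server)
		time.Sleep(interval)
	}
}

func main() {
	requests := make(chan Command)
	go run(&Server{}, requests, 200*time.Millisecond)

	get := &GetLeagueEntries{Done: make(chan struct{})}
	requests <- get

	// A closure can signal completion the same way.
	var name string
	named := make(chan struct{})
	requests <- CommandFunc(func(*Server) {
		name = "hello"
		close(named)
	})

	// Closing a channel happens before a receive that observes the close,
	// so the results are safe to read after these two receives.
	<-get.Done
	<-named
	fmt.Println(get.Leagues, name)
}
```

Here run executes each command synchronously, so a slow command delays the ones behind it; starting each command in its own goroutine would trade that for concurrent execution.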

Egon answered Oct 19 '22 23:10