
Go: How do I pull X messages from a channel at a time?

Tags:

go

I have a channel with incoming messages and a goroutine that waits on it. I process these messages and send them to a different server.

I would like to either process 100 messages at a time if they are ready, or, after say 5 seconds, process whatever is in there and go back to waiting.

How do I do that in Go?

Unenumrated asked Dec 31 '13 10:12




1 Answer

The goroutine that reads from the message channel should keep a cache in which incoming messages are stored. The cached messages are then sent to the remote server in bulk, either when the cache reaches 100 messages or when 5 seconds have passed. A timer channel and Go's select statement determine which of the two occurs first.

The following example can be run on the Go playground:

package main

import (
    "fmt"
    "math/rand"
    "time"
)

type Message int

const (
    CacheLimit   = 100
    CacheTimeout = 5 * time.Second
)

func main() {
    input := make(chan Message, CacheLimit)

    go poll(input)
    generate(input)
}

// poll checks for incoming messages and caches them internally
// until either a maximum amount is reached, or a timeout occurs.
func poll(input <-chan Message) {
    cache := make([]Message, 0, CacheLimit)
    tick := time.NewTicker(CacheTimeout)

    for {
        select {
        // Check if a new message is available.
        // If so, store it and check if the cache
        // has reached its size limit.
        case m := <-input:
            cache = append(cache, m)

            if len(cache) < CacheLimit {
                break
            }

            // Reset the timeout ticker.
            // Otherwise we will get too many sends.
            tick.Stop()

            // Send the cached messages and reset the cache.
            send(cache)
            cache = cache[:0]

            // Recreate the ticker, so the timeout trigger
            // remains consistent.
            tick = time.NewTicker(CacheTimeout)

        // If the timeout is reached, send the
        // current message cache, regardless of
        // its size.
        case <-tick.C:
            send(cache)
            cache = cache[:0]
        }
    }
}

// send sends cached messages to a remote server.
func send(cache []Message) {
    if len(cache) == 0 {
        return // Nothing to do here.
    }

    fmt.Printf("%d message(s) pending\n", len(cache))
}

// generate creates some random messages and pushes them into the given channel.
//
// Not part of the solution. This just simulates whatever you use to create
// the messages by creating a new message at random time intervals.
func generate(input chan<- Message) {
    for {
        // Wait a random interval of up to 100ms, then emit a message.
        time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
        input <- Message(rand.Int())
    }
}
jimt answered Nov 24 '22 02:11