
How to implement a counter when using golang's goroutine?

Tags:

go

I'm trying to write a queue struct that has push and pop functions.

I need 10 threads to push data and another 10 threads to pop it, as in the code below.

Questions:

  1. I need to print how many items have been pushed and popped, but I don't know how to do that.
  2. Is there any way to speed up my code? It is too slow for me.

package main

import (
    "runtime"
    "time"
)

const (
    DATA_SIZE_PER_THREAD = 10000000
)

type Queue struct {
    records string
}


func (self Queue) push(record chan interface{}) {
    // need push counter
    record <- time.Now()
}

func (self Queue) pop(record chan interface{}) {
    // need pop counter
    <- record
}

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU())
    //record chan
    record := make(chan interface{},1000000)
    //finish flag chan
    finish := make(chan bool)
    queue := new(Queue)
    for i:=0; i<10; i++ {
        go func() {
            for j:=0; j<DATA_SIZE_PER_THREAD; j++ {
                queue.push(record)
            }
            finish<-true
        }()
    }
    for i:=0; i<10; i++ {
        go func() {
            for j:=0; j<DATA_SIZE_PER_THREAD; j++ {
                queue.pop(record)
            }
            finish<-true
        }()
    }
    for i:=0; i<20; i++ {
        <-finish
    }
}
WoooHaaaa asked Sep 04 '12 14:09



1 Answer

There are a few things you should fix.

  • The methods on the Queue type should have pointer receivers. Otherwise, every method call operates on a copy of the queue, and any changes to its fields are lost when the method returns.

  • Waiting for all goroutines to finish can be done with a sync.WaitGroup, which is designed for exactly this purpose.

  • A thread-safe push/pop counter inside the queue type can be maintained with the sync/atomic package.
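
To illustrate the first point, here is a minimal sketch of the difference between value and pointer receivers. The counter type and its two methods are hypothetical names used only for this demonstration; they are not part of the code in the question.

```go
package main

import "fmt"

// counter is a hypothetical type used only for this illustration.
type counter struct {
	n int
}

// incByValue has a value receiver: it increments a copy of c,
// so the caller's counter is left unchanged.
func (c counter) incByValue() { c.n++ }

// incByPointer has a pointer receiver: it increments the caller's counter.
func (c *counter) incByPointer() { c.n++ }

func main() {
	var c counter

	c.incByValue()
	fmt.Println(c.n) // prints 0

	c.incByPointer()
	fmt.Println(c.n) // prints 1
}
```

The same reasoning applies to Queue: a counter field updated through a value receiver would never change on the queue that main holds.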

As far as speed goes, I am not quite sure from your example what you are trying to achieve. If you elaborate on that a little, some optimizations may become apparent.

Here is an example I modified from your code:

package main

import (
    "log"
    "runtime"
    "sync"
    "sync/atomic"
    "time"
)

const SizePerThread = 10000000

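type Queue struct {
    records string
    count   int64 // pushes minus pops so far; only accessed via sync/atomic
}

// push sends a record on the channel, then atomically increments the counter.
func (q *Queue) push(record chan interface{}) {
    record <- time.Now()

    newcount := atomic.AddInt64(&q.count, 1)
    log.Printf("Push: %d", newcount)
}

// pop receives a record from the channel, then atomically decrements the counter.
func (q *Queue) pop(record chan interface{}) {
    <-record

    newcount := atomic.AddInt64(&q.count, -1)
    log.Printf("Pop: %d", newcount)
}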

func main() {
    var wg sync.WaitGroup

    runtime.GOMAXPROCS(runtime.NumCPU())

    record := make(chan interface{}, 1000000)
    queue := new(Queue)

    // We are launching 20 goroutines.
    // Let the waitgroup know it should wait for as many
    // of them to finish.
    wg.Add(20)

    for i := 0; i < 10; i++ {
        go func() {
            defer wg.Done()

            for j := 0; j < SizePerThread; j++ {
                queue.push(record)
            }
        }()

        go func() {
            defer wg.Done()

            for j := 0; j < SizePerThread; j++ {
                queue.pop(record)
            }
        }()
    }

    // Wait for all goroutines to finish.
    wg.Wait()
}
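
The example above keeps a single counter that goes up on push and down on pop, so it reports the difference between the two rather than a total for each. If you want separate totals, one possible variation is sketched below. It is only a sketch under some assumptions: the names countingQueue and run are hypothetical, the channel lives inside the struct instead of being passed in, the workload is much smaller than in the question, and the totals are printed once at the end instead of on every operation (logging each of 100 million operations is likely to dominate the running time).

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countingQueue is a hypothetical variant of Queue with one counter per operation.
// The int64 fields come first so they stay 64-bit aligned on 32-bit platforms,
// which the sync/atomic functions require.
type countingQueue struct {
	pushed  int64
	popped  int64
	records chan interface{}
}

// push sends v on the channel and atomically counts the push.
func (q *countingQueue) push(v interface{}) {
	q.records <- v
	atomic.AddInt64(&q.pushed, 1)
}

// pop receives a value from the channel and atomically counts the pop.
func (q *countingQueue) pop() interface{} {
	v := <-q.records
	atomic.AddInt64(&q.popped, 1)
	return v
}

// run starts `workers` pushing goroutines and `workers` popping goroutines,
// each handling perWorker items, and returns the final totals.
func run(workers, perWorker int) (pushed, popped int64) {
	q := &countingQueue{records: make(chan interface{}, 1024)}

	var wg sync.WaitGroup
	wg.Add(2 * workers)

	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for j := 0; j < perWorker; j++ {
				q.push(j)
			}
		}()

		go func() {
			defer wg.Done()
			for j := 0; j < perWorker; j++ {
				q.pop()
			}
		}()
	}

	wg.Wait()
	return atomic.LoadInt64(&q.pushed), atomic.LoadInt64(&q.popped)
}

func main() {
	pushed, popped := run(10, 1000)
	fmt.Printf("pushed=%d popped=%d\n", pushed, popped) // prints pushed=10000 popped=10000
}
```

Because the total number of pushes equals the total number of pops, every goroutine eventually finishes and wg.Wait() returns.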
jimt answered Oct 03 '22 08:10
