Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Simple concurrent queue

Tags:

go

Could someone please mention the flaws and performance drawbacks in the Queue like implementation?

type Queue struct {
    sync.Mutex
    Items []interface{}
}

func (q *Queue) Push(item interface{}) {
    q.Lock()
    defer q.Unlock()
    q.Items = append(q.Items, item)
}

func (q *Queue) Pop() interface{} {
    q.Lock()
    defer q.Unlock()
    if len(q.Items) == 0 {
        return nil
    }
    item := q.Items[0]
    q.Items = q.Items[1:]
    return item
}

I also have methods like PopMany and PushMany, and what I am concerned about is: Is too much re-slicing that bad?

like image 733
FPGA Avatar asked May 25 '26 13:05

FPGA


2 Answers

You could simply use a buffered channel.

var queue = make(chan interface{}, 100)

The size of the buffer could to be determined empirically to be large enough for the high-water mark for the rate of pushes vs rate of pops. It should ideally not be much larger than this, to avoid wasting memory.

Indeed, a smaller buffer size will also work, provided the interacting goroutines don't deadlock for other reasons. If you use a smaller buffer size, you are effectively getting queueing via the run-queue of the goroutine time-slice engine, part of the Go runtime. (Quite possible, a buffer size of zero could work in many circumstances.)

Channels allow many reader goroutines and many writer goroutines. The concurrency of their access is handled automatically by the Go runtime. All writes into the channel are interleaved so as to be a sequential stream. All the reads are also interleaved to extract values sequentially in the same order they were enqueued. Here's further discussion on this topic.

like image 178
Rick-777 Avatar answered May 28 '26 10:05

Rick-777


The re-slicing is not an issue here. It will also make no difference whether you have a thread-safe or unsafe version as this is pretty much how the re-sizing is meant to be done.

You can alleviate some of the re-sizing overhead by initializing the queue with a capacity:

func NewQueue(capacity int) *Queue {
    return &Queue {
        Items: make([]interface{}, 0, capacity),
    }
}

This will initialize the queue. It can still grow beyond the capacity, but you will not be having any unnecessary copying/re-allocation until that capacity is reached.

What may potentially cause problems with many concurrent accesses, is the mutex lock. At some point, you will be spending more time waiting for locks to be released than you are actually doing work. This is a general problem with lock contention and can be solved by implementing the queue as a lock-free data structure.

There are a few third-party packages out there which provide lock free implementations of basic data structures.

Whether this will actually be useful to you can only be determined with some benchmarking. Lock-free structures can have a higher base cost, but they scale much better when you get many concurrent users. There is a cutoff point at which mutex locks become more expensive than the lock-free approach.

like image 25
jimt Avatar answered May 28 '26 09:05

jimt



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!