Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Read-write exclusion with channels

Tags:

concurrency

go

I would like to write a small in-memory database in Go. Read and write requests would be passed through a channel and processed by the db engine which would ensure the accesses are done properly.

A first idea woud be to mimic the behaviour of RWMutex. Only it would use a more idiomatic go style.

Here is a little toy (although, rather long) example of what I would like to do.

package main

import (
    "log"
    "math/rand"
    "time"
)

var source *rand.Rand

type ReqType int

const (
    READ = iota
    WRITE
)

type DbRequest struct {
    Type  int              // request type
    RespC chan *DbResponse // channel for request response
    // content here
}

type DbResponse struct {
    // response here
}

type Db struct {
    // DB here
}

func randomWait() {
    time.Sleep(time.Duration(source.Intn(1000)) * time.Millisecond)
}

func (d *Db) readsHandler(in <-chan *DbRequest) {
    for r := range in {
        id := source.Intn(4000000)
        log.Println("read ", id, " starts")
        randomWait()
        log.Println("read ", id, " ends")
        r.RespC <- &DbResponse{}
    }
}

func (d *Db) writesHandler(r *DbRequest) *DbResponse {
    id := source.Intn(4000000)
    log.Println("write ", id, " starts")
    randomWait()
    log.Println("write ", id, " ends")
    return &DbResponse{}
}

func (d *Db) Start(nReaders int) chan *DbRequest {
    in := make(chan *DbRequest, 100)
    reads := make(chan *DbRequest, nReaders)

    // launch readers
    for k := 0; k < nReaders; k++ {
        go d.readsHandler(reads)
    }

    go func() {
        for r := range in {
            switch r.Type {
            case READ:
                reads <- r
            case WRITE:
                // here we should wait for all reads to
                // be over (how ??)

                r.RespC <- d.writesHandler(r)

                // here writesHandler is blocking,
                // this ensures that no additional
                // read is added in the reads channel
                // before the write is finished
            }
        }
    }()

    return in
}

func main() {
    seed := time.Now().Unix()
    source = rand.New(rand.NewSource(seed))

    blackhole := make(chan *DbResponse, 100)

    d := Db{}
    rc := d.Start(4)
    wc := time.After(3 * time.Second)

    go func() {
        for {
            <-blackhole
        }
    }()

    for {
        select {
        case <-wc:
            return
        default:
            if source.Intn(2) == 0 {
                rc <- &DbRequest{READ, blackhole}
            } else {
                rc <- &DbRequest{WRITE, blackhole}
            }
        }
    }
}

Of course, this example shows read/write conflicts.

I feel like I'm trying to do something a bit evil: sharing memory using constructs designed to avoid it... At this point, an obvious solution would be to add RWMutex locks around the two types of requests handling but maybe there is a clever solution using only goroutines and channels.

like image 385
lbonn Avatar asked Dec 27 '12 07:12

lbonn


People also ask

What are channels and Goroutines?

If goroutines are the activities of a concurrent Go program, channels are the connections between them. A channel is a communication mechanism that enables one goroutine to send values to another goroutine. Each channel is a conduit for values of a particular type, called the channel's element type.

Are Golang channels blocking?

Channel operation (i.e. write or read) are blocking in nature. This means: When we send data into the channel using a GoRoutine, it will be blocked until the data is consumed by another GoRoutine. When we receive data from channel using a GoRoutine, it will be blocked until the data is available in the channel.

What are channels used for in Go?

Go channels are used for communicating between concurrently running functions by sending and receiving a specific element type's data. When we have numerous Goroutines running at the same time, channels are the most convenient way for them to communicate with one another.


1 Answers

Why not just use RWMutex? It's been optimized to be very efficient and it's conceptually simple. Just embed one in your Db object

type Db struct {
    sync.RWMutex
    // DB here
}

and you can call it like

db := &Db{}
...
db.Lock()
// do RW operations
db.Unlock()
...
db.RLock()
// do Read operations
db.RUnlock()

I don't know a way to get better performance using channels. You can however get better performance with lock-free techniques, but I recommend getting your RWMutex version running first.

Another concurrency issue is that fmt package writes to stdout are not thread safe and you will eventually see garbled output. Try the log package instead. You can set it to write to stdout with no logging prefix and it will ensure atomic writes.

like image 140
Sonia Avatar answered Oct 19 '22 19:10

Sonia