Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Parallel saxpy implementation in Go isn't scaling well across cores

I'm trying to write an implementation of saxpy that is both blocked and computed in parallel across the 8 cores available on my machine. I started with the assumption that vectors x and y small enough to fit into the L1 cache of my machine (256kB, split into 128kB data and 128kB code) can be computed serially. To test this assumption, I wrote two implementations of saxpy: a blocked serial version (BSS) and a blocked parallel version (BPS). The blocking algorithm is used only when the vectors are longer than 4096 elements. The following are the implementations:

// cachecap is the block length in elements. Vectors with at most this many
// elements are handed to serial_saxpy in a single call; longer ones are cut
// into blocks of this size.
// NOTE(review): presumably a 32 KiB cache budget divided by 8 bytes per
// float64 — confirm against the target machine's L1 data cache size.
const cachecap = 32*1024/8 // 4096
// blocked_serial_saxpy computes z = a*x + b*y on the calling goroutine,
// processing the vectors in consecutive blocks of at most cachecap elements.
//
// When len(z) <= cachecap the whole problem is delegated to serial_saxpy in
// one call. Otherwise x, y and z are sliced with the same [beg:end) index
// range, so x and y must be at least as long as z.
//
// NOTE(review): blocks are cut in index space, not in stride space. This
// matches an unblocked serial_saxpy call when all increments are 1; for other
// increments it appears to match only when incx == incy == incz and that
// increment divides cachecap — confirm before relying on strided use.
func blocked_serial_saxpy(a float64, x []float64, incx int, b float64, y []float64, incy int, z []float64, incz int) {
    zn := len(z)
    if zn <= cachecap {
        serial_saxpy(a, x, incx, b, y, incy, z, incz)
        return
    }

    // Stepping beg by cachecap visits exactly ceil(zn/cachecap) blocks, so no
    // empty trailing block is issued when zn is a multiple of cachecap.
    for beg := 0; beg < zn; beg += cachecap {
        end := beg + cachecap
        if end > zn {
            end = zn // last block may be shorter
        }
        serial_saxpy(a, x[beg:end], incx, b, y[beg:end], incy, z[beg:end], incz)
    }
}
// blocked_parallel_saxpy computes z = a*x + b*y by cutting the vectors into
// blocks of at most cachecap elements and feeding them through a channel to a
// pool of worker goroutines, each of which calls serial_saxpy on the blocks it
// receives.
//
// When len(z) <= cachecap the work is done by a single serial_saxpy call on
// the calling goroutine. Otherwise the pool has min(GOMAXPROCS, nblocks)
// workers and the function returns once every block has been processed.
// x, y and z are sliced with the same [beg:end) index range, so x and y must
// be at least as long as z.
//
// NOTE(review): block is declared outside this snippet; it is used here as a
// struct with integer fields beg and end, in that order — confirm.
// NOTE(review): blocks are cut in index space, so for increments other than 1
// the result appears to match an unblocked call only when all increments are
// equal and divide cachecap — confirm before relying on strided use.
func blocked_parallel_saxpy(a float64, x []float64, incx int, b float64, y []float64, incy int, z []float64, incz int) {
    zn := len(z)
    if zn <= cachecap {
        serial_saxpy(a, x, incx, b, y, incy, z, incz)
        return
    }

    // Ceiling division: the number of non-empty blocks of cachecap elements.
    nblocks := (zn + cachecap - 1) / cachecap
    nworkers := runtime.GOMAXPROCS(0) // 0 queries the setting without changing it
    if nblocks < nworkers {
        nworkers = nblocks
    }

    // The buffer holds every block, so the producer loop below never blocks
    // waiting for a worker.
    sendchan := make(chan block, nblocks)

    var wg sync.WaitGroup
    wg.Add(nworkers)
    for i := 0; i < nworkers; i++ {
        go func() {
            defer wg.Done()
            // The loop ends when sendchan is closed and drained.
            for blk := range sendchan {
                beg, end := blk.beg, blk.end
                serial_saxpy(a, x[beg:end], incx, b, y[beg:end], incy, z[beg:end], incz)
            }
        }()
    }

    for beg := 0; beg < zn; beg += cachecap {
        end := beg + cachecap
        if end > zn {
            end = zn // last block may be shorter
        }
        sendchan <- block{beg, end}
    }
    close(sendchan)
    wg.Wait()
}

// serial_saxpy computes z = a*x + b*y on the calling goroutine.
//
// x, y and z are traversed with strides incx, incy and incz respectively, and
// n = len(z)/incz elements are processed. x and y are expected to provide at
// least n strided elements.
//
// The scalar pairs (a, b) equal to (1, 1), (0, 1), (1, 0) and (0, 0) are
// special-cased so that the multiplications are skipped. For (0, 0) the
// visited elements of z are set to zero rather than left untouched, so z
// always holds the result on return.
//
// It panics if any increment is zero or negative.
func serial_saxpy(a float64, x []float64, incx int, b float64, y []float64, incy int, z []float64, incz int) {
    if incx <= 0 || incy <= 0 || incz <= 0 {
        panic("AxpBy: zero or negative increments not supported")
    }

    n := len(z) / incz

    // Fast path: all vectors are contiguous.
    if incx == 1 && incy == 1 && incz == 1 {
        switch {
        case a == 1 && b == 1:
            for i := 0; i < n; i++ {
                z[i] = x[i] + y[i]
            }
        case a == 0 && b == 1:
            copy(z, y)
        case a == 1 && b == 0:
            copy(z, x)
        case a == 0 && b == 0:
            // With unit stride n == len(z), so this clears every element.
            for i := range z {
                z[i] = 0
            }
        default:
            for i := 0; i < n; i++ {
                z[i] = a*x[i] + b*y[i]
            }
        }
        return
    }

    // General path: unequal increments, or equal increments other than 1.
    ix, iy, iz := 0, 0, 0
    switch {
    case a == 1 && b == 1:
        for i := 0; i < n; i, ix, iy, iz = i+1, ix+incx, iy+incy, iz+incz {
            z[iz] = x[ix] + y[iy]
        }
    case a == 0 && b == 1:
        for i := 0; i < n; i, iy, iz = i+1, iy+incy, iz+incz {
            z[iz] = y[iy]
        }
    case a == 1 && b == 0:
        for i := 0; i < n; i, ix, iz = i+1, ix+incx, iz+incz {
            z[iz] = x[ix]
        }
    case a == 0 && b == 0:
        for i := 0; i < n; i, iz = i+1, iz+incz {
            z[iz] = 0
        }
    default:
        for i := 0; i < n; i, ix, iy, iz = i+1, ix+incx, iy+incy, iz+incz {
            z[iz] = a*x[ix] + b*y[iy]
        }
    }
}

I then wrote benchmarks for the three functions blocked_serial_saxpy, blocked_parallel_saxpy and serial_saxpy. The following image shows the results of the benchmarks with vector sizes 1e3, 1e4, 1e5, 2e5, 3e5, 4e5, 6e5, 8e5 and 1e6 respectively: Saxpy Benchmarks Part 1Saxpy Benchmarks Part 2

To help me visualize the performance of the blocked_parallel_saxpy implementation, I plotted the results and this is what I obtained: Saxpy Performance Plot. Looking at the plot makes me wonder why I am not seeing a parallel speedup when all the CPUs are in use at 100% during the blocked_parallel_saxpy benchmark. The image from Task Manager is below: Saxpy CPU Usage

Could someone help me understand what's going on here? Is what I'm seeing a symptom of a problem, or is it the way it should be? If it's the former, is there a way to fix it?

Edit: I have modified the blocked_parallel_saxpy code to the following. I divide the total number of blocks (nblocks) so that there are nworkers goroutines computing nworkers blocks in parallel. In addition, I have removed the channel. I have benchmarked the code and it performs identically to the parallel implementation with the channel, which is why I haven't attached the benchmarks.

func blocked_parallel_saxpy(a float64, x []float64, incx int, b float64, y []float64, incy int, z []float64, incz int) {
    zn := len(z)
    if zn <= cachecap {
        serial_saxpy(a, x, incx, b, y, incy, z, incz)
        return
    }

    nblocks := zn/cachecap + 1
    nworkers := runtime.GOMAXPROCS(0)
    if nblocks < nworkers {
        nworkers = nblocks
    }

    var wg sync.WaitGroup
    for i := 0; i < nworkers; i++ {
        for j := 0; j < nworkers && (i+j) < nblocks; j++ {
            wg.Add(1)
            go func(i, j int) {
                defer wg.Done()
                a, b := a, b
                incx, incy, incz := incx, incy, incz
                k := i + j
                beg := k * cachecap
                end := (k + 1) * cachecap
                if end >= zn {
                    end = zn
                }
                serial_saxpy(a, x[beg:end], incx, b, y[beg:end], incy, z[beg:end], incz)
            }(i, j)
        }
    wg.Wait()
}

Edit 2: I have written another version of blocked_parallel_saxpy, again without channels. This time, I spawn NumCPU goroutines, each processing nblocks/nworkers + 1 blocks, where each block is cachecap elements long. Even here, the code performs identically to the previous two implementations.

// blocked_parallel_saxpy computes z = a*x + b*y by cutting the vectors into
// blocks of at most cachecap elements and giving each of
// min(GOMAXPROCS, nblocks) goroutines one contiguous run of blocks, without a
// channel.
//
// When len(z) <= cachecap the work is done by a single serial_saxpy call on
// the calling goroutine. Otherwise the function returns once every block has
// been processed. x, y and z are sliced with the same [beg:end) index range,
// so x and y must be at least as long as z.
//
// NOTE(review): blocks are cut in index space, so for increments other than 1
// the result appears to match an unblocked call only when all increments are
// equal and divide cachecap — confirm before relying on strided use.
func blocked_parallel_saxpy(a float64, x []float64, incx int, b float64, y []float64, incy int, z []float64, incz int) {
    zn := len(z)
    if zn <= cachecap {
        serial_saxpy(a, x, incx, b, y, incy, z, incz)
        return
    }

    // Ceiling division: the number of non-empty blocks of cachecap elements.
    nblocks := (zn + cachecap - 1) / cachecap

    // GOMAXPROCS(0) only reads the current setting. Passing a positive value
    // would change the process-wide setting on every call and return the
    // previous value instead of the new one.
    nworkers := runtime.GOMAXPROCS(0)
    if nblocks < nworkers {
        nworkers = nblocks
    }

    // Spread the blocks as evenly as possible: every worker gets base blocks
    // and the first extra workers get one more, so no worker is left idle.
    base, extra := nblocks/nworkers, nblocks%nworkers

    var wg sync.WaitGroup
    wg.Add(nworkers)
    for w := 0; w < nworkers; w++ {
        first := w * base
        count := base
        if w < extra {
            first += w
            count++
        } else {
            first += extra
        }

        go func(first, count int) {
            defer wg.Done()
            for k := first; k < first+count; k++ {
                beg := k * cachecap
                end := beg + cachecap
                if end > zn {
                    end = zn // last block may be shorter
                }
                serial_saxpy(a, x[beg:end], incx, b, y[beg:end], incy, z[beg:end], incz)
            }
        }(first, count)
    }

    wg.Wait()
}
like image 916
Tvaṣṭā Avatar asked Jan 29 '26 06:01

Tvaṣṭā


1 Answers

I'd try a parallel version without channels, where each worker computes every 8th block, without coordination.

like image 76
gthomas Avatar answered Jan 31 '26 00:01

gthomas



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!