How to perform concurrent downloads in Go

We have a process whereby users request files that we need to get from our source. This source isn't the most reliable so we implemented a queue using Amazon SQS. We put the download URL into the queue and then we poll it with a small app that we wrote in Go. This app simply retrieves the messages, downloads the file and then pushes it to S3 where we store it. Once all of this is complete it calls back a service which will email the user to let them know that the file is ready.

Originally I wrote this to create n channels and then attached 1 go-routine to each and had the go-routine in an infinite loop. This way I could ensure that I was only ever processing a fixed number of downloads at a time.

I realised that this isn't the way channels are supposed to be used and, if I'm understanding correctly now, there should actually be one channel with n go-routines receiving on it. Each go-routine sits in an infinite loop: it waits for a message, processes it, does everything it's supposed to, and then waits for the next one. This allows me to ensure that I'm only ever processing n files at a time. I think this is the right way to do it. I believe this is fan-out, right?

What I don't need to do, is to merge these processes back together. Once the download is done it is calling back a remote service so that handles the remainder of the process. There is nothing else that the app needs to do.

OK, so some code:

func main() {
    queue, err := ConnectToQueue() // This works fine...
    if err != nil {
        log.Fatalf("Could not connect to queue: %s\n", err)
    }

    msgChannel := make(chan sqs.Message, 10)

    for i := 0; i < MAX_CONCURRENT_ROUTINES; i++ {
        go processMessage(msgChannel, queue)
    }

    for {
        response, _ := queue.ReceiveMessage(MAX_SQS_MESSAGES)

        for _, m := range response.Messages {
            msgChannel <- m
        }
    }
}

func processMessage(ch <-chan sqs.Message, queue *sqs.Queue) {
    for {
        m := <-ch
        // Do something with message m

        // Delete message from queue when we're done
        queue.DeleteMessage(&m)
    }
}

Am I anywhere close here? I have n running go-routines (where MAX_CONCURRENT_ROUTINES = n) and in the loop we will keep passing messages in to the single channel. Is this the right way to do it? Do I need to close anything or can I just leave this running indefinitely?

One thing I'm noticing is that SQS is returning messages, but once 10 messages have been passed into processMessage() (10 being the size of the channel buffer), no further messages are actually processed.

Thanks all

asked Oct 20 '22 by Engineer81
1 Answer

That looks fine. A few notes:

  1. You can limit the work parallelism by means other than limiting the number of worker routines you spawn. For example you can create a goroutine for every message received, and then have the spawned goroutine wait for a semaphore that limits the parallelism. Of course there are tradeoffs, but you aren't limited to just the way you've described.

    sem := make(chan struct{}, n)
    work := func(m sqs.Message) {
        sem <- struct{}{} // Blocks until there's room, i.e. fewer than n in flight
        // do the work
        <-sem // Free a slot in the semaphore
    }
    for {
        response, _ := queue.ReceiveMessage(MAX_SQS_MESSAGES)
        for _, m := range response.Messages {
            go work(m)
        }
    }
    
    
  2. The limit of only 10 messages being processed is being caused elsewhere in your stack. Possibly you're seeing a race where the first 10 fill the channel, and then the work isn't completing, or perhaps you're accidentally returning from the worker routines. If your workers are persistent per the model you've described, you'll want to be certain that they don't return.

  3. It's not clear if you want the process to return after you've processed some number of messages. If you do want this process to exit, you'll need to wait for all the workers to finish their current tasks, and probably signal them to return afterwards. Take a look at sync.WaitGroup for synchronizing their completion, and either use another channel to signal that there's no more work, or close msgChannel and handle that in your workers. (Take a look at the two-value form of the channel receive, v, ok := <-ch, where ok reports whether the channel has been closed; ranging over a channel handles this for you.)
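The shutdown pattern from the last point can be sketched as follows. This is a minimal, self-contained sketch, not your actual app: it uses plain strings in place of sqs.Message, and runWorkers is a hypothetical helper introduced just for illustration.

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
    )

    // runWorkers fans msgs out to `workers` goroutines over a single channel,
    // closes the channel to signal that there's no more work, and waits for
    // every worker to finish. It returns the number of messages processed.
    func runWorkers(msgs []string, workers int) int {
        msgChannel := make(chan string, 10)
        var processed int64
        var wg sync.WaitGroup

        for i := 0; i < workers; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                // range exits cleanly once msgChannel is closed and drained
                for m := range msgChannel {
                    _ = m // do the real work here
                    atomic.AddInt64(&processed, 1)
                }
            }()
        }

        for _, m := range msgs {
            msgChannel <- m
        }
        close(msgChannel) // signal: no more work
        wg.Wait()         // block until every worker has returned
        return int(processed)
    }

    func main() {
        n := runWorkers([]string{"a", "b", "c", "d", "e"}, 3)
        fmt.Println("processed", n, "messages")
    }

If instead you want the process to run indefinitely, as in the original question, you simply never close the channel and the workers block forever on the receive, which is fine.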

answered Oct 22 '22 by Matt Joiner