Creating a thread-safe queue balancer

My project involves processing images for clients en masse. Clients send their image files zipped up, which fires off ImageMagick command-line scripts per image. The problem I am trying to solve is that if these commands are queued in the order I receive them, a client that needs to process 10k images will hog all resources for hours. My solution is to round-robin between each client's queue, so that everyone slows each other down equally. I have created this class to implement this:

class QueueBalancer:
    def __init__(self, cycle_list=None):
        # Avoid a shared mutable default argument.
        self.cycle_list = cycle_list if cycle_list is not None else []
        self.update_status()

    def cmd_gen(self):
        # Endless generator: cycles through the users' queues round-robin,
        # yielding one command at a time, or None when there is no work.
        index = -1
        while True:
            try:
                if self.cycle_list:
                    self.processing = True
                    index += 1
                    commands = self.cycle_list[index]["commands"]
                    if commands:
                        command = commands.pop(0)
                        if len(commands) == 0:
                            del self.cycle_list[index]
                            index -= 1
                        self.update_status()
                        yield command
                else:
                    yield None
            except IndexError:
                index = -1

    def get_user(self, user):
        # Find this user's existing entry, if any.
        return next((item for item in self.cycle_list if item["user"] == user), None)

    def create_or_append(self, user, commands):
        existing_user = self.get_user(user)
        if existing_user:
            index = self.cycle_list.index(existing_user)
            self.cycle_list[index]["commands"] += commands
        else:
            self.cycle_list.append({
                "user": user,
                "commands": commands,
            })

    def update_status(self):
        if next((item for item in self.cycle_list if item["commands"] != []), None):
            self.processing = True
        else:
            self.processing = False

    def status(self):
        return self.processing

As you can see from the else clause of create_or_append(), the cycle_list is a list of dictionaries like this:

{"user": "test1", "commands": ["command1", "command2"]},
{"user": "test2", "commands": ["x", "y", "z"]},
{"user": "test3", "commands": ["a", "b", "c"]}

(real commands removed, sample strings used)

A single instance of cmd_gen() will be used to feed commands into my shell, and I will use create_or_append() to add users and commands on the fly, while commands in the queue are still being processed. This seems to work well so far in my initial tests, but is it theoretically thread-safe? If not, what would I need to do to make sure it is?
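
For reference, the intended usage is roughly this (a simplified sketch; the worker function, the print and the sleep are just stand-ins for the code that actually runs the shell commands):

import threading
import time

balancer = QueueBalancer()
gen = balancer.cmd_gen()

def worker():
    # Single consumer: pulls the next command in round-robin order.
    while True:
        command = next(gen)
        if command is None:
            time.sleep(0.1)  # idle until new work arrives
            continue
        print("running:", command)  # stand-in for the ImageMagick call

threading.Thread(target=worker, daemon=True).start()

# Users and commands keep arriving while the worker is consuming.
balancer.create_or_append("test1", ["command1", "command2"])
balancer.create_or_append("test2", ["x", "y", "z"])
time.sleep(1)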

asked Dec 11 '25 by Julien


1 Answer

I thought I'd have a shot at creating a generic balanced queue like you described - here's the result. I think there are still some pathological cases where a user could have many jobs processed sequentially, but it would involve other users' jobs being added at specific times and in specific orders, so I don't think it would happen in the real world and couldn't be exploited unless multiple users colluded.

from threading import Lock


class UserBalancedJobQueue(object):

    def __init__(self):
        self._user_jobs = {}
        self._user_list = []
        self._user_index = 0
        self._lock = Lock()

    def pop_user_job(self):
        # Return a (user, job) pair, visiting users in round-robin order.
        with self._lock:
            if not self._user_jobs:
                raise ValueError("No jobs to run")

            # Wrap the index when it runs off the end of the user list.
            if self._user_index >= len(self._user_list):
                self._user_index = 0
            user = self._user_list[self._user_index]

            jobs = self._user_jobs[user]
            job = jobs.pop(0)

            if not jobs:
                # Removing this user shifts the next user into the current
                # slot, so only advance the index when nothing was removed.
                self._delete_current_user()
            else:
                self._user_index += 1
            return user, job

    def _delete_current_user(self):
        user = self._user_list.pop(self._user_index)
        del self._user_jobs[user]

    def add_user_job(self, user, job):
        with self._lock:
            # A new user's first job also registers them in the rotation.
            if user not in self._user_jobs:
                self._user_list.append(user)
                self._user_jobs[user] = []
            self._user_jobs[user].append(job)


if __name__ == "__main__":
    q = UserBalancedJobQueue()
    q.add_user_job("tom", "job1")
    q.add_user_job("tom", "job2")
    q.add_user_job("tom", "job3")
    q.add_user_job("fred", "job4")
    q.add_user_job("fred", "job5")

    for i in range(3):
        print(q.pop_user_job())

    print("Adding more jobs")
    q.add_user_job("dave", "job6")
    q.add_user_job("dave", "job7")
    q.add_user_job("dave", "job8")
    q.add_user_job("dave", "job9")

    try:
        while True:
            print(q.pop_user_job())
    except ValueError:
        pass
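
To show how the lock is meant to be exercised, here is a rough sketch of several worker threads draining the queue concurrently (the worker function, the sleep and the print are placeholders for whatever actually shells out to ImageMagick):

import threading
import time

def worker(queue):
    # pop_user_job() is the only shared-state access and it happens under
    # the queue's lock; the (slow) job itself would run outside the lock.
    while True:
        try:
            user, job = queue.pop_user_job()
        except ValueError:
            time.sleep(0.1)  # nothing queued right now
            continue
        print("%s -> %s" % (user, job))  # placeholder for the real work

q = UserBalancedJobQueue()
for _ in range(4):
    threading.Thread(target=worker, args=(q,), daemon=True).start()

q.add_user_job("tom", "job1")
q.add_user_job("fred", "job2")
time.sleep(1)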

Thinking more about it, an alternative implementation would be to remember, for each user, when their last job was run, and then choose the next user based on whose last job is the oldest. It would probably be more 'correct', but it would have the (probably negligible) extra memory overhead of remembering the last job time for every user.

Edit: So it's a slow day - here's that other approach. I think I prefer it to the one above, though it's slower due to the O(N) search for the user with the oldest previous job.

from collections import defaultdict
from threading import Lock
import time


class UserBalancedJobQueue(object):

    def __init__(self):
        self._user_jobs = defaultdict(list)
        # Timestamp of each user's most recently started job; the 0.0
        # default means "never run", so new users get served first.
        self._user_last_run = defaultdict(lambda: 0.0)
        self._lock = Lock()

    def pop_user_job(self):

        with self._lock:
            if not self._user_jobs:
                raise ValueError("No jobs to run")

            # Pick the user whose last job started longest ago (O(N) scan).
            user = min(
                self._user_jobs.keys(),
                key=lambda u: self._user_last_run[u]
            )
            self._user_last_run[user] = time.time()

            jobs = self._user_jobs[user]
            job = jobs.pop(0)

            if not jobs:
                del self._user_jobs[user]

            return user, job

    def add_user_job(self, user, job):
        with self._lock:
            self._user_jobs[user].append(job)
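
As a quick sanity check, the same kind of demo as in the first snippet's __main__ block can be run against this version (when two users are tied on last-run time, the winner depends on dict insertion order):

if __name__ == "__main__":
    q = UserBalancedJobQueue()
    q.add_user_job("tom", "job1")
    q.add_user_job("tom", "job2")
    q.add_user_job("fred", "job3")

    try:
        while True:
            # tom and fred should alternate until one runs out of jobs.
            print(q.pop_user_job())
    except ValueError:
        pass
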
answered Dec 14 '25 by Tom Dalton