
Job queue with job affinity

I am currently facing a problem for which I am pretty sure there is an official name, but I don't know what to search the web for. I hope that if I describe the problem and the solution I have in mind, somebody is able to tell me the name of the design pattern (if there is one that matches what I am going to describe).

Basically, what I want to have is a job queue: I have multiple clients that create jobs (publishers), and a number of workers that process these jobs (consumers). Now I want to distribute the jobs created by the publishers to the various consumers, which is basically doable using almost any message queue with load balancing across a queue, e.g. using RabbitMQ or even MQTT 5.

However, now things get complicated... every job refers to an external entity, let's say a user. What I want is that the jobs for a single user get processed in order, but for multiple users in parallel. I do not have the requirement that the jobs for user X always go to worker Y, since they should be processed sequentially anyway.

Now I could solve this using RabbitMQ and its consistent hashing exchange, but then I have a data race when new workers enter the cluster, because RabbitMQ does not support re-locating the jobs that are already in a queue.

MQTT 5 does not support this either: Here this idea is known as "sticky shared subscriptions", but this is not official. It may be part of MQTT 6, or it may not. Who knows.

I have also taken a look at NSQ, NATS, and some other brokers. Most of them don't even support this very specific scenario, and those that do use consistent hashing, which has the previously mentioned data race problem.

Now, the problem would go away if the broker did not sort the jobs into queues as they arrive, but instead tracked whether a job for a specific user is already being processed: if so, it should delay all other jobs for this user, while jobs for other users continue to be processed. This is, AFAICS, not possible using RabbitMQ et al.
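
To make the desired behavior concrete, here is a minimal in-memory sketch of such a dispatcher. It is not an existing broker feature; `KeyedDispatcher`, `publish`, `next_job`, and `ack` are made-up names for illustration, and a real broker would also need persistence, timeouts for workers that die, and so on.

```python
from collections import OrderedDict, deque


class KeyedDispatcher:
    """Hands out jobs so that at most one job per key (e.g. user id) is in flight."""

    def __init__(self):
        self._pending = OrderedDict()  # key -> deque of waiting jobs, FIFO per key
        self._in_flight = set()        # keys that currently have a job being processed

    def publish(self, key, job):
        self._pending.setdefault(key, deque()).append(job)

    def next_job(self):
        """Return (key, job) for the first key with nothing in flight, or None."""
        for key, jobs in self._pending.items():
            if key in self._in_flight:
                continue  # delay this user's jobs; keep looking at other users
            job = jobs.popleft()
            if not jobs:
                del self._pending[key]  # safe: we return right after mutating
            self._in_flight.add(key)
            return key, job
        return None

    def ack(self, key):
        """Called by a worker when it has finished the job for this key."""
        self._in_flight.discard(key)
```

Any worker may call `next_job()`; it does not matter which worker gets which user, because the dispatcher only releases the next job for a user after the previous one has been acknowledged.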

I am pretty sure that I am not the only person with a use case for that. I could e.g. think of users uploading videos to a video platform, and although uploaded videos are processed in parallel, all the videos uploaded by a single user are processed sequentially.

So, to cut a long story short: Is what I describe known under a common name? Something such as distributed job queue? Task dispatcher with task affinity? Or anything else? I have tried lots of terms, but didn't succeed. This may mean that there is no solution for this, but as said, it's hard to imagine that I'm the only person on the planet with this problem.

Any ideas what I could look for? And: Are there any tools that implement this? Any protocols?

PS: Just using a predefined routing key is not an option, since the user ids (which I just used as a made-up example here) are basically UUIDs, so there can be billions of them, and I need something more dynamic. Hence, consistent hashing is basically the correct approach, but as said, the distribution has to work piece by piece, not upfront, to avoid data races.

Golo Roden asked May 22 '19 19:05


4 Answers

Temporal Workflow is capable of supporting your use case with minimal effort.

Here is a strawman design that satisfies your requirements:

  • Send signalWithStart request to a user workflow using userID as the workflow ID. It either delivers the signal to the workflow or first starts the workflow and delivers the signal to it.
  • All requests to that workflow are buffered by it. Temporal provides a hard guarantee that only one workflow with a given ID can exist in an open state. So all signals (events) are guaranteed to be buffered in the workflow that belongs to the user. Temporal preserves all data in the workflow (including stack traces and local variables) in the presence of any process or infra failures. So no need to persist the taskQueue variable explicitly.
  • An internal workflow event loop dispatches these requests one by one.
  • When the buffer is empty, the workflow can complete.

Here is the workflow code that implements it in Java (Go and PHP SDKs are also supported, NodeJS is in alpha):

@WorkflowInterface
public interface SerializedExecutionWorkflow {

    @WorkflowMethod
    void execute();

    @SignalMethod
    void addTask(Task t);
}

@ActivityInterface
public interface TaskProcessorActivity {
    void process(Task poll);
}

public class SerializedExecutionWorkflowImpl implements SerializedExecutionWorkflow {

    private final Queue<Task> taskQueue = new ArrayDeque<>();
    private final TaskProcessorActivity processor = Workflow.newActivityStub(TaskProcessorActivity.class);

    @Override
    public void execute() {
        while(!taskQueue.isEmpty()) {
            processor.process(taskQueue.poll());
        }
    }

    @Override
    public void addTask(Task t) {
        taskQueue.add(t);
    }
}

And then the code that enqueues that task to the workflow through the signal method:

private void addTask(WorkflowClient temporalClient, Task task) {
    // Set workflowId to userId
    WorkflowOptions options = WorkflowOptions.newBuilder()
       .setTaskQueue(TASK_QUEUE)
       .setWorkflowId(task.getUserId())
       .build();
    // Use workflow interface stub to start/signal workflow instance
    SerializedExecutionWorkflow workflow = temporalClient.newWorkflowStub(SerializedExecutionWorkflow.class, options);
    BatchRequest request = temporalClient.newSignalWithStartRequest();
    request.add(workflow::execute);
    request.add(workflow::addTask, task);
    temporalClient.signalWithStart(request);
}

Temporal offers a lot of other advantages over using queues for task processing.

  • Built-in exponential retries with an unlimited expiration interval
  • Failure handling. For example, it allows executing a task that notifies another service if both updates couldn't succeed during a configured interval.
  • Support for long-running heartbeating operations
  • Ability to implement complex task dependencies, for example chaining of calls or compensation logic in case of unrecoverable failures (SAGA)
  • Gives complete visibility into the current state of the update. For example, when using queues, all you know is whether there are some messages in a queue, and you need an additional DB to track the overall progress. With Temporal every event is recorded.
  • Ability to cancel an update in-flight.
  • Distributed CRON support

See the presentation that goes over the Temporal programming model. It mentions the Cadence project which is the predecessor of Temporal.

Maxim Fateev answered Oct 17 '22 09:10


what I want to have is a job queue: I have multiple clients that create jobs (publishers), and a number of workers that process these jobs (consumers). Now I want to distribute the jobs created by the publishers to the various consumers, which is basically doable using almost any message queue with load balancing across a queue, e.g. using RabbitMQ or even MQTT 5.

However, now things get complicated... every job refers to an external entity, let's say a user. What I want is that the jobs for a single user get processed in order, but for multiple users in parallel. I do not have the requirement that the jobs for user X always go to worker Y, since they should be processed sequentially anyway.

Even if it was not this particular use case, I did a survey of (dynamic) task scheduling [0] [1] a couple of months ago and nothing like that surfaced.

Every scheduling algorithm I read about relies on properties that are common to all tasks, like priority, age, enqueue time, task name (and by extension average time to process). If your tasks were all linked to a user, you could build a scheduler that takes user_id into account to pick a task from the queue.

But I guess you don't want to build your own scheduler; it would be a waste anyway because, from my experience with such a need, existing message queues let you implement your requirement.

To summarize your requirements you need:

A scheduler that runs only one task per user at the same time.

The solution is to use a distributed lock, something like REDIS distlock: acquire the lock before the task starts and refresh it regularly during the task execution. If a new task for the same user comes in and tries to execute, it will fail to acquire the lock and will be re-enqueued.

Here is some pseudo-code:

def my_task(user_id, *args, **kwargs):
    if app.distlock(user_id, blocking=False):
        exec_my_task(user_id, *args, **kwargs)
    else:
        raise RetryTask()

Don't forget to refresh and release the lock.
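
For reference, a runnable sketch of the same idea that also releases the lock. It assumes an in-process lock table as a stand-in for the distributed lock service, so there is no lease or refresh here; `LocalLockTable`, `run_exclusive`, and `RetryTask` are hypothetical names, not part of any library.

```python
import threading
from collections import defaultdict


class RetryTask(Exception):
    """Signals that the task should be put back on the queue and tried later."""


class LocalLockTable:
    """In-process stand-in for a distributed lock service (illustration only)."""

    def __init__(self):
        self._guard = threading.Lock()
        self._locks = defaultdict(threading.Lock)  # one lock per user id

    def try_acquire(self, user_id):
        with self._guard:  # protect creation of the per-user lock
            lock = self._locks[user_id]
        return lock.acquire(blocking=False)  # never wait; fail fast instead

    def release(self, user_id):
        self._locks[user_id].release()


locks = LocalLockTable()


def run_exclusive(user_id, task, *args, **kwargs):
    """Run task(user_id, ...) only if no other task for this user is running."""
    if not locks.try_acquire(user_id):
        raise RetryTask(user_id)  # the caller re-enqueues the task
    try:
        return task(user_id, *args, **kwargs)
    finally:
        locks.release(user_id)  # always release, even if the task fails
```

Note that this only enforces mutual exclusion per user; whether re-enqueued tasks keep their original order depends on how the queue handles retries.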

A similar approach is taken in crawlers to enforce the robots.txt delay between requests.

amirouche answered Oct 17 '22 08:10


What amirouche is describing would be a simple solution as long as the lock collision doesn't occur very often. If it does, you'll be wasting a lot of time on your workers grabbing messages that they have to reject and have the message broker re-queue.

An alternative that solves this sort of problem very well is the Actor model / Actor frameworks. Some examples include Akka, Orleans, Protoactor, and Cadence (mentioned above, although Cadence is much more than just an actor framework). These frameworks can get very complex, but at their core they ensure that messages for a single actor are processed one at a time while allowing many actors to be processing at once (there would be an actor per user ID in your scenario). The frameworks abstract all of the message routing and concurrency away from you, greatly simplifying the implementation, and should be more robust / scalable in the long term.
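
To illustrate the core idea only (not how any of the frameworks above is implemented), here is a minimal single-process sketch using asyncio: one mailbox and one consumer task per actor id. `ActorSystem`, `send`, and `drain` are made-up names, and real frameworks add distribution, supervision, and persistence on top.

```python
import asyncio


class ActorSystem:
    """One mailbox and one consumer task per actor id (here: per user)."""

    def __init__(self, handler):
        self._handler = handler  # async callable: handler(actor_id, message)
        self._mailboxes = {}
        self._tasks = {}

    def send(self, actor_id, message):
        # Must be called while the event loop is running.
        if actor_id not in self._mailboxes:
            self._mailboxes[actor_id] = asyncio.Queue()
            self._tasks[actor_id] = asyncio.create_task(self._run(actor_id))
        self._mailboxes[actor_id].put_nowait(message)

    async def _run(self, actor_id):
        mailbox = self._mailboxes[actor_id]
        while True:
            message = await mailbox.get()
            # Awaiting here means the next message for this actor only
            # starts after the current one has finished.
            await self._handler(actor_id, message)
            mailbox.task_done()

    async def drain(self):
        """Wait until every mailbox is empty, then stop the consumer tasks."""
        await asyncio.gather(*(m.join() for m in self._mailboxes.values()))
        for task in self._tasks.values():
            task.cancel()
        await asyncio.gather(*self._tasks.values(), return_exceptions=True)
```

Messages sent to the same actor id are handled strictly in order, while actors for different users interleave freely on the event loop.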

playsted answered Oct 17 '22 08:10


Having a hard requirement of processing order per entity is challenging.

How long-running is each published task? If they are always very short, you could distribute tasks by hash and simply drain the worker pool of running jobs every time it changes shape without losing much productivity.

If they are longer-running, maybe that would be too slow. In that case you could also potentially have the workers take out atomic advisory locks from a fast central service (like Redis or something) for the user_id of each task they consume, for the duration of its execution. This service could also be scaled separately, partitioned by user id ranges or what-have-you. If there is enough of a gap between receiving the task and the first side effects from its execution, the worker wouldn't even need to block on the success of taking the lock until it was about to commit, and thereby might not see significantly increased latency. Contention* could be rare: if you are already using some consistent hashing scheme on user_id to distribute work, it would be rare indeed, and would still only occur when worker-pool topology changes. You should at least be using hashing distribution to guarantee that there are only two workers competing for the lock: the old one, and the new one.**

If lock grants were served in first-come, first-served order and locks are requested faster than worker-pool topology changes (that is, workers queue up for the locks as soon as they receive the job from the publisher), this could even give you pretty good guarantees about ordering even when topology changes quite rapidly.
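
As a rough illustration of why only the old and the new worker can compete for a user's lock, here is a minimal consistent-hash ring sketch. The names `HashRing`, `owner`, and `moved_users`, the choice of SHA-256, and the replica count of 64 are all assumptions for the example, not taken from any particular broker.

```python
import bisect
import hashlib


def _point(value: str) -> int:
    """Map a string to a position on the ring."""
    return int.from_bytes(hashlib.sha256(value.encode()).digest()[:8], "big")


class HashRing:
    def __init__(self, workers, replicas=64):
        # Each worker gets several virtual points to even out the distribution.
        self._ring = sorted(
            (_point(f"{worker}#{i}"), worker)
            for worker in workers
            for i in range(replicas)
        )
        self._points = [point for point, _ in self._ring]

    def owner(self, user_id: str) -> str:
        """Return the worker owning the first ring point after the user's hash."""
        idx = bisect.bisect(self._points, _point(user_id)) % len(self._ring)
        return self._ring[idx][1]


def moved_users(old: HashRing, new: HashRing, user_ids):
    """Users whose assigned worker differs between two topologies."""
    return {
        user: (old.owner(user), new.owner(user))
        for user in user_ids
        if old.owner(user) != new.owner(user)
    }
```

When a worker is added, every user that moves is reassigned to the new worker, so a lock (or a drain) is only ever contested between that user's previous worker and the new one, and only for the users in `moved_users`.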

Edits:

*I originally wrote "Failures"; not quite what I meant. The idea is that this lock service would pretty much never experience any locking contention unless the topology changed, since tasks for a given user would always be sent to the same worker normally.

**Another possibility: You could also give good guarantees with only a partial worker pool drain. Without user-level advisory locks, if you are using a consistent hashing scheme to distribute tasks and you can maintain a low water mark for the completion of dispatched tasks, you can defer starting tasks whose target worker is different from what it would have been when the oldest currently executing task started (i.e., drain running tasks only for users whose assigned worker changed). It's a fair amount of extra complexity; if you can efficiently track the low water mark and you don't have a long tail of long-running tasks, it might be a good option that allows you to elide the lock service. However, at the time of writing it's not clear to me whether this would ever be cheaper than locks; low water marks aren't usually cheap to implement reliably, and the death of a worker at the wrong time could delay processing for the entire 1/N cohort that changed workers instead of just the users whose tasks were in-flight on the worker at the time it died.

Mumbleskates answered Oct 17 '22 07:10