
How to architect a multi-step process using a message queue?

Say I have a multi-step, asynchronous process with these restrictions:

  1. Individual steps can be performed by any worker
  2. Steps must be performed in-order

The approach I'm considering:

  1. Insert a db row that represents the entire process, with a "Steps completed" column to keep track of the progress.
  2. Subscribe to a queue that will receive a message when the entire process is done.
  3. Upon completion of each step, update the db row and queue the next step in the process.
  4. After the last step is completed, queue the "process is complete" message.
  5. Delete the db row.
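Concretely, the five steps might look like this minimal in-memory sketch (a dict stands in for the DB table and `queue.Queue` for the message broker; all names are illustrative, not a specific library's API):

```python
import queue

# Hypothetical in-memory stand-ins for the real DB table and message broker.
db = {}                     # process_id -> {"steps_completed", "total_steps"}
step_queue = queue.Queue()  # carries (process_id, step_number)
done_queue = queue.Queue()  # receives the "process is complete" message

def start_process(process_id, total_steps):
    # 1. Insert a row representing the entire process.
    db[process_id] = {"steps_completed": 0, "total_steps": total_steps}
    step_queue.put((process_id, 1))              # queue the first step

def worker():
    process_id, step = step_queue.get()
    # ...perform the actual work for this step here...
    row = db[process_id]
    row["steps_completed"] = step                # 3. update the row
    if step < row["total_steps"]:
        step_queue.put((process_id, step + 1))   # 3. queue the next step
    else:
        done_queue.put(process_id)               # 4. queue "process is complete"
        del db[process_id]                       # 5. delete the row

start_process("X", 3)
while not step_queue.empty():
    worker()
print(done_queue.get())                          # -> X
```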

Thoughts? Pitfalls? Smarter ways to do it?

asked Feb 14 '13 by marclar



1 Answer

I've built a system very similar to what you've described in a large, task-intensive document processing system, and have had to live with both the pros and the cons for the last 7 years now. Your approach is solid and workable, but I see some drawbacks:

  • Potentially vulnerable to state change (i.e., if the process inputs change before all of the steps are queued, then later steps could run with inputs inconsistent with earlier steps)

  • More infrastructure than you'd like, involving both a DB and a queue = more points of failure, harder to set up, more documentation required = doesn't quite feel right

  • How do you keep multiple workers from acting on the same step concurrently? In other words, the DB row says 4 steps are completed, how does a worker process know if it can take #5 or not? Doesn't it need to know whether another process is already working on this? One way or another (DB or MQ) you need to include additional state for locking.

  • Your example is robust to failure, but doesn't address concurrency. When you add state to address concurrency, then failure handling becomes a serious problem. For example, a process takes step 5, and then puts the DB row into "Working" state. Then when that process fails, step 5 is stuck in "Working" state.

  • Your orchestrator is a bit heavy, as it is doing a lot of synchronous DB operations, and I would worry that it might not scale as well as the rest of the architecture, as there can be only one of those. This depends on how long-running your steps are compared to a database transaction; it would probably only become an issue at very massive scale.

If I had it to do over again, I would definitely push even more of the orchestration onto the worker processes. So, the orchestration code is common and could be called by any worker process, but I would keep the central, controlling process as light as possible. I would also use only message queues and not any database to keep the architecture simple and less synchronous.

I would create an exchange with 2 queues: IN and WIP (work in progress)

The central process is responsible for subscribing to process requests, and checking the WIP queue for timed out steps.
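The central process's timeout sweep could be sketched like this (plain lists stand in for the broker queues; in a real broker the remove/append pair would be a single transactional move, and the TTL value is an assumption):

```python
import time

IN = []     # pending tasks -- stand-in for the IN queue
WIP = []    # (task, deadline) pairs -- stand-in for the WIP queue
TTL = 30.0  # conservative time-to-live in seconds (assumed value)

def sweep_wip(now=None):
    """Central process: requeue any WIP task whose TTL has expired."""
    now = time.time() if now is None else now
    for entry in list(WIP):
        task, deadline = entry
        if now >= deadline:
            WIP.remove(entry)   # transactional dequeue in a real broker
            IN.append(task)     # back into IN for another worker
            # ...provide the appropriate notifications here...

# A worker that crashed left X1 in WIP with a deadline of t=30:
WIP.append(("X1", 30.0))
sweep_wip(now=31.0)
print(IN)                       # -> ['X1']
```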

1) When the central process receives a request for a given process (X), it invokes the orchestration code and loads the first task (X1) into the IN queue

2) The first available worker process (P1) transactionally dequeues X1, and enqueues it into the WIP queue, with a conservative time-to-live (TTL) timeout value. This dequeueing is atomic, and there are no other X tasks in IN, so no second process can work on an X task.

3) If P1 terminates suddenly, no architecture on earth can save this process except for a timeout. At the end of the timeout period, the central process will find the timed out X1 in WIP, and will transactionally dequeue X1 from WIP and enqueue it back into IN, providing the appropriate notifications.

4) If P1 terminates abnormally but gracefully, then the worker process will transactionally dequeue X1 from WIP and enqueue it back into IN, providing the appropriate notifications. Depending on the exception, the worker process could also choose to reset the TTL and retry the step.

5) If P1 hangs indefinitely, or exceeds its TTL, same result as #3. The central process handles it, and presumably the worker process will at some point be recycled--or the rule could be to recycle the worker process anytime there's a timeout.

6) If P1 succeeds, then the worker process will determine the next step, either X2 or X-done. If the next step is X2, then the worker process will transactionally dequeue X1 from WIP, and enqueue X2 into IN. If the next step is X-done, then the processing is complete, and the appropriate action can be taken; perhaps this would be enqueueing X-done into IN for subsequent processing by the orchestrator.
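Steps 2, 4, and 6 above can be sketched as a single worker loop (again with plain lists standing in for the broker's transactional queues; the step names and `do_step` helper are illustrative):

```python
import time

IN, WIP, DONE = ["X1"], [], []   # stand-ins for the broker queues
STEPS = ["X1", "X2", "X3"]       # assumed step sequence for process X
TTL = 30.0                       # assumed time-to-live in seconds

def do_step(task):
    pass                         # placeholder for the step's actual work

def work_one():
    task = IN.pop(0)                       # 2. atomic dequeue from IN...
    entry = (task, time.time() + TTL)
    WIP.append(entry)                      # ...and enqueue into WIP with a TTL
    try:
        do_step(task)
    except Exception:
        WIP.remove(entry)                  # 4. graceful failure: requeue
        IN.append(task)
        return
    WIP.remove(entry)                      # 6. success: advance the process
    i = STEPS.index(task)
    if i + 1 < len(STEPS):
        IN.append(STEPS[i + 1])            # next step (e.g. X2) into IN
    else:
        DONE.append("X-done")              # or hand X-done to the orchestrator

while IN:
    work_one()
print(DONE)                                # -> ['X-done']
```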

The benefits of my suggested approach are:

  • Contention between worker processes is handled explicitly: only one task for a given process is ever in IN, so no two workers can act on the same process concurrently

  • All possible failure scenarios (crash, exception, hang, and success) are handled

  • Simple architecture can be completely implemented with RabbitMQ and no database, which makes it more scalable

  • Since workers handle determining and enqueueing the next step, there is a more lightweight orchestrator, leading to a more scalable system

The only real drawback is that it is potentially vulnerable to state change, but often this is not a cause for concern. Only you can know whether this would be an issue in your system.

My final thought on this is: you should have a good reason for this orchestration. After all, if process P1 finishes task X1 and now it is time for some process to work on next task X2, it seems P1 would be a very good candidate, as it just finished X1 and is now available. By that logic, a process should just gun through all the steps until completion--why mix and match processes if the tasks need to be done serially? The only async boundary really would be between the client and the worker process. But I will assume that you have a good reason to do this, for example, the processes can run on different and/or resource-specialized machines.

answered Nov 15 '22 by J.T. Taylor