Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Java application: Sequence workflow pattern

I have a spring web application. When a user calls save endpoint, system should execute many external calls to save the state in multiple microservices. However, those steps depend on each other. In other words I have a sequence of steps to perform. sequence pattern

Just calling a set of steps one by one is not a big deal, I can just create class for each step and call them one by one doing appropriate modifications between the steps.

However, each of the steps can fail and if it happens it should be properly reported to the user. Here is a pseudo-code for a straight forward solution:

var response = new Response()
try {
    var result1 = step1.execute(args1)
    var args2 = process(result1, args1)
    var result2 = step2.execute(args2)
    ...
catch(Step1Exception e) {
    response.setIsPartialSuccess(true);
    response.setPartialResults(e.getDetails())
}
catch(Step2Exception e) {
    response.setIsPartialSuccess(true);
    response.setPartialResults(e.getDetails())
}
return response; 

Each step can process list of items. Some steps will send all items at once (either they all fail, or none), some steps will send them one by one (half can fail, half can pass). StepException will contain that information, i.e. what passed, what failed.

As you can see, it's not really maintainable. Using Spring Batch would be overkill here, because I am not reading and writing something, I don't need any multithreading, job details, or checkpoints. However, the idea is quite similar, I want to create some building blocks and control the flow.

At the moment I am trying to figure out if Spring Reactor can help here (yes I know it's for different purposes) since it has stream/pipe with some error handling. Imagine I could write something like:

var context = new Context(response, args1);
Mono.just(context)
    .map(step1::execute)
    .onErrorReturn(e -> context.withError(e))
    //I assume if error happened before
    //steps below are not executed
    .map(step2::execute) 
    .onErrorReturn(e -> context.withError(e))
    .block()
 return context;

You can think of data processed by a reactive application as moving through an assembly line. Reactor is both the conveyor belt and the workstations. The raw material pours from a source (the original Publisher) and ends up as a finished product ready to be pushed to the consumer (or Subscriber).

The raw material can go through various transformations and other intermediary steps or be part of a larger assembly line that aggregates intermediate pieces together. If there is a glitch or clogging at one point (perhaps boxing the products takes a disproportionately long time), the afflicted workstation can signal upstream to limit the flow of raw material.

In other words I am searching for a framework which is similar to the above. I don't need any async processing or retries now, but they may be useful in the future. Please, let me know if there is something better than reactor for my needs.

like image 354
Vadim Kirilchuk Avatar asked Oct 29 '22 02:10

Vadim Kirilchuk


1 Answers

Even though you don't need non-blocking asynchronous calls right now, Reactor can still be a good fit for this because it is good at orchestrating that sort of processing pipeline. I'd argue Java 8 Stream could also fit the bill but is a little less powerful in that regard.

Expanding method references for clarity, and with a bit of guesswork on my part, your code would look like something like this in Reactor:

var response = Mono.just(initialArgs)
    .flatMap(args1 -> Mono.fromCallable(() -> step1.execute(args1))
        .map(result1 -> process(result1, args1) //args1 still in scope inside flatMap
    )
    .flatMap(args2 -> Mono.fromCallable(() -> step2.execute(args2))
    //alternatively to last flatMap, with caveat:
    //.map(args2 -> step2.execute(args2))
    .map(endResult -> new Response(endResult))
    .onErrorResume(error -> {
        Response errorResponse = new Response();
        errorResponse.setIsPartialSuccess(true);
        errorResponse.setPartialResults(error.getDetails());
        return Mono.just(errorResponse);
    })
    .block();

Operators used in this particular chain don't change threads, so this would all execute in the thread from which the last block() method is called.

Errors from any step stop the whole processing and are propagated to the end (block() would then throw the exception).

Note that some operators (mostly those that have a notion of time) change threads, and at this point stepX.execute being blocking becomes a problem because that would block threads that are supposed to be shared by the whole of Reactor code (not only a particular processing pipeline) and are limited resources.

like image 168
Simon Baslé Avatar answered Nov 15 '22 05:11

Simon Baslé