Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Pipeline design pattern implementation

This is a design question regarding the implementation of a Pipeline. The following is my naive implementation.

Interface for individual steps/stages in the pipeline:

public interface Step<T, U> {
    public U execute(T input);
}

Concrete implementations of steps/stages in pipeline:

public class StepOne implements Step<Integer, Integer> {
    @Override
    public Integer execute(Integer input) {
        return input + 100;
    }
}

public class StepTwo implements Step<Integer, Integer> {
    @Override
    public Integer execute(Integer input) {
        return input + 500;
    }
}

public class StepThree implements Step<Integer, String> {
    @Override
    public String execute(Integer input) {
        return "The final amount is " + input;
    }
}

The pipeline class will hold/register the steps in the pipeline and execute them one after the other:

public class Pipeline {
    private List<Step> pipelineSteps = new ArrayList<>();
    private Object firstStepInput = 100;

    public void addStep(Step step) {
        pipelineSteps.add(step);
    }

    public void execute() {
        for (Step step : pipelineSteps) {
            Object out = step.execute(firstStepInput);
            firstStepInput = out;
        }
   }
}

Diver program to execute the pipeline:

public class Main {
    public static void main(String[] args) {
        Pipeline pipeline = new Pipeline();
        pipeline.addStep(new StepOne());
        pipeline.addStep(new StepTwo());
        pipeline.addStep(new StepThree());

        pipeline.execute();
    } 
}

However, as you can see the naive implementation has many limitations.

One of the major ones is that since the requirement is that the output of each step could be of any type, the naive implementation is not type-safe (the execute method in the Pipeline class). If I happen to wire the steps in the pipeline incorrectly, the app will fail.

Can anyone help me design the solution by adding to what I have coded, or point me towards an already existing pattern to solve this?

like image 815
Prashant Chauhan Avatar asked Oct 09 '16 18:10

Prashant Chauhan


People also ask

How do you implement pipeline?

To implement your pipeline you will need to modify your state controller and add some pipeline registers between pipeline stages. This is one of the possible ways: A new instruction word is loaded to the instruction register. It is decoded by the control unit.

What is implementation in design pattern?

Implementation of a design pattern can take many forms according to the programming language being used. Most of the literature presents design patterns in their conventional object-oriented implementations. Several other studies show the implementation in aspect-oriented languages such as AspectJ, EOS, and Caesar.

How does a pipeline algorithm work?

In a pipeline algorithm, concurrency is limited until all the stages are occupied with useful work. This is referred to as "filling the pipeline". At the tail end of the computation, again there is limited concurrency as the final item works its way through the pipeline. This is called "draining the pipeline".

What is pipelined method?

Pipelining is a technique where multiple instructions are overlapped during execution. Pipeline is divided into stages and these stages are connected with one another to form a pipe like structure. Instructions enter from one end and exit from another end. Pipelining increases the overall instruction throughput.


3 Answers

why do you need an additional Pipeline class ? I think you can remove the middle man. this will make your api simpler, for example:

Step<Integer, String> source = Step.of(Object::toString);
Step<Integer, Integer> toHex = source.pipe(it -> Integer.parseInt(it, 16));

toHex.execute(11/*0x11*/);// return 17;

you can implement your pipeline pattern simply in java-8 as below :

interface Step<I, O> {

    O execute(I value);

    default <R> Step<I, R> pipe(Step<O, R> source) {
        return value -> source.execute(execute(value));
    }

    static <I, O> Step<I, O> of(Step<I, O> source) {
        return source;
    }
}

in prior java version you can use an abstract class instead:

abstract static class Step<I, O> {

    public abstract O execute(I value);

    public <R> Step<I, R> pipe(Step<O, R> source) {
        return new Step<I, R>() {
            @Override
            public R execute(I value) {
                return source.execute(Step.this.execute(value));
            }
        };
    }

    public static <I, O> Step<I, O> of(Step<I, O> source) {
        return source;
    }
}
like image 109
holi-java Avatar answered Oct 18 '22 19:10

holi-java


I would focus on

If I happen to wire the steps in the pipeline incorrectly, the app will fail.

Yes, this is a problem. StepThree is the stranger here. I do not think one simple pattern might help, I do think it must be a combination of strategy and builder pattern. For example:

Pipeline<Integer,Integer> intPipe = new Pipeline<>();
intPipe = intPipe.add(new StepOne()); // increment 100
intPipe = intPipe.add(new StepTwo()); // increment 500
Pipeline<String, Integer> strPipe = intPipe.add(new StepThree()); // convert

Whereat Pipeline is like this:

public static class Pipeline<IN, OUT> {
   //...
   public<A> Pipeline<OUT,A> add(Step<IN,A> step) {
     pipelineSteps.add(step);
     return (Pipeline<OUT,A>)this;
   }
}

Using the fast-builder-syntax this might work:

Pipeline<String, Integer> pipe = new Pipeline<Integer, Integer>()
    .add(new StepOne()).add(new StepTwo()).add(new StepThree());

This should work since generics are not part of the bytecode.

like image 30
Grim Avatar answered Oct 18 '22 19:10

Grim


You don't need to create a new Interface for this.

Java 8 already has a Functional Interface called Function and it allows you to create a Chaining of Functions (in other words, your Pipeline).

Function<Integer, Integer> addOne = it -> {
            System.out.println(it + 1);
            return it + 1;
        };

Function<Integer, Integer> addTwo = it -> {
            System.out.println(it + 2);
            return it + 2;
        };

Function<Integer, Integer> timesTwo = input -> {
            System.out.println(input * 2);
            return input * 2;
        };

final Function<Integer, Integer> pipe = addOne
        .andThen(timesTwo)
        .andThen(addTwo);

pipe.apply(10);

If you want to read more about Functional Interfaces: https://medium.com/@julio.falbo/java-recent-history-java-8-part-2-functional-interface-predefined-functional-interface-2494f25610d5

like image 14
Júlio Falbo Avatar answered Oct 18 '22 21:10

Júlio Falbo