Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Transform data stream using processor implemented via Flow APIs

I was going through the Community#DOC-1006738 from Oracle related to the concurrency concepts of Flow.Publisher and Flow.Subscriber. There on one can find the Sample code to transform data stream using processor which has these two lines of code, which has left me a little puzzled.

//Create Processor and Subscriber  
MyFilterProcessor<String, String> filterProcessor = 
                                      new MyFilterProcessor<>(s -> s.equals("x")); 

Question 1. How could the MyFilterProcessor be of type <String, String> here?

To what I at first thought was, these might have been <String, Boolean> instead, but then that would defy the further definition of the subscriber definition in the next line :-

MyTransformProcessor<String, Integer> transformProcessor = 
                              new MyTransformProcessor<>(s -> Integer.parseInt(s));  

Additional note here, unless I explicitly cast(correct) the above as

MyTransformProcessor<String, Integer>(s -> Integer.parseInt(s))

I get an error in parseInt reading, cannot be applied to Object.

-- Why do I need to explicitly cast the RHS here? --


Though the code is mostly present in the shared link, yet the useful constructor definitions I am using are

public class MyTransformProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {       
    private Function function;
    MyTransformProcessor(Function<? super T, ? extends R> function) {  
        super();  
        this.function = function;  
    } 
    ...
} 

and an identical one for filterProcessor as :-

public class MyFilterProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
    private Function function;
    MyFilterProcessor(Function<? super T, ? extends R> function) {
        super();
        this.function = function;
    }
    ...
}

Question. Now with those changes(one after resolving the question 1 and another from the additional note), how can one implement the sample correctly? Or am I simply missing out on something very basic?

like image 508
Naman Avatar asked Feb 25 '18 18:02

Naman


1 Answers

I believe your major mistake was implementing MyFilterProcessor as an (almost) exact copy of MyTransformProcessor.

Since the author did not post the code of the said class, I tried to guess its behavior based on:

... = new MyFilterProcessor<>(s -> s.equals("x"));

The name Filter suggests the component is meant to accept and then re-publish only certain values. At this point a function that evaluates to a boolean (or a Predicate<T>) is quite acceptable within the context ( and thus s -> s.equals("x") ).

The initial data stream at the end of the page

String[] items = {"1", "x", "2", "x", "3", "x"};  

seems to confirm my assumptions. The author simply wants to filter out the "x" values, and this task is given to MyFilterProcessor which must evaluate each type before posting it to the rest of the pipeline; and the output type must be the same as the input type.


The constructor should then look like this:

MyFilterProcessor(Predicate<? super R> predicate) { /* ... */ }
// or
MyFilterProcessor(Function<? super R, Boolean> function) { /* ... */ }

and onNext supposedly forwards only certain elements:

if (! predicate.test(item)) {
    int max = submit(item); // get the estimated maximum lag
    subscription.request(max);
}

I have a two ideas for the definition of MyFilterProcessor:

  • 1) public class MyFilterProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<R, R>

as Flow.Processor is meant to accept and forward the same type.

I just can't seem to fit the type T anywhere. And this is where I am blocked.

  • 2) public class MyFilterProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R>

but then, in onNext, you must cast <T> to <R> (ugly, very ugly)

if (! predicate.test(item)) {
    int max = submit( (R) item);
    subscription.request(max);
}

You'd be testing a Predicate<? super T> in this case.

If you are willing to refactor a little bit, as SubmissionPublisher already inherits the behavior of Flow.Publisher you could have the class just implement Flow.Subscriber:

public class MyFilterProcessor<R> extends SubmissionPublisher<R> implements Flow.Subscriber<R>

and so

MyFilterProcessor<String, String> filterProcessor = new MyFilterProcessor<>(s -> s.equals("x"));
// or, if you follow my example:
MyFilterProcessor<String> filterProcessor = new MyFilterProcessor<>(s -> s.equals("x"));

finally works.


If you print the values within MyFilterProcessor and MySubscriber you will get this output:

Publishing Items...
FilterProcessor: Receiving: 1
FilterProcessor: Receiving: x
FilterProcessor: Receiving: 2
FilterProcessor: Receiving: x
FilterProcessor: Receiving: 3
FilterProcessor: Receiving: x
Got: 1
Got: 2
Got: 3

which is the expected result.


When you test, remember to wait for the pipeline to finish before quitting the application as SubmissionPublisher issues the elements in another Thread.

Also, please, contrary to article, have the common sense to change

private Function function; 
// ...
submit((R) function.apply(item));  

to

private Function<? super T, ? extends R> function;
// ...
submit(function.apply(item));

Why do I need to explicitly cast the RHS here?

I'm still struggling to understand how you got the cannot be applied to Object error. Which jdk number and IDE are you using?

like image 74
Marko Pacak Avatar answered Sep 28 '22 08:09

Marko Pacak