I was going through the Oracle Community article DOC-1006738 on the concurrency concepts of Flow.Publisher and Flow.Subscriber. There one can find sample code that transforms a data stream using a processor. It contains these two lines, which have left me a little puzzled.
//Create Processor and Subscriber
MyFilterProcessor<String, String> filterProcessor =
new MyFilterProcessor<>(s -> s.equals("x"));
Question 1. How can MyFilterProcessor be of type <String, String> here?
At first I thought it should have been <String, Boolean> instead, but that would contradict the definition of the next processor in the following line:
MyTransformProcessor<String, Integer> transformProcessor =
new MyTransformProcessor<>(s -> Integer.parseInt(s));
Additional note: unless I explicitly spell out the type arguments on the right-hand side, as in
new MyTransformProcessor<String, Integer>(s -> Integer.parseInt(s))
I get an error at parseInt reading "cannot be applied to Object".
-- Why do I need to explicitly cast the RHS here? --
Most of the code is available at the shared link, but the relevant constructor definitions I am using are
public class MyTransformProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
private Function function;
MyTransformProcessor(Function<? super T, ? extends R> function) {
super();
this.function = function;
}
...
}
and an identical one for filterProcessor:
public class MyFilterProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
private Function function;
MyFilterProcessor(Function<? super T, ? extends R> function) {
super();
this.function = function;
}
...
}
Question. Now, with those changes (one from resolving Question 1 and another from the additional note), how can one implement the sample correctly? Or am I simply missing something very basic?
I believe your major mistake was implementing MyFilterProcessor as an (almost) exact copy of MyTransformProcessor.
Since the author did not post the code of the said class, I tried to guess its behavior based on:
... = new MyFilterProcessor<>(s -> s.equals("x"));
The name Filter suggests the component is meant to accept values and then re-publish only some of them. A function that evaluates to a boolean (or a Predicate<T>) is therefore quite acceptable in this context, hence s -> s.equals("x").
The initial data stream at the end of the page
String[] items = {"1", "x", "2", "x", "3", "x"};
seems to confirm my assumptions. The author simply wants to filter out the "x" values, and this task is given to MyFilterProcessor, which must evaluate each item before passing it on to the rest of the pipeline. The output type must therefore be the same as the input type.
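To make the intended behavior concrete without any Flow machinery, here is a minimal sketch of what such a filter is assumed to do with the sample data. The class PredicateFilterDemo and the helper dropMatching are hypothetical names used only for illustration; they do not come from the article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class PredicateFilterDemo {

    // Returns the items for which the predicate is false,
    // i.e. the items a filter of this kind would forward.
    static <T> List<T> dropMatching(T[] items, Predicate<? super T> predicate) {
        List<T> kept = new ArrayList<>();
        for (T item : items) {
            if (!predicate.test(item)) {
                kept.add(item);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        String[] items = {"1", "x", "2", "x", "3", "x"};
        // The predicate marks the values to drop; input and output types are both String.
        System.out.println(dropMatching(items, s -> s.equals("x"))); // prints [1, 2, 3]
    }
}
```

Note that the element type never changes, which is why <String, String> makes sense for a filter and <String, Boolean> does not.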
The constructor should then look like this:
MyFilterProcessor(Predicate<? super R> predicate) { /* ... */ }
// or
MyFilterProcessor(Function<? super R, Boolean> function) { /* ... */ }
and onNext presumably forwards only certain elements:
if (! predicate.test(item)) {
int max = submit(item); // get the estimated maximum lag
subscription.request(max);
}
I have two ideas for the definition of MyFilterProcessor. The first:
public class MyFilterProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<R, R>
since a filter is meant to accept and forward the same type.
With this definition, though, I can't seem to fit the type T anywhere, and this is where I am blocked.
The second:
public class MyFilterProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R>
but then, in onNext, you must cast T to R (ugly, very ugly):
if (! predicate.test(item)) {
int max = submit( (R) item);
subscription.request(max);
}
You'd be testing a Predicate<? super T> in this case.
If you are willing to refactor a little, you could have the class just implement Flow.Subscriber, since SubmissionPublisher already implements Flow.Publisher:
public class MyFilterProcessor<R> extends SubmissionPublisher<R> implements Flow.Subscriber<R>
and so
MyFilterProcessor<String, String> filterProcessor = new MyFilterProcessor<>(s -> s.equals("x"));
// or, if you follow my example:
MyFilterProcessor<String> filterProcessor = new MyFilterProcessor<>(s -> s.equals("x"));
finally works.
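For reference, a minimal sketch of the single-type variant might look like the following. FilterPipelineDemo, FilterSketch and run are hypothetical names, not the article's code. Two choices here are assumptions of this sketch rather than part of the snippets above: it requests exactly one more item after every onNext, even when the item was dropped, so that a filtered-out value cannot stall the stream, and it collects the results with SubmissionPublisher.consume instead of a hand-written subscriber.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Predicate;

class FilterPipelineDemo {

    // Hypothetical filter: drops items that match the predicate, forwards the rest.
    static class FilterSketch<R> extends SubmissionPublisher<R> implements Flow.Subscriber<R> {
        private final Predicate<? super R> predicate;
        private Flow.Subscription subscription;

        FilterSketch(Predicate<? super R> predicate) {
            this.predicate = predicate;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(R item) {
            if (!predicate.test(item)) {
                submit(item); // same type in and out, so no cast
            }
            // Assumption of this sketch: always ask for the next item,
            // whether or not the current one was forwarded.
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            closeExceptionally(throwable);
        }

        @Override
        public void onComplete() {
            close(); // propagate completion downstream
        }
    }

    // Pushes the items through the filter and returns what came out the other end.
    static List<String> run(String... items) {
        List<String> received = new CopyOnWriteArrayList<>();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        FilterSketch<String> filter = new FilterSketch<>(s -> s.equals("x"));

        // consume() subscribes a consumer and completes the future on onComplete.
        CompletableFuture<Void> done = filter.consume(received::add);
        publisher.subscribe(filter);
        for (String item : items) {
            publisher.submit(item);
        }
        publisher.close();
        done.join(); // wait until the downstream consumer has seen everything
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run("1", "x", "2", "x", "3", "x")); // prints [1, 2, 3]
    }
}
```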
If you print the values within MyFilterProcessor and MySubscriber, you will get this output:
Publishing Items...
FilterProcessor: Receiving: 1
FilterProcessor: Receiving: x
FilterProcessor: Receiving: 2
FilterProcessor: Receiving: x
FilterProcessor: Receiving: 3
FilterProcessor: Receiving: x
Got: 1
Got: 2
Got: 3
which is the expected result.
When you test, remember to wait for the pipeline to finish before quitting the application, as SubmissionPublisher delivers the elements on another thread.
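One way to wait, sketched under the assumption that you control the final subscriber, is to release a CountDownLatch from onComplete and onError and block on it in the main thread. WaitForPipelineDemo, CountingSubscriber and publishAndWait are hypothetical names, and the five-second timeout is an arbitrary value chosen for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class WaitForPipelineDemo {

    // Hypothetical subscriber that counts items and signals when the stream has ended.
    static class CountingSubscriber implements Flow.Subscriber<String> {
        final AtomicInteger count = new AtomicInteger();
        final CountDownLatch finished = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(String item) {
            count.incrementAndGet();
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            finished.countDown();
        }

        @Override
        public void onComplete() {
            finished.countDown();
        }
    }

    // Publishes the items, then blocks until the subscriber has been told the stream is over.
    static int publishAndWait(String... items) {
        CountingSubscriber subscriber = new CountingSubscriber();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(subscriber);
        for (String item : items) {
            publisher.submit(item);
        }
        publisher.close();
        try {
            // Without this wait, main could return before the worker thread delivers anything.
            if (!subscriber.finished.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pipeline did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return subscriber.count.get();
    }

    public static void main(String[] args) {
        System.out.println("Delivered: " + publishAndWait("1", "2", "3")); // prints Delivered: 3
    }
}
```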
Also, please, contrary to the article, have the common sense to change
private Function function;
// ...
submit((R) function.apply(item));
to
private Function<? super T, ? extends R> function;
// ...
submit(function.apply(item));
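As a sketch of how the typed field plays out in a complete class, the following hypothetical TransformSketch keeps the Function fully parameterized, so onNext needs no (R) cast. TypedTransformDemo and run are illustrative names as well, and the use of consume to collect the results is a convenience of this example, not something the article does.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

class TypedTransformDemo {

    // Hypothetical transform processor with a fully typed Function field.
    static class TransformSketch<T, R> extends SubmissionPublisher<R>
            implements Flow.Processor<T, R> {
        private final Function<? super T, ? extends R> function;
        private Flow.Subscription subscription;

        TransformSketch(Function<? super T, ? extends R> function) {
            this.function = function;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(T item) {
            submit(function.apply(item)); // the compiler knows the result is an R
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            closeExceptionally(throwable);
        }

        @Override
        public void onComplete() {
            close();
        }
    }

    // Converts each input string to an Integer through the processor.
    static List<Integer> run(String... items) {
        List<Integer> received = new CopyOnWriteArrayList<>();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        TransformSketch<String, Integer> transform =
                new TransformSketch<>(s -> Integer.parseInt(s));

        CompletableFuture<Void> done = transform.consume(received::add);
        publisher.subscribe(transform);
        for (String item : items) {
            publisher.submit(item);
        }
        publisher.close();
        done.join(); // wait for asynchronous delivery to finish
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run("1", "2", "3")); // prints [1, 2, 3]
    }
}
```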
Why do I need to explicitly cast the RHS here?
I'm still struggling to understand how you got the "cannot be applied to Object" error. Which JDK version and IDE are you using?