Say that you have a set of Flux
that you would like to zip together with a bifunction.
Flux<String> flux1 = Flux.just("A", "B", "C");
Flux<String> flux2 = Flux.just("D", "E", "F");
Flux.zip(flux1, flux2, this::zipString).subscribe(System.out::println);
This bifunction below will return
null
if an object meets a certain constraint. So that we can possibly apply a filter after zipping theFlux
together.
public String zipString(String a, String b) {
if (a.equals("B"))
return null;
return a + b;
}
This strategy will end up throwing a NullPointerException
.
Exception in thread "main"
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.NullPointerException: The zipper returned a null value
Caused by: java.lang.NullPointerException: The zipper returned a null value
at java.util.Objects.requireNonNull(Objects.java:228)
at reactor.core.publisher.FluxZip$ZipCoordinator.drain(FluxZip.java:711)
at reactor.core.publisher.FluxZip$ZipInner.onSubscribe(FluxZip.java:861)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
EDIT: On a side note this will also occur will occur when you have a Flux
containing a null.
Flux<String> flux2 = Flux.just(null, "B", "C");
So this leads me to ask. Why are null
values prohibited from entering a Flux?
What are some potential strategies to mitigate the use case below:
flux2 contains a value that we should discard, therefore the set should be discarded.
The reactive spec doesn't allow null
in streams. Pick a constant like "n/a" and filter it out later.
https://github.com/reactive-streams/reactive-streams-jvm#2-subscriber-code
- Calling onSubscribe, onNext, onError or onComplete MUST return normally except when any provided parameter is null in which case it MUST throw a java.lang.NullPointerException to the caller
Felt like this should be revisited after some time dealing with reactive streams. There are two approaches that handle these cases more elegantly than a default value to be filtered downstream.
Tuples (Preferred and optimal)
Flux.zip(flux1,flux2)
.filter(t -> !t.getT1().equals("B") || !t.getT2().equals("B"))
.map(this::zipString)
.subscribe(System.out::println)
Zipping Strategy returning Optional
public Optional<String> zipString(String a, String b) {
if (a.equals("B"))
return Optional.empty();
return Optional.of(a + b);
}
Flux.zip(flux1, flux2, this::zipString)
.filter(Optional::isPresent)
.map(Optional::get)
.subscribe(System.out::println);
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With