Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Cannot zip a flux and return null values. How can we discard an event pair passed into a flux?

Say that you have a set of Flux that you would like to zip together with a bifunction.

Flux<String> flux1 = Flux.just("A", "B", "C");
Flux<String> flux2 = Flux.just("D", "E", "F");
Flux.zip(flux1, flux2, this::zipString).subscribe(System.out::println);

This bifunction below will return null if an object meets a certain constraint. So that we can possibly apply a filter after zipping the Flux together.

public String zipString(String a, String b) {
    if (a.equals("B"))
        return null;
    return a + b;
}

This strategy will end up throwing a NullPointerException.

Exception in thread "main" 
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.NullPointerException: The zipper returned a null value
Caused by: java.lang.NullPointerException: The zipper returned a null value
    at java.util.Objects.requireNonNull(Objects.java:228)
    at reactor.core.publisher.FluxZip$ZipCoordinator.drain(FluxZip.java:711)
    at reactor.core.publisher.FluxZip$ZipInner.onSubscribe(FluxZip.java:861)
    at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
    at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)

EDIT: On a side note this will also occur will occur when you have a Flux containing a null.

Flux<String> flux2 = Flux.just(null, "B", "C");

So this leads me to ask. Why are null values prohibited from entering a Flux?

What are some potential strategies to mitigate the use case below:

flux2 contains a value that we should discard, therefore the set should be discarded.

like image 413
shinjw Avatar asked Dec 29 '17 00:12

shinjw


2 Answers

The reactive spec doesn't allow null in streams. Pick a constant like "n/a" and filter it out later.

https://github.com/reactive-streams/reactive-streams-jvm#2-subscriber-code

  1. Calling onSubscribe, onNext, onError or onComplete MUST return normally except when any provided parameter is null in which case it MUST throw a java.lang.NullPointerException to the caller
like image 59
Abhijit Sarkar Avatar answered Sep 21 '22 03:09

Abhijit Sarkar


Felt like this should be revisited after some time dealing with reactive streams. There are two approaches that handle these cases more elegantly than a default value to be filtered downstream.

Tuples (Preferred and optimal)

Flux.zip(flux1,flux2)
     .filter(t -> !t.getT1().equals("B") || !t.getT2().equals("B"))
     .map(this::zipString)
     .subscribe(System.out::println)

Zipping Strategy returning Optional

public Optional<String> zipString(String a, String b) {
    if (a.equals("B"))
        return Optional.empty();
    return Optional.of(a + b);
}

Flux.zip(flux1, flux2, this::zipString)
    .filter(Optional::isPresent)
    .map(Optional::get)
    .subscribe(System.out::println);
like image 43
shinjw Avatar answered Sep 19 '22 03:09

shinjw