I have a reactive stream in which one step should apply a validation check that throws an exception if it fails. Is there a commonly accepted style for doing that? From what I can tell I have three options (using Mono): then(), filter(), and map().
filter() is closest to the flow I want, in that I'm not actually trying to change the type of data in the stream or switch to another stream. But filter is supposed to return true/false to filter out items, so it's a little goofy to always return true.

then() lets me specifically choose error/success emissions, but for this type of validation I can't always easily split it off into its own private method, and the boilerplate makes the stream declaration messier to read.

map() is pretty much the same as using filter(), except you always return the input instead of true.

As a very contrived example, consider a service that has a list of 0 or more letters to send to a person:
public interface Person {
    UUID getId();
    List<String> getKnownLanguages();
}

public interface Letter {
    String getLanguage();
}

public class LetterService {
    // Public so that the methods below can call them through mLetterService
    public Letter findOneLetterForPerson(final UUID id) { /* ... */ }
    public void removeLetter(final Letter letter) { /* ... */ }
}
What is the better option for creating a method that looks like this:
public Mono<Optional<Letter>> getNextValidLetterForPerson(final Person person) {
    return Mono.just(person)
        .and(this::getNextLetterForPerson)
        /////////////////////////////////////////
        //
        .filter(this::validatePersonCanReadLetter1)
        .map(Tuple2::getT2)
        //
        // OR
        //
        .then(this::validatePersonCanReadLetter2)
        //
        // OR
        //
        .map(this::validatePersonCanReadLetter3)
        //
        /////////////////////////////////////////
        // If the letter was invalid for the person, remove the letter from
        // the system as a side effect, and retry retrieving a letter to send
        .doOnError(this::removeInvalidLetter)
        .retry(this::ifLetterValidationFailed)
        // Map the result to an appropriate Optional
        .map(Optional::of)
        .defaultIfEmpty(Optional.empty());
}
The supporting methods used in the example above are:
public static class LetterInvalidException extends RuntimeException {
    private Letter mLetter;
    public LetterInvalidException(final Letter letter) { mLetter = letter; }
    public Letter getLetter() { return mLetter; }
}
/** Gets the next letter for a person, as a reactive stream */
private Mono<Letter> getNextLetterForPerson(final Person person) {
    return Mono.create(emitter -> {
        final Letter letter = mLetterService.findOneLetterForPerson(person.getId());
        if (letter != null) {
            emitter.success(letter);
        }
        else {
            emitter.success();
        }
    });
}

/** Used to check whether the cause of an error was due to an invalid letter */
private boolean ifLetterValidationFailed(final Throwable e) {
    return e instanceof LetterInvalidException;
}

/** Used to remove an invalid letter from the system */
private void removeInvalidLetter(final Throwable e) {
    if (ifLetterValidationFailed(e)) {
        mLetterService.removeLetter(((LetterInvalidException)e).getLetter());
    }
}
/*************************************************************************
*
*************************************************************************/
private boolean validatePersonCanReadLetter1(final Tuple2<Person, Letter> tuple) {
    final Person person = tuple.getT1();
    final Letter letter = tuple.getT2();
    if (!person.getKnownLanguages().contains(letter.getLanguage())) {
        throw new LetterInvalidException(letter);
    }
    return true;
}

private Mono<Letter> validatePersonCanReadLetter2(final Tuple2<Person, Letter> tuple) {
    return Mono.create(emitter -> {
        final Person person = tuple.getT1();
        final Letter letter = tuple.getT2();
        if (!person.getKnownLanguages().contains(letter.getLanguage())) {
            emitter.error(new LetterInvalidException(letter));
        }
        else {
            emitter.success(letter);
        }
    });
}

private Letter validatePersonCanReadLetter3(final Tuple2<Person, Letter> tuple) {
    final Person person = tuple.getT1();
    final Letter letter = tuple.getT2();
    if (!person.getKnownLanguages().contains(letter.getLanguage())) {
        throw new LetterInvalidException(letter);
    }
    return letter;
}
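To make the intended behavior concrete, here is a rough blocking sketch of the same flow without Reactor: look up a letter, validate it, remove it and try again if it is invalid, and finish with an Optional. The class name BlockingLetterFlowSketch, the nested Letter class, and the in-memory queue are all stand-ins made up for illustration; they are not part of the real LetterService.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

class BlockingLetterFlowSketch {

    /** Hypothetical stand-in for the Letter interface; only the language matters here. */
    static final class Letter {
        final String language;
        Letter(final String language) { this.language = language; }
    }

    /** Hypothetical in-memory stand-in for the letters held by LetterService. */
    private final Deque<Letter> pending = new ArrayDeque<>();

    BlockingLetterFlowSketch(final List<Letter> letters) {
        pending.addAll(letters);
    }

    /** Returns the first readable letter, removing unreadable ones along the way. */
    Optional<Letter> nextValidLetter(final List<String> knownLanguages) {
        while (true) {
            final Letter letter = pending.peekFirst();    // plays the role of findOneLetterForPerson
            if (letter == null) {
                return Optional.empty();                  // plays the role of defaultIfEmpty
            }
            if (knownLanguages.contains(letter.language)) {
                return Optional.of(letter);               // validation passed
            }
            pending.removeFirst();                        // doOnError side effect, then loop again (retry)
        }
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(final String[] args) {
        final BlockingLetterFlowSketch sketch = new BlockingLetterFlowSketch(
                List.of(new Letter("fr"), new Letter("en")));
        final Optional<Letter> result = sketch.nextValidLetter(List.of("en"));
        System.out.println(result.map(l -> l.language).orElse("none"));
    }
}
```

The question is really about which operator best expresses the "validation passed / validation failed" branch of that loop inside the stream.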
Ideally I would have loved a method such as Mono<T> validate(..) that would allow testing the stream item and then either returning nothing or returning/throwing an exception (if one were returned, the framework would treat that as an error), but I'm rather new to reactive programming and didn't see anything that worked like that.
Maybe handle() is a better solution; it can serve as a combination of map and filter:
Mono.just(p).and(test::getNextLetterForPerson).handle((tuple, sink) -> {
    final Person person = tuple.getT1();
    final Letter letter = tuple.getT2();
    if (!person.getKnownLanguages().contains(letter.getLanguage())) {
        sink.error(new LetterInvalidException(letter));
        return;
    }
    sink.next(letter);
}).subscribe(value -> System.out.println(((Letter) value).getLanguage()),
             t -> System.out.println(t.getMessage()));
As you can see, it's almost the same as your validatePersonCanReadLetter3, except that the failure is signalled through sink.error() instead of being thrown.
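If you would rather stay with map(), a small helper can approximate the validate(..) you describe. This is only a sketch: the class name ValidateSketch and the validate method are made up for illustration and are not part of Reactor. It builds a pass-through function that throws when a check fails. The demo uses Optional.map so that it runs with the JDK alone; the assumption is that you would pass the same function to Mono.map, which, as your validatePersonCanReadLetter3 already relies on, turns a thrown exception into an error signal.

```java
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

class ValidateSketch {

    /**
     * Hypothetical helper: returns a function that hands its input back unchanged
     * when the check passes and throws the supplied exception otherwise.
     */
    static <T> Function<T, T> validate(final Predicate<? super T> check,
                                       final Function<? super T, ? extends RuntimeException> error) {
        return value -> {
            if (!check.test(value)) {
                throw error.apply(value);
            }
            return value;
        };
    }

    public static void main(final String[] args) {
        final Function<String, String> nonEmpty = ValidateSketch.<String>validate(
                s -> !s.isEmpty(),
                s -> new IllegalArgumentException("empty value"));

        // Passing case: the value flows through untouched.
        System.out.println(Optional.of("hello").map(nonEmpty).get());

        // Failing case: the exception from the error factory is thrown.
        try {
            Optional.of("").map(nonEmpty);
        } catch (final IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In your stream this would look something like .map(validate(tuple -> ..., tuple -> new LetterInvalidException(tuple.getT2()))) followed by .map(Tuple2::getT2), which keeps the check inline without the always-return-true oddity of filter(). Also worth checking against your Reactor version: as far as I know, from 3.1 onward and(Function) is called zipWhen and then(Function) is called flatMap.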