
How to bridge between a non-reactive Spring EventListener and a reactive Flux

What's the difference between creating a Flux directly by calling Flux.push and using the sink within push's lambda expression vs. using a sink provided by a DirectProcessor?

In a minimal example where a Flux just emits a couple of events, I could do

Flux.<String>push(emitter -> {
  emitter.next("One");
  emitter.next("Two");
  emitter.complete();
});

vs. using a DirectProcessor

var emitter = DirectProcessor.<String>create().sink();
emitter.next("One");
emitter.next("Two");
emitter.complete();

Just to clarify: I know that I could use Flux.just here, but my use case is actually building a bridge between Spring's @EventListeners and Spring WebFlux, where I want to create a Flux for every incoming SSE request for a specific resource and then publish events to this Flux.

Could anybody tell me whether both approaches are valid? Surely there must be some difference. In particular, the Reactor Reference Guide section on DirectProcessor states:

On the other hand, it has the limitation of not handling backpressure. As a consequence, a DirectProcessor signals an IllegalStateException to its subscribers if you push N elements through it but at least one of its subscribers has requested less than N.

What does that mean?
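One way to read the quoted passage: each subscriber tells the publisher how many elements it is ready for via request(n). A publisher without a buffer that is handed more elements than there is outstanding demand has nowhere to put the extra one, so all it can do is fail that subscriber. The following plain-JDK sketch models only this bookkeeping. It is not Reactor code, and UnbufferedSubscriberModel and its members are hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of one subscriber of an unbuffered, DirectProcessor-like publisher. */
final class UnbufferedSubscriberModel {
    private long demand;                       // sum of request(n) calls not yet used up
    private boolean terminated;
    final List<String> received = new ArrayList<>();
    IllegalStateException error;               // set once the publisher overruns the demand

    /** The subscriber signals that it can take n more elements. */
    void request(long n) {
        demand += n;
    }

    /** The publisher pushes an element; without a buffer, missing demand is fatal. */
    void push(String value) {
        if (terminated) {
            return;                            // nothing is delivered after termination
        }
        if (demand == 0) {
            terminated = true;
            error = new IllegalStateException("pushed more elements than were requested");
            return;
        }
        demand--;
        received.add(value);
    }

    public static void main(String[] args) {
        UnbufferedSubscriberModel subscriber = new UnbufferedSubscriberModel();
        subscriber.request(1);                 // only one element requested
        subscriber.push("One");                // delivered
        subscriber.push("Two");                // no demand left: the subscriber is failed
        System.out.println(subscriber.received + " / " + subscriber.error);
    }
}
```

By contrast, the single-argument Flux.push and Flux.create overloads default to a buffering overflow strategy, so a slow subscriber causes elements to be queued rather than an error to be signalled.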

[EDIT:] In an earlier version of the question I was using Flux.generate() instead of Flux.push(), which is obviously wrong, because generate can emit at most one event per invocation of its generator callback.
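The constraint on Flux.generate mentioned in the edit above can be modeled in a few lines of plain Java: the generator callback is invoked once per requested element and may call next at most once per invocation. This is a sketch under that assumption, not Reactor's implementation, and GenerateModel and OneShotSink are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class GenerateModel {

    /** Hypothetical stand-in for a synchronous sink that accepts one element per round. */
    static final class OneShotSink {
        private String value;
        private boolean completed;

        void next(String v) {
            if (value != null) {
                throw new IllegalStateException("more than one next() in a single callback");
            }
            value = v;
        }

        void complete() {
            completed = true;
        }
    }

    /** Invokes the generator once per requested element, as a pull-based source would. */
    static List<String> generate(Consumer<OneShotSink> generator, int requested) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < requested; i++) {
            OneShotSink sink = new OneShotSink();  // fresh sink state for every round
            generator.accept(sink);
            if (sink.value != null) {
                out.add(sink.value);
            }
            if (sink.completed) {
                break;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        List<String> out = generate(sink -> {
            if (calls[0] == 0) sink.next("One");
            else if (calls[0] == 1) sink.next("Two");
            else sink.complete();
            calls[0]++;
        }, 10);
        System.out.println(out);                   // prints [One, Two]
    }
}
```

Flux.push and Flux.create instead hand out a FluxSink that can be stored and called many times from outside the callback, which is what an event bridge needs. Of the two, push expects calls from a single producer thread at a time, while create also serializes calls from several threads.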

[EDIT 2:] @123 asked me for a full example of what I'm trying to achieve. Bear with me, it's a fair amount of code for an SO question:

Full example of what I'm actually trying to do

I'd like to build a bridge between a (non-reactive) Spring domain event listener and a reactive Flux, which I can then use in a WebFlux endpoint to publish SSEs. The following code snippets use Lombok annotations for brevity.

Let's assume that I eventually want to publish the state of a user in an onboarding process as SSEs. Here's the enum:

public enum ProcessState {
  CREATED(false),
  VERIFIED(false),
  AUTHORIZATION_PENDING(false),
  AUTHORIZED(false),
  ACTIVE(true);

  @Getter
  private final boolean terminalState;

  ProcessState(boolean terminalState) {
    this.terminalState = terminalState;
  }

}

The non-reactive business logic will publish StateChangedEvents whenever the state of any user is changed:

@Data
@RequiredArgsConstructor
public class StateChangedEvent {
  private final UUID userId;
  private final ProcessState newState;
}

And this is where my original question comes from. How would I build a bridge that translates these domain events into a Flux stream? My requirements:

  • The current state of the process should be pushed as soon as a new client registers
  • The Flux stream should terminate whenever a "terminal" onboarding state has been reached.

This is what I've got so far:

@Component
@RequiredArgsConstructor
class EventBridge {

  @RequiredArgsConstructor(access = PRIVATE)
  private static class Subscriber {
    private final UUID userId;
    private final FluxSink<ProcessState> sink;
    private boolean eventEmitted;
  }

  private final UserRepository repository;
  private final Map<UUID, Subscriber> subscribers = new ConcurrentHashMap<>();

  @EventListener
  void stateChanged(StateChangedEvent event) {
    notifySubscribers(event);
  }

  Flux<ProcessState> register(UUID userId) {
    return Flux.push(emitter -> addSubscriber(userId, emitter));
  }

  private Subscriber addSubscriber(UUID userId, FluxSink<ProcessState> sink) {
    var subscriptionId = randomUUID();
    var subscriber = new Subscriber(userId, sink);
    subscribers.put(subscriptionId, subscriber);
    sink
      .onRequest(n -> poll(subscriber))
      .onDispose(() -> removeSubscriber(subscriptionId));
    return subscriber;
  }

  private void poll(Subscriber subscriber) {
    emit(subscriber, loadCurrentState(subscriber), true);
  }

  private ProcessState loadCurrentState(Subscriber subscriber) {
    return repository.findById(subscriber.userId).getProcessState();
  }

  private void removeSubscriber(UUID subscriptionId) {
    subscribers.remove(subscriptionId);
  }

  private void notifySubscribers(StateChangedEvent event) {
    subscribers.values().stream()
      .filter(subscriber -> subscriber.userId.equals(event.getUserId()))
      .forEach(subscriber -> emit(subscriber, event.getNewState(), false));
  }

  private void emit(Subscriber subscriber, ProcessState processState, boolean onlyIfFirst) {
    synchronized (subscriber) {
      if (onlyIfFirst && subscriber.eventEmitted) {
        return;
      }
      subscriber.sink.next(processState);
      if (processState.isTerminalState()) {
        subscriber.sink.complete();
      }
      subscriber.eventEmitted = true;
    }
  }

}

And finally the controller, where the bridge is used:

@RestController
@RequiredArgsConstructor
class UserController {

  private final EventBridge eventBridge;

  @GetMapping(value = "/{userId}", produces = TEXT_EVENT_STREAM_VALUE)
  Flux<ServerSentEvent<ProcessState>> readAsStream(@PathVariable UUID userId) {
    return eventBridge.register(userId).map(response -> ServerSentEvent.builder((ProcessState) response).build());
  }

}

There are a couple of issues in my bridge code I can't wrap my head around:

  • Do I really have to synchronize on my Subscriber instance to avoid writing stale events from polling the initial state? If I don't, it can happen that a StateChangedEvent arrives and gets published before the current state has been read from the repository, so the stale state is then pushed out of order. Surely there must be a more elegant, Flux-ish way to handle this without the synchronized keyword.

  • We already ruled out Flux.generate. It seems to work with Flux.push, but Flux.create generates a whole lot more SSE events. Why? I fear I don't understand the differences between the three.

  • Rather than using the static methods on Flux, should I use a DirectProcessor or any other processor here? I'm new to the whole reactive stack and the Spring Reactor documentation is rather too vague for me, tbh. Again: What are the differences? What about that comment about backpressure I mentioned above?

Stefan Haberl asked Apr 01 '20 06:04



2 Answers

If I understand correctly what you are trying to do, I think your solution could be heavily simplified.

@Component
public class EventBridge {

    private final UserRepository repository;
    private final ReplayProcessor<StateChangedEvent> processor;
    private final FluxSink<StateChangedEvent> sink;


    EventBridge(UserRepository repository) {
        this.repository = repository;
        // Replays events from the last 100 seconds to every new subscriber
        this.processor = ReplayProcessor.createTimeout(Duration.ofSeconds(100L));
        // The sink provides thread-safe next, complete and error calls for subscribers
        this.sink = processor.sink();
    }

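    // Assumption: registered as a Spring event listener, as in the question's bridge
    @EventListener
    public void changeState(StateChangedEvent event) {
        // Pass the event straight into the sink, which calls onNext on the subscribers
        sink.next(event);
    }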

    public Flux<ProcessState> streamProcessStateForUser(UUID uuid){
        return
            // Look up the current state first. This isn't great, since it blocks until
            // the repository returns, although that seems to be what you want.
            // I also added an UNKNOWN value to ProcessState, since that is better than
            // emitting null, and you should probably return an Optional from the repository.
            Flux.concat(
                    Flux.just(
                            repository.findById(uuid).map(User::getProcessState).orElse(ProcessState.UNKNOWN)
                    ),
                    processor
                            //Check the uuid matches the event
                            .filter(stateChangedEvent -> stateChangedEvent.getUserId().equals(uuid))
                            //Time out after 100 seconds, not needed but may be useful for you
                            .take(Duration.ofSeconds(100L))
                            //Complete flux when at terminal state
                            .takeUntil(stateChangedEvent -> stateChangedEvent.getNewState().isTerminalState())
                            //Convert to ProcessState from StateChangedEvent
                            .map(StateChangedEvent::getNewState)
            );
    }

}

You should be able to keep everything else the same.

123 answered Nov 29 '22 17:11


Thanks @123 for answering my question about how to build a bridge between Spring's @EventListener and a Flux. As mentioned in the question, the complete use case was to push the domain events to a client using WebFlux's SSE support.

After a little bit of thinking, I realized that it doesn't make sense to build this bridge in the first place, because in a multi-instance scenario the HTTP request might hit a different instance than the one where the onboarding process is running, and therefore no events will be pushed at all.

So in the end, I opted to poll the single source of truth (the database) and push SSE events whenever the state changes. It would be great if we could use a reactive data store here, but for now I'm "stuck" with Spring Data JPA and PostgreSQL.

So, if somebody has the same problem, this is how I built it in the end:

@RestController
@RequiredArgsConstructor
class UserController {

  private final UserRepository userRepository;

  @GetMapping(value = "/{userId}", produces = TEXT_EVENT_STREAM_VALUE)
  Flux<ServerSentEvent<ProcessState>> readAsStream(@PathVariable UUID userId) {
    return Flux.interval(ZERO, ofSeconds(1L))
      .map(n -> userRepository.findById(userId).getProcessState())
      .takeUntil(processState -> processState.isTerminalState())
      .distinctUntilChanged()
      .map(response -> ServerSentEvent.builder((ProcessState) response).build())
      .take(ofMinutes(30L));
  }

}

Just in case somebody is wondering: This is again simplified to illustrate the problem at hand. We have a hexagonal architecture, i.e., we don't inject repositories into our @RestControllers, but call a business facade (an input port) from our web layer to retrieve users.
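To make the operator order in the polling pipeline above concrete: takeUntil still emits the element that matches the predicate and then completes, and distinctUntilChanged drops consecutive repeats. The plain-Java sketch below models those two steps on a list of polled states. It is not Reactor code. PollingPipelineModel and eventsFor are hypothetical names, and the enum is a trimmed copy of the one from the question.

```java
import java.util.ArrayList;
import java.util.List;

final class PollingPipelineModel {

    enum ProcessState {
        CREATED(false), VERIFIED(false), AUTHORIZATION_PENDING(false), AUTHORIZED(false), ACTIVE(true);

        final boolean terminalState;

        ProcessState(boolean terminalState) {
            this.terminalState = terminalState;
        }
    }

    /** Turns one polling result per tick into the states that would be sent as events. */
    static List<ProcessState> eventsFor(List<ProcessState> polled) {
        List<ProcessState> events = new ArrayList<>();
        ProcessState previous = null;
        for (ProcessState state : polled) {
            if (state != previous) {       // distinctUntilChanged: skip consecutive repeats
                events.add(state);
                previous = state;
            }
            if (state.terminalState) {     // takeUntil: the matching element is emitted, then the stream ends
                break;
            }
        }
        return events;
    }

    public static void main(String[] args) {
        List<ProcessState> polled = List.of(
                ProcessState.CREATED, ProcessState.CREATED,
                ProcessState.VERIFIED, ProcessState.VERIFIED,
                ProcessState.ACTIVE, ProcessState.ACTIVE);
        System.out.println(eventsFor(polled));   // prints [CREATED, VERIFIED, ACTIVE]
    }
}
```

So a client sees one event per actual state change, including the terminal state, even though the database is queried once per second.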

Stefan Haberl answered Nov 29 '22 19:11