I know that EventBus is deprecated in Reactor 3.x and that the suggested replacement is ReplayProcessor. I have read https://github.com/reactor/reactor-core/issues/375, but the code there is only a rough draft. I created a demo project to try out the idea. Can someone review it and comment?
======== Application.java ========
package hello;

import org.reactivestreams.Subscription;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.ReplayProcessor;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@Configuration
@EnableAutoConfiguration
@ComponentScan
public class Application implements CommandLineRunner {

    private static final int NUMBER_OF_QUOTES = 10;

    // The processor plays the role of the event bus: publishers call onNext on it,
    // and each handler subscribes to a filtered view of the same stream.
    @Bean
    ReplayProcessor<MyEvent> createReplayProcessor() {
        ReplayProcessor<MyEvent> rp = ReplayProcessor.create();
        Flux<MyEvent> interest1 = rp.filter(this::filterInterest1);
        Flux<MyEvent> interest2 = rp.filter(this::filterInterest2);
        interest1.subscribe(new BaseSubscriber<MyEvent>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                requestUnbounded();
            }

            @Override
            protected void hookOnNext(MyEvent value) {
                // todo: call service method
                System.out.println("event1 handler -> event name:" + value.getEventName());
            }
        });
        interest2.subscribe(new BaseSubscriber<MyEvent>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                requestUnbounded();
            }

            @Override
            protected void hookOnNext(MyEvent value) {
                // todo: call service method
                System.out.println("event2 handler -> event name:" + value.getEventName());
            }
        });
        return rp;
    }

    // Matches events named "event1" (case-insensitive); null-safe.
    public boolean filterInterest1(MyEvent myEvent) {
        return myEvent != null && "event1".equalsIgnoreCase(myEvent.getEventName());
    }

    // Matches events named "event2" (case-insensitive); null-safe.
    public boolean filterInterest2(MyEvent myEvent) {
        return myEvent != null && "event2".equalsIgnoreCase(myEvent.getEventName());
    }

    @Autowired
    private Publisher publisher;

    // Nothing counts this latch down in the demo, so the await(...) in main
    // simply times out after 10 seconds.
    @Bean
    public CountDownLatch latch() {
        return new CountDownLatch(NUMBER_OF_QUOTES);
    }

    @Override
    public void run(String... args) throws Exception {
        publisher.publishQuotes(NUMBER_OF_QUOTES);
    }

    public static void main(String[] args) throws InterruptedException {
        ApplicationContext app = SpringApplication.run(Application.class, args);
        app.getBean(CountDownLatch.class).await(10, TimeUnit.SECONDS);
    }
}
======== MyEvent.java ========
package hello;

// Simple event payload; handlers are selected by eventName.
public class MyEvent {

    private String eventName = "";

    public MyEvent(String eventName) {
        this.eventName = eventName;
    }

    public String getEventName() {
        return eventName;
    }

    public void setEventName(String eventName) {
        this.eventName = eventName;
    }
}
======== Publisher.java ========
package hello;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.ReplayProcessor;

import java.util.concurrent.CountDownLatch;

@Service
public class Publisher {

    @Autowired
    ReplayProcessor<MyEvent> rp;

    @Autowired
    CountDownLatch latch;

    // numberOfQuotes is not used yet: the demo always publishes three fixed events.
    public void publishQuotes(int numberOfQuotes) throws InterruptedException {
        final int published = 3;
        long start = System.currentTimeMillis();
        // Only event1 and event2 match a filter in Application; event3 has no handler.
        rp.onNext(new MyEvent("event1"));
        rp.onNext(new MyEvent("event2"));
        rp.onNext(new MyEvent("event3"));
        long elapsed = System.currentTimeMillis() - start;
        System.out.println("Elapsed time: " + elapsed + "ms");
        System.out.println("Average time per event: " + elapsed / published + "ms");
    }
}
The whole code is at https://github.com/yigubigu/reactor-start-sample.git
Project Reactor is a fully non-blocking foundation with back-pressure support included. It's the foundation of the reactive stack in the Spring ecosystem and is featured in projects such as Spring WebFlux, Spring Data, and Spring Cloud Gateway.
The Mono.just method is the simplest way to create a Mono. It takes a single value and produces a finite stream from it: the value is emitted, followed by a completion event.
IMHO you can rely on Spring event handlers. Matt Raible and Josh Long use that approach in a couple of tutorials.
Key takeaways:
    @Component
    class ProfileCreatedEventPublisher implements
            ApplicationListener<ProfileCreatedEvent>,
            Consumer<FluxSink<ProfileCreatedEvent>>
Uses an event loop to take events from a LinkedBlockingQueue.
    @Override
    public void onApplicationEvent(ProfileCreatedEvent event)
Queues the events, which can be published from anywhere within your app.
ProfileCreatedEventPublisher is used in ServerSentEventController to create a Flux of events (which can be chained with a filter); the controller then transforms the events and sends them to a web client.
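To show the queue-and-event-loop idea from the takeaways in isolation, here is a minimal sketch that runs on the JDK alone. It is not the tutorial's code: `QueueBackedPublisher`, `onEvent`, and `drainTo` are hypothetical names, and a plain `java.util.function.Consumer` stands in for Reactor's `FluxSink`, so treat it as an illustration of the pattern under those assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for the tutorial's publisher: events are queued by
// onEvent(...) and drained by a single worker thread into a downstream sink.
class QueueBackedPublisher<E> {
    private final BlockingQueue<E> queue = new LinkedBlockingQueue<>();

    // Counterpart of onApplicationEvent: may be called from any thread.
    public void onEvent(E event) {
        queue.offer(event);
    }

    // Counterpart of accept(FluxSink): starts the event loop that forwards
    // queued events to the sink (a plain Consumer here, not a FluxSink).
    public Thread drainTo(Consumer<? super E> sink) {
        Thread worker = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    sink.accept(queue.take()); // blocks until an event arrives
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // leave the loop on interrupt
            }
        }, "event-loop");
        worker.setDaemon(true);
        worker.start();
        return worker;
    }
}

class QueueBackedPublisherDemo {
    // Publishes three events and returns those that pass the subscriber's filter.
    static List<String> run() {
        QueueBackedPublisher<String> publisher = new QueueBackedPublisher<>();
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(3);

        Thread worker = publisher.drainTo(name -> {
            if (!name.equals("event3")) { // filter applied downstream of the queue
                received.add(name);
            }
            done.countDown();
        });

        publisher.onEvent("event1");
        publisher.onEvent("event2");
        publisher.onEvent("event3");

        try {
            done.await(2, TimeUnit.SECONDS); // wait until all three were drained
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        worker.interrupt(); // stop the event loop
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In the Spring version, `onApplicationEvent` would take the place of `onEvent`, and `Flux.create(publisher)` would hand the publisher a `FluxSink` instead of the `Consumer` used here.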