
EventBus in Reactor 3.x

I know that EventBus is deprecated in Reactor 3.x and that the suggested replacement is ReplayProcessor. I have read https://github.com/reactor/reactor-core/issues/375, but the code there is only a rough draft. I created a demo project to try out the idea. Could someone review it and comment?

======== Application.java ========

package hello;
import org.reactivestreams.Subscription;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

import reactor.core.publisher.Flux;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.publisher.BaseSubscriber;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

@Configuration
@EnableAutoConfiguration
@ComponentScan
public class Application implements CommandLineRunner {

    private static final int NUMBER_OF_QUOTES = 10;

    @Bean
    ReplayProcessor<MyEvent> createReplayProcessor() {

        ReplayProcessor<MyEvent> rp = ReplayProcessor.create();

        Flux<MyEvent> interest1 = rp.filter(ev -> filterInterest1(ev));

        Flux<MyEvent> interest2 = rp.filter(ev -> filterInterest2(ev));

        interest1.subscribe(new BaseSubscriber<MyEvent>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                requestUnbounded();
            }
            @Override
            protected void hookOnNext(MyEvent value) {
                //todo: call service method
                System.out.println("event 1 handler -> event name:" + value.getEventName());
            }

        });


        interest2.subscribe(new BaseSubscriber<MyEvent>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                requestUnbounded();
            }
            @Override
            protected void hookOnNext(MyEvent value) {
                //todo: call service method
                System.out.println("event2 handler -> event name:" + value.getEventName());
            }
        });

        return rp;
    }

    // True for events named "event1" (case-insensitive); null-safe because
    // equalsIgnoreCase(null) returns false.
    public boolean filterInterest1(MyEvent myEvent) {
        return myEvent != null && "event1".equalsIgnoreCase(myEvent.getEventName());
    }

    // True for events named "event2" (case-insensitive).
    public boolean filterInterest2(MyEvent myEvent) {
        return myEvent != null && "event2".equalsIgnoreCase(myEvent.getEventName());
    }


    @Autowired
    private Publisher publisher;

    @Bean
    public CountDownLatch latch() {
        return new CountDownLatch(NUMBER_OF_QUOTES);
    }

    @Override
    public void run(String... args) throws Exception {
        publisher.publishQuotes(NUMBER_OF_QUOTES);
    }

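    public static void main(String[] args) throws InterruptedException {
        ApplicationContext app = SpringApplication.run(Application.class, args);

        // Note: nothing in this demo calls latch.countDown(), so this call
        // simply blocks for the full 10-second timeout before main returns.
        app.getBean(CountDownLatch.class).await(10, TimeUnit.SECONDS);
    }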

}

======== MyEvent.java ========

package hello;

public class MyEvent {

    private String eventName = "";

    public String getEventName() {
        return eventName;
    }

    public void setEventName(String eventName) {
        this.eventName = eventName;
    }

    public MyEvent(String eventName) {
        this.eventName = eventName;
    }
}

======== Publisher.java ========

package hello;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import reactor.core.publisher.ReplayProcessor;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

@Service
public class Publisher {

    @Autowired
    ReplayProcessor<MyEvent> rp;

    @Autowired
    CountDownLatch latch;

    public void publishQuotes(int numberOfQuotes) throws InterruptedException {
        long start = System.currentTimeMillis();

        rp.onNext(new MyEvent("event1"));
        rp.onNext(new MyEvent("event2"));
        rp.onNext(new MyEvent("event3"));

        long elapsed = System.currentTimeMillis() - start;

        System.out.println("Elapsed time: " + elapsed + "ms");
        System.out.println("Average time per quote: " + elapsed / numberOfQuotes + "ms");
    }

}

The full code is at https://github.com/yigubigu/reactor-start-sample.git
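To make the intended behaviour easier to discuss, here is a minimal JDK-only sketch of the "replay plus filter" idea the demo relies on. It is a hypothetical model, not Reactor's implementation: `ReplayBusSketch`, its methods, and the use of plain `String` event names are all assumptions made for illustration. It retains every published event and replays the history to each new subscriber, which is how I understand an unbounded replay to behave.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical JDK-only model of "replay + filter"; not Reactor's implementation.
final class ReplayBusSketch {

    // One registered interest: a filter plus the handler it feeds.
    private static final class Subscription {
        final Predicate<String> interest;
        final Consumer<String> handler;

        Subscription(Predicate<String> interest, Consumer<String> handler) {
            this.interest = interest;
            this.handler = handler;
        }

        void offer(String eventName) {
            if (interest.test(eventName)) {
                handler.accept(eventName);
            }
        }
    }

    // Every event ever published; grows without bound in this sketch.
    private final List<String> history = new ArrayList<>();
    private final List<Subscription> subscriptions = new ArrayList<>();

    // Registers a handler and first replays the retained history to it.
    synchronized void subscribe(Predicate<String> interest, Consumer<String> handler) {
        Subscription subscription = new Subscription(interest, handler);
        history.forEach(subscription::offer);
        subscriptions.add(subscription);
    }

    // Records the event, then offers it to every current subscriber.
    synchronized void publish(String eventName) {
        history.add(eventName);
        subscriptions.forEach(s -> s.offer(eventName));
    }

    // Mirrors the demo: two interests, three events, one late subscriber.
    static List<String> runDemo() {
        ReplayBusSketch bus = new ReplayBusSketch();
        List<String> log = new ArrayList<>();
        bus.subscribe(e -> e.equalsIgnoreCase("event1"), e -> log.add("handler1:" + e));
        bus.publish("event1");
        bus.publish("event2");
        bus.publish("event3");
        // Subscribes after publishing, yet still receives event2 from the history.
        bus.subscribe(e -> e.equalsIgnoreCase("event2"), e -> log.add("handler2:" + e));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

The sketch also shows the cost of this design: the history list never shrinks, so a long-running bus built this way keeps every event in memory. An event that matches no interest, like "event3" here, is dropped by every handler but still retained.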

richard asked Apr 14 '17 07:04



1 Answer

IMHO you can rely on Spring event handlers. Matt Raible and Josh Long use this approach in these two tutorials:

  • https://developer.okta.com/blog/2018/09/24/reactive-apis-with-spring-webflux
  • https://developer.okta.com/blog/2018/09/25/spring-webflux-websockets-react

Key takeaways:

@Component class ProfileCreatedEventPublisher implements ApplicationListener<ProfileCreatedEvent>, Consumer<FluxSink<ProfileCreatedEvent>>

Uses an event loop to take events from a LinkedBlockingQueue.

@Override public void onApplicationEvent(ProfileCreatedEvent event)

Queues the events, which can be published from anywhere within your app.

ProfileCreatedEventPublisher is used in ServerSentEventController to create a Flux of events (which can be chained with a filter); the controller then transforms the events and sends them to a web client.
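The queue-and-event-loop pattern from these takeaways can be sketched with the JDK alone. This is not the tutorials' code: `QueueEventBridge`, `publish`, `drainTo`, and `runDemo` are hypothetical names, a plain `Consumer<String>` stands in for the `FluxSink`, and the loop stops after a fixed number of events so that the example terminates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical JDK-only stand-in for the queue-backed publisher pattern.
final class QueueEventBridge {

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Producer side: plays the role of onApplicationEvent(...) queuing an event.
    void publish(String eventName) {
        queue.offer(eventName);
    }

    // Event loop: blocks on the queue and hands each event to the sink.
    // Bounded by `count` here only so that the sketch terminates.
    void drainTo(Consumer<String> sink, int count) throws InterruptedException {
        for (int i = 0; i < count; i++) {
            sink.accept(queue.take());
        }
    }

    // Publishes three events and collects those matching "event1",
    // the way a filter chained onto the resulting stream would.
    static List<String> runDemo() {
        QueueEventBridge bridge = new QueueEventBridge();
        List<String> received = new ArrayList<>();

        Thread loop = new Thread(() -> {
            try {
                bridge.drainTo(name -> {
                    if ("event1".equalsIgnoreCase(name)) {
                        received.add(name);
                    }
                }, 3);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        loop.start();

        bridge.publish("event1");
        bridge.publish("event2");
        bridge.publish("EVENT1");

        try {
            loop.join(); // join() makes the worker's writes visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the event loop", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Because the queue decouples producers from the consuming loop, `publish` can be called from any thread, and events reach the sink in the order they were queued.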

rvazquezglez answered Sep 28 '22 18:09