How to create a Spring Reactor Flux from an ActiveMQ queue?

I am experimenting with the Spring Reactor 3 components and Spring Integration to create a reactive stream (Flux) from a JMS queue.

I am attempting to create a reactive stream (Spring Reactor 3 Flux) from a JMS queue (ActiveMQ using Spring Integration) so that clients receive the JMS messages asynchronously. I believe that I have everything hooked up correctly, but the client does not receive any of the JMS messages until the server is stopped. Then all of the messages get "pushed" to the client at once.

Any help would be appreciated.

Here is the configuration file that I am using to configure the JMS, Integration components and the reactive publisher:

public class JmsConfiguration {

    private String defaultBrokerUrl;

    private String patientQueue;

    MessageListenerAdapter messageListenerAdapter;

    public DefaultJmsListenerContainerFactory myFactory(
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory =
                new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, jmsConnectionFactory());
        return factory;
    }

    public Queue patientQueue() {
        return new ActiveMQQueue(patientQueue);
    }

    public ActiveMQConnectionFactory jmsConnectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        return connectionFactory;
    }

    // Set the jackson message converter
    public JmsTemplate jmsTemplate() {
        JmsTemplate template = new JmsTemplate();
        return template;
    }

    public MessageListenerAdapter messageListenerAdapter() {
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
        return messageListenerAdapter;
    }

    public AbstractMessageListenerContainer messageListenerContainer() {
        DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
        defaultMessageListenerContainer.setErrorHandler(new ErrorHandler() {
            public void handleError(Throwable t) {
            }
        });
        return defaultMessageListenerContainer;
    }

    @Bean // Serialize message content to json using TextMessage
    public MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        return converter;
    }

    public MessageChannel jmsOutboundInboundReplyChannel() {
        return MessageChannels.queue().get();
    }

    public Publisher<Message<String>> pollableReactiveFlow() {
        return IntegrationFlows
                // ...
    }

    public MessageChannel jmsChannel() {
        return new DirectChannel();
    }
}

The controller that creates the Flux is:

public class PatientChangePushController {
    private LocalDateTime lastTimePatientDataRetrieved = LocalDateTime.now();
    private int durationInSeconds = 30;
    private Patient patient;
    AtomicReference<SignalType> checkFinally = new AtomicReference<>();

    PatientService patientService;

    Publisher<Message<String>> pollableReactiveFlow;

    private JmsTemplate jmsTemplate;

    private Queue patientQueue;

    /**
     * Subscribe to a Flux of a patient that has been updated.
     * @param id
     * @return
     */
    @GetMapping(value = "/{id}/alerts", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Message<String>> getPatientAlerts(@PathVariable Long id) {

        Flux<Message<String>> messageFlux = Flux.from(pollableReactiveFlow);
        return messageFlux;
    }

    @GetMapping(value = "/generate")
    public void generateJmsMessage() {
        for (long i = 0L; i < 100; i++) {
            Patient patient = new Patient();
            System.out.println("Message was sent to the Queue");
        }
    }

    void send(Patient patient) {
        this.jmsTemplate.convertAndSend(this.patientQueue, patient);
    }
}

If anyone can tell me why the messages do not get sent to the client until after the server is killed, I would appreciate it.
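
For comparison, the behaviour a reactive publisher is expected to show is that each message reaches the subscriber while the publisher is still open, not at shutdown. The following is a minimal sketch of that expectation using only the JDK's java.util.concurrent.Flow API; it is an assumption-laden stand-in, not the Spring Integration wiring from the question. SubmissionPublisher takes the place of the JMS-backed publisher, and the class name PushAsPublishedSketch and the method deliver are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in: SubmissionPublisher plays the role of the JMS-backed publisher.
class PushAsPublishedSketch {

    // Publishes every message and returns what the subscriber received
    // before the publisher was closed.
    static List<String> deliver(List<String> messages) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch allReceived = new CountDownLatch(messages.size());

        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.subscription = subscription;
                subscription.request(1); // signal demand for the first message
            }

            @Override
            public void onNext(String message) {
                received.add(message);
                allReceived.countDown();
                subscription.request(1); // ask for the next message
            }

            @Override
            public void onError(Throwable error) {
                error.printStackTrace();
            }

            @Override
            public void onComplete() {
            }
        });

        messages.forEach(publisher::submit);
        try {
            // Wait while the publisher is still open: delivery must not depend on shutdown.
            if (!allReceived.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("messages were not delivered while the publisher was open");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            publisher.close();
        }
        return List.copyOf(received);
    }

    public static void main(String[] args) {
        System.out.println(deliver(List.of("testMessage #1", "testMessage #2", "testMessage #3")));
    }
}
```

If the equivalent check against the real endpoint only passes after the server stops, the likely place to look is demand or buffering between the queue channel and the HTTP response rather than the JMS side, though that is a guess and not something this sketch proves.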

asked Mar 30 '17 at 19:03 by T. Nash

1 Answer

Works well for me:

public class SpringIntegrationSseDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringIntegrationSseDemoApplication.class, args);
    }

    private ConnectionFactory connectionFactory;

    private JmsTemplate jmsTemplate;

    public Publisher<Message<String>> jmsReactiveSource() {
        return IntegrationFlows
                // ...
    }

    @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> getPatientAlerts() {
        return Flux.from(jmsReactiveSource())
                // ...
    }

    @GetMapping(value = "/generate")
    public void generateJmsMessage() {
        for (int i = 0; i < 100; i++) {
            this.jmsTemplate.convertAndSend("testQueue", "testMessage #" + (i + 1));
        }
    }
}

In one terminal I have curl http://localhost:8080/events which waits for SSEs from that Flux.

In another terminal I run curl http://localhost:8080/generate and see in the first one:

data:testMessage #1

data:testMessage #2

data:testMessage #3

data:testMessage #4
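
The same check can be made from Java instead of curl. This is a minimal sketch, assuming Java 11+ (for java.net.http.HttpClient) and the /events endpoint on localhost:8080 shown above; the class name SseClientSketch and the helper payload are hypothetical and only handle the simple single-line data: events seen in this output.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Optional;
import java.util.stream.Stream;

class SseClientSketch {

    // Returns the payload of a "data:" line, or empty for any other line
    // (blank event separators, comments, other fields).
    static Optional<String> payload(String line) {
        if (!line.startsWith("data:")) {
            return Optional.empty();
        }
        String value = line.substring("data:".length());
        // The SSE format allows one optional space after the colon.
        return Optional.of(value.startsWith(" ") ? value.substring(1) : value);
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:8080/events"))
                .header("Accept", "text/event-stream")
                .build();

        // ofLines() exposes the body as a lazily read stream of lines,
        // so each event can be printed as it arrives.
        HttpResponse<Stream<String>> response =
                client.send(request, HttpResponse.BodyHandlers.ofLines());

        response.body()
                .map(SseClientSketch::payload)
                .flatMap(Optional::stream)
                .forEach(System.out::println);
    }
}
```

With the server from this answer running, the client should print one payload per message as /generate is called; if lines appear only after the server stops, the stream is being buffered somewhere upstream.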

I use Spring Boot 2.0.0.BUILD-SNAPSHOT.

Also see here: https://spring.io/blog/2017/03/08/spring-tips-server-sent-events-sse

answered Nov 11 '22 at 01:11 by Artem Bilan