Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Project Reactor, using a Flux sink outside of the creation lambda

  • When my service starts up, I want to construct a simple pipeline.
  • I'd like to isolate the Flux sink, or a Processor, to emit events with.
  • Events will be coming in from multiple threads and should be processed according to the pipeline's subscribeOn() specification, but everything seems to run on the main thread.
  • What is the best approach? I've attached my attempts below.
  • (I'm using reactor-core v3.2.8.RELEASE.)
import org.junit.jupiter.api.Test;

import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;

/**
 * I want to construct my React pipelines during creation,
 * then emit events over the lifetime of my services.
 */
public class React1Test
{
    /**
     * Attempt 1 - use a DirectProcessor and send items to it.
     * Doesn't work though - seems to always run on the main thread.
     */
    @Test
    public void testReact1() throws InterruptedException
    {
        // Create the flux and sink.
        FluxProcessor<String, String> fluxProcessor = DirectProcessor.<String>create().serialize();
        FluxSink<String> sink = fluxProcessor.sink();

        // Create the pipeline.
        fluxProcessor
            .doOnNext(str -> showDebugMsg(str))   // What thread do ops work on?
            .subscribeOn(Schedulers.elastic())
            .subscribe(str -> showDebugMsg(str)); // What thread does subscribe run on?

        // Give the multi-thread pipeline a second.
        Thread.sleep(1000);

        // Time passes ... things happen ...
        // Pass a few messages to the sink, emulating events.
        sink.next("a");
        sink.next("b");
        sink.next("c");

        // It's multi-thread so wait a sec to receive.
        Thread.sleep(1000);
    }

    // Used down below during Flux.create().
    private FluxSink<String> sink2;

    /**
     * Attempt 2 - use Flux.create() and its FluxSink object.
     * Also seems to always run on the main thread.
     */
    @Test
    public void testReact2() throws InterruptedException
    {
        // Create the flux and sink.
        Flux.<String>create(sink -> sink2 = sink)
            .doOnNext(str -> showDebugMsg(str))   // What thread do ops work on?
            .subscribeOn(Schedulers.elastic())
            .subscribe(str -> showDebugMsg(str)); // What thread does subscribe run on?

        // Give the multi-thread pipeline a second.
        Thread.sleep(1000);

        // Pass a few messages to the sink.
        sink2.next("a");
        sink2.next("b");
        sink2.next("c");

        // It's multi-thread so wait a sec to receive.
        Thread.sleep(1000);
    }

    // Show us what thread we're on.
    private static void showDebugMsg(String msg)
    {
        System.out.println(String.format("%s [%s]", msg, Thread.currentThread().getName()));
    }
}

Output is always:

a [main]
a [main]
b [main]
b [main]
c [main]
c [main]

But what I would expect, is:

a [elastic-1]
a [elastic-1]
b [elastic-2]
b [elastic-2]
c [elastic-3]
c [elastic-3]

Thanks in advance.

like image 988
Luke Avatar asked May 09 '19 16:05

Luke


People also ask

What is sink in project reactor?

Sinks are constructs through which Reactive Streams signals can be programmatically pushed, with Flux or Mono semantics. These standalone sinks expose tryEmit methods that return an Sinks.

What is flux in reactor?

Mono and Flux are both reactive streams. They differ in what they express. A Mono is a stream of 0 to 1 element, whereas a Flux is a stream of 0 to N elements.

What is Project reactor used for?

Project Reactor is a fully non-blocking foundation with back-pressure support included. It's the foundation of the reactive stack in the Spring ecosystem and is featured in projects such as Spring WebFlux, Spring Data, and Spring Cloud Gateway.

What is mono and flux in reactive programming?

A Flux object represents a reactive sequence of 0.. N items, while a Mono object represents a single-value-or-empty (0..1) result. This distinction carries a bit of semantic information into the type, indicating the rough cardinality of the asynchronous processing.

How to create a flux reactor?

We'll need reactor-core and reactor-test: 3. Synchronous Emission The simplest way to create a Flux is Flux#generate. This method relies on a generator function to produce a sequence of items. But first, let's define a class to hold our methods illustrating the generate method: 3.1. Generator With New States

What is the difference between flux create and flux generate methods?

Both Flux Create and Flux Generate methods are used to programmatically generate sequences for Flux/Mono. But there are differences between these two and when to use what etc. That is what this article is about. The create method accepts a FluxSink<T> consumer.

Can fluxsink emit elements outside the create method?

Check the API here. We can also get the reference of the FluxSink instance and emit elements outside the create method as and when we need. it does not have to happen inside the create method. I can also use multiple threads to emit elements via my FluxSink to the downstream subscribers.

How do you push a sequence in flux?

The push Method In addition to the create operator, the Flux class has another static method to emit a sequence asynchronously, namely push. This method works just like create, except that it allows only one producing thread to emit signals at a time.


2 Answers

You see [main] because you're calling onNext from the main thread. subscribeOn you're using is only for the subscription (when create's lambda is triggered). You will see elastic-* threads logged if you use publishOn instead of subscribeOn.

Also, consider using Processors, storing sink obtained from Flux.create and similar operators as a field is discouraged.

like image 92
bsideup Avatar answered Oct 17 '22 23:10

bsideup


  • You can use parallel() and runOn() instead of subscribeOn() to get sink.next() to run multi-threaded.
  • bsideup is also correct - you can use publishOn() to coerce downstream operators to run on one different Scheduler thread.

Here is my updated code:

import org.junit.jupiter.api.Test;

import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;

/**
 * I want to construct my React pipelines during creation,
 * then emit events over the lifetime of my services.
 */
public class React1Test
{
    /**
     * Version 1 - use a DirectProcessor to dynamically emit items.
     */
    @Test
    public void testReact1() throws InterruptedException
    {
        // Create the flux and sink.
        FluxProcessor<String, String> fluxProcessor = DirectProcessor.<String>create().serialize();
        FluxSink<String> sink = fluxProcessor.sink();

        // Create the pipeline.
        fluxProcessor
            .parallel()
            .runOn(Schedulers.elastic())
            .doOnNext(str -> showDebugMsg(str))   // What thread do ops work on?
            .subscribe(str -> showDebugMsg(str)); // What thread does subscribe run on?

        // Give the multi-thread pipeline a second.
        Thread.sleep(1000);

        // Time passes ... things happen ...
        // Pass a few messages to the sink, emulating events.
        sink.next("a");
        sink.next("b");
        sink.next("c");

        // It's multi-thread so wait a sec to receive.
        Thread.sleep(1000);
    }

    // Used down below during Flux.create().
    private FluxSink<String> sink2;

    /**
     * Version 2 - use Flux.create() and its FluxSink object.
     */
    @Test
    public void testReact2() throws InterruptedException
    {
        // Create the flux and sink.
        Flux.<String>create(sink -> sink2 = sink)
            .parallel()
            .runOn(Schedulers.elastic())
            .doOnNext(str -> showDebugMsg(str))   // What thread do ops work on?
            .subscribe(str -> showDebugMsg(str)); // What thread does subscribe run on?

        // Give the multi-thread pipeline a second.
        Thread.sleep(1000);

        // Pass a few messages to the sink.
        sink2.next("a");
        sink2.next("b");
        sink2.next("c");

        // It's multi-thread so wait a sec to receive.
        Thread.sleep(1000);
    }

    // Show us what thread we're on.
    private static void showDebugMsg(String msg)
    {
        System.out.println(String.format("%s [%s]", msg, Thread.currentThread().getName()));
    }
}

Both versions produce the desired multi-threaded output:

a [elastic-2]
b [elastic-3]
c [elastic-4]
b [elastic-3]
a [elastic-2]
c [elastic-4]
like image 37
Luke Avatar answered Oct 18 '22 00:10

Luke