subscribeOn() specification, but everything seems to run on the main thread.
import org.junit.jupiter.api.Test;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;

/**
 * I want to construct my React pipelines during creation,
 * then emit events over the lifetime of my services.
 */
public class React1Test
{
    /**
     * Attempt 1 - use a DirectProcessor and send items to it.
     * Doesn't work though - seems to always run on the main thread.
     */
    @Test
    public void testReact1() throws InterruptedException
    {
        // Create the flux and sink.
        FluxProcessor<String, String> fluxProcessor = DirectProcessor.<String>create().serialize();
        FluxSink<String> sink = fluxProcessor.sink();

        // Create the pipeline.
        fluxProcessor
            .doOnNext(str -> showDebugMsg(str)) // What thread do ops work on?
            .subscribeOn(Schedulers.elastic())
            .subscribe(str -> showDebugMsg(str)); // What thread does subscribe run on?

        // Give the multi-thread pipeline a second.
        Thread.sleep(1000);

        // Time passes ... things happen ...
        // Pass a few messages to the sink, emulating events.
        sink.next("a");
        sink.next("b");
        sink.next("c");

        // It's multi-thread so wait a sec to receive.
        Thread.sleep(1000);
    }

    // Used down below during Flux.create().
    private FluxSink<String> sink2;

    /**
     * Attempt 2 - use Flux.create() and its FluxSink object.
     * Also seems to always run on the main thread.
     */
    @Test
    public void testReact2() throws InterruptedException
    {
        // Create the flux and sink.
        Flux.<String>create(sink -> sink2 = sink)
            .doOnNext(str -> showDebugMsg(str)) // What thread do ops work on?
            .subscribeOn(Schedulers.elastic())
            .subscribe(str -> showDebugMsg(str)); // What thread does subscribe run on?

        // Give the multi-thread pipeline a second.
        Thread.sleep(1000);

        // Pass a few messages to the sink.
        sink2.next("a");
        sink2.next("b");
        sink2.next("c");

        // It's multi-thread so wait a sec to receive.
        Thread.sleep(1000);
    }

    // Show us what thread we're on.
    private static void showDebugMsg(String msg)
    {
        System.out.println(String.format("%s [%s]", msg, Thread.currentThread().getName()));
    }
}
Output is always:
a [main]
a [main]
b [main]
b [main]
c [main]
c [main]
But what I would expect is:
a [elastic-1]
a [elastic-1]
b [elastic-2]
b [elastic-2]
c [elastic-3]
c [elastic-3]
Thanks in advance.
You see [main] because you're calling onNext from the main thread. The subscribeOn you're using only affects the subscription (when create's lambda is triggered).
You will see elastic-* threads logged if you use publishOn instead of subscribeOn.
Also, consider using Processors; storing the sink obtained from Flux.create and similar operators as a field is discouraged.
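The thread behaviour described here can be pictured without Reactor at all. The sketch below is a plain-JDK analogy, not Reactor's implementation: the class ThreadHopSketch, the helpers hopTo and emit, and the thread name "worker-1" are all hypothetical names chosen for illustration. Without a hop, the callback runs on whichever thread pushes the item (just as sink.next() called from main runs the operators on main). With a hop, each item is handed to an executor first, which is roughly the effect publishOn has on the operators below it.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Plain-JDK analogy only; none of these names come from Reactor.
class ThreadHopSketch {

    // Wraps a callback so it runs on the executor's thread instead of the caller's
    // (a rough stand-in for what publishOn does to downstream operators).
    static <T> Consumer<T> hopTo(ExecutorService executor, Consumer<T> downstream) {
        return item -> executor.execute(() -> downstream.accept(item));
    }

    // Pushes "a", "b", "c" from the calling thread and returns what the callback logged.
    static List<String> emit(boolean hop) {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "worker-1"));
        Consumer<String> record =
                s -> log.add(s + " [" + Thread.currentThread().getName() + "]");
        // Without the hop, the callback simply runs on the pushing thread.
        Consumer<String> stage = hop ? hopTo(worker, record) : record;
        for (String s : List.of("a", "b", "c")) {
            stage.accept(s);
        }
        // Wait for any handed-off items to be processed before reading the log.
        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(emit(false)); // callbacks run on the pushing thread
        System.out.println(emit(true));  // callbacks run on worker-1
    }
}
```

Run from main, the first line lists every item against [main] and the second lists every item against [worker-1]. In the original pipeline the equivalent change would be placing publishOn(Schedulers.elastic()) ahead of doOnNext.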
Use parallel() and runOn() instead of subscribeOn() to get sink.next() to run multi-threaded.
Use publishOn() to coerce downstream operators to run on one different Scheduler thread.
Here is my updated code:
import org.junit.jupiter.api.Test;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;

/**
 * I want to construct my React pipelines during creation,
 * then emit events over the lifetime of my services.
 */
public class React1Test
{
    /**
     * Version 1 - use a DirectProcessor to dynamically emit items.
     */
    @Test
    public void testReact1() throws InterruptedException
    {
        // Create the flux and sink.
        FluxProcessor<String, String> fluxProcessor = DirectProcessor.<String>create().serialize();
        FluxSink<String> sink = fluxProcessor.sink();

        // Create the pipeline.
        fluxProcessor
            .parallel()
            .runOn(Schedulers.elastic())
            .doOnNext(str -> showDebugMsg(str)) // What thread do ops work on?
            .subscribe(str -> showDebugMsg(str)); // What thread does subscribe run on?

        // Give the multi-thread pipeline a second.
        Thread.sleep(1000);

        // Time passes ... things happen ...
        // Pass a few messages to the sink, emulating events.
        sink.next("a");
        sink.next("b");
        sink.next("c");

        // It's multi-thread so wait a sec to receive.
        Thread.sleep(1000);
    }

    // Used down below during Flux.create().
    private FluxSink<String> sink2;

    /**
     * Version 2 - use Flux.create() and its FluxSink object.
     */
    @Test
    public void testReact2() throws InterruptedException
    {
        // Create the flux and sink.
        Flux.<String>create(sink -> sink2 = sink)
            .parallel()
            .runOn(Schedulers.elastic())
            .doOnNext(str -> showDebugMsg(str)) // What thread do ops work on?
            .subscribe(str -> showDebugMsg(str)); // What thread does subscribe run on?

        // Give the multi-thread pipeline a second.
        Thread.sleep(1000);

        // Pass a few messages to the sink.
        sink2.next("a");
        sink2.next("b");
        sink2.next("c");

        // It's multi-thread so wait a sec to receive.
        Thread.sleep(1000);
    }

    // Show us what thread we're on.
    private static void showDebugMsg(String msg)
    {
        System.out.println(String.format("%s [%s]", msg, Thread.currentThread().getName()));
    }
}
Both versions produce the desired multi-threaded output:
a [elastic-2]
b [elastic-3]
c [elastic-4]
b [elastic-3]
a [elastic-2]
c [elastic-4]
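One way to picture why each letter ends up on its own thread in this output: parallel() divides the sequence across "rails" in round-robin fashion, and runOn() gives each rail its own worker from the Scheduler. The following is a plain-JDK analogy of that idea under those assumptions, not Reactor's implementation; RailsSketch, dispatch, and the "rail-N" thread names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-JDK analogy only; none of these names come from Reactor.
class RailsSketch {

    // Sends item i to rail (i % railCount) and returns item -> thread that handled it.
    static Map<String, String> dispatch(List<String> items, int railCount) {
        Map<String, String> seenOn = new ConcurrentHashMap<>();

        // One single-threaded worker per rail, named rail-0, rail-1, ...
        List<ExecutorService> rails = new ArrayList<>();
        for (int i = 0; i < railCount; i++) {
            String name = "rail-" + i;
            rails.add(Executors.newSingleThreadExecutor(r -> new Thread(r, name)));
        }

        // Round-robin hand-off: consecutive items go to consecutive rails.
        for (int i = 0; i < items.size(); i++) {
            String item = items.get(i);
            rails.get(i % railCount).execute(
                    () -> seenOn.put(item, Thread.currentThread().getName()));
        }

        // Let every rail finish before reading the results.
        for (ExecutorService rail : rails) {
            rail.shutdown();
            try {
                rail.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return seenOn;
    }

    public static void main(String[] args) {
        // TreeMap only to print the entries in a stable order.
        System.out.println(new TreeMap<>(dispatch(List.of("a", "b", "c"), 3)));
        // prints {a=rail-0, b=rail-1, c=rail-2}
    }
}
```

With three items and at least three rails, each item gets a different thread, which matches the shape of the output above. With fewer rails than items, some threads would handle more than one item.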