Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Migration guide for Schedulers.enableMetrics() in project Reactor

I noticed that Schedulers.enableMetrics() got deprecated but I don't know what I should I do to get all my schedulers metered in a typical use case (using Spring Boot application).

Javadoc suggests using timedScheduler but how should it be achieved for Spring Boot?

like image 904
pixel Avatar asked Oct 18 '25 15:10

pixel


1 Answers

First off, here are my thoughts on why the Schedulers.enableMetrics() approach was deprecated:


The previous approach was flawed in several ways:

  • intrinsic dependency on the MeterRegistry#globalRegistry() without any way of using a different registry.
  • wrong level of abstraction and limited instrumentation:
    • it was not the schedulers themselves that were instrumented, but individual ExecutorService instances assumed to back the schedulers.
    • schedulers NOT backed by any ExecutorService couldn't be instrumented.
    • schedulers backed by MULTIPLE ExecutorService (eg. a pool of workers) would produce multiple levels of metrics difficult to aggregate.
    • instrumentation was all-or-nothing, potentially polluting metrics backend with metrics from global or irrelevant schedulers.

A deliberate constraint of the new approach is that each Scheduler must be explicitly wrapped, which ensures that the correct MeterRegistry is used and that metrics are recognizable and aggregated for that particular Scheduler (thanks to the mandatory metricsPrefix).


I'm not a Spring Boot expert, but if you really want to instrument all the schedulers including the global ones here is a naive approach that will aggregate data from all the schedulers of same category, demonstrated in a Spring Boot app:

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Configuration
    static class SchedulersConfiguration {

        @Bean
        @Order(1)
        public Scheduler originalScheduler() {
            // For comparison, we can capture a new original Scheduler (which won't be disposed by setFactory, unlike the global ones)
            return Schedulers.newBoundedElastic(4, 100, "compare");
        }

        @Bean
        public SimpleMeterRegistry registry() {
            return new SimpleMeterRegistry();
        }

        @Bean
        public Schedulers.Factory instrumentedSchedulers(SimpleMeterRegistry registry) {
            // Let's create a Factory that does the same as the default Schedulers factory in Reactor-Core, but with instrumentation
            return new Schedulers.Factory() {
                @Override
                public Scheduler newBoundedElastic(int threadCap, int queuedTaskCap, ThreadFactory threadFactory, int ttlSeconds) {
                    // The default implementation maps to the vanilla Schedulers so we can delegate to that
                    Scheduler original = Schedulers.Factory.super.newBoundedElastic(threadCap, queuedTaskCap, threadFactory, ttlSeconds);

                    // IMPORTANT NOTE: in this example _all_ the schedulers of the same type will share the same prefix/name
                    // this would especially be problematic if gauges were involved as they replace old gauges of the same name.
                    // Fortunately, for now, TimedScheduler only uses counters, timers and longTaskTimers.
                    String prefix = "my.instrumented.boundedElastic"; // TimedScheduler will add `.scheduler.xxx` to that prefix
                    return Micrometer.timedScheduler(original, registry, prefix);
                }

                @Override
                public Scheduler newParallel(int parallelism, ThreadFactory threadFactory) {
                    Scheduler original = Schedulers.Factory.super.newParallel(parallelism, threadFactory);
                    String prefix = "my.instrumented.parallel"; // TimedScheduler will add `.scheduler.xxx` to that prefix
                    return Micrometer.timedScheduler(original, registry, prefix);
                }

                @Override
                public Scheduler newSingle(ThreadFactory threadFactory) {
                    Scheduler original = Schedulers.Factory.super.newSingle(threadFactory);
                    String prefix = "my.instrumented.single"; // TimedScheduler will add `.scheduler.xxx` to that prefix
                    return Micrometer.timedScheduler(original, registry, prefix);
                }
            };
        }

        @PreDestroy
        void resetFactories() {
            System.err.println("Resetting Schedulers Factory to default");
            // Later on if we want to disable instrumentation we can reset the Factory to defaults (closing all instrumented schedulers)
            Schedulers.resetFactory();
        }
    }


    @Service
    public static class Demo implements ApplicationRunner {

        final Scheduler forComparison;
        final SimpleMeterRegistry registry;
        final Schedulers.Factory factory;

        Demo(Scheduler forComparison, SimpleMeterRegistry registry, Schedulers.Factory factory) {
            this.forComparison = forComparison;
            this.registry = registry;
            this.factory = factory;

            Schedulers.setFactory(factory);
        }

        public void generateMetrics() {
            Schedulers.boundedElastic().schedule(() -> {});
            Schedulers.newBoundedElastic(4, 100, "bounded1").schedule(() -> {});
            Schedulers.newBoundedElastic(4, 100, "bounded2").schedule(() -> {});
            Micrometer.timedScheduler(
                    forComparison,
                    registry,
                    "my.custom.instrumented.bounded"
            ).schedule(() -> {});
            Schedulers.newBoundedElastic(4, 100, "bounded3").schedule(() -> {});
        }

        public String getCompletedSummary() {
            return Search.in(registry)
                    .name(n -> n.endsWith(".scheduler.tasks.completed"))
                    .timers()
                    .stream()
                    .map(c -> c.getId().getName() + "=" + c.count())
                    .collect(Collectors.joining("\n"));
        }

        @Override
        public void run(ApplicationArguments args) throws Exception {
            generateMetrics();
            System.err.println(getCompletedSummary());
        }
    }
}

Which prints:

my.instrumented.boundedElastic.scheduler.tasks.completed=4
my.custom.instrumented.bounded.scheduler.tasks.completed=1

Notice how the metrics for the four instrumentedFactory-produced Scheduler are aggregated together.

There's a bit of a hacky workaround for this: by default Schedulers uses ReactorThreadFactory, an internal private class which happens to be a Supplier<String>, supplying the "simplified name" (ie toString but without the configuration options) of the Scheduler.

One could use the following method to tentatively extract that name:

static String inferSimpleSchedulerName(ThreadFactory threadFactory, String defaultName) {
    if (!(threadFactory instanceof Supplier)) {
        return defaultName;
    }
    Object supplied = ((Supplier<?>) threadFactory).get();
    if (!(supplied instanceof String)) {
        return defaultName;
    }
    return (String) supplied;
}

Which can be applied to eg. the newParallel method in the factory:

String simplifiedName = inferSimpleSchedulerName(threadFactory, "para???");
String prefix = "my.instrumented." + simplifiedName; // TimedScheduler will add `.scheduler.xxx` to that prefix

This can then be demonstrated by submitting a few tasks to different parallel schedulers in the Demo#generateMetrics() part:

Schedulers.parallel().schedule(() -> {});
Schedulers.newParallel("paraOne").schedule(() -> {});
Schedulers.newParallel("paraTwo").schedule(() -> {});

And now it prints (blank lines for emphasis):

my.instrumented.paraOne.scheduler.tasks.completed=1
my.instrumented.paraTwo.scheduler.tasks.completed=1
my.instrumented.parallel.scheduler.tasks.completed=1

my.custom.instrumented.bounded.scheduler.tasks.completed=1
my.instrumented.boundedElastic.scheduler.tasks.completed=4
like image 192
Simon Baslé Avatar answered Oct 22 '25 08:10

Simon Baslé