
How to use Micrometer Timer to record duration of async method (returns Mono or Flux)

I'd like to use Micrometer to record the execution time of an async method when it eventually happens. Is there a recommended way to do this?

Example: Kafka Replying Template. I want to record the time it takes to actually execute the sendAndReceive call (sends a message on a request topic and receives a response on a reply topic).

    public Mono<String> sendRequest(Mono<String> request) {
        return request
            .map(r -> new ProducerRecord<String, String>(requestsTopic, r))
            .map(pr -> {
                pr.headers()
                        .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC,
                                "reply-topic".getBytes()));
                return pr;
            })
            .map(pr -> replyingKafkaTemplate.sendAndReceive(pr))
            ... // further maps, filters, etc.

Something like

responseGenerationTimer.record(() -> replyingKafkaTemplate.sendAndReceive(pr))

won't work here; it just records the time that it takes to create the Supplier, not the actual execution time.
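
To make the difference concrete, here is a minimal, dependency-free sketch. It assumes Java 9+ and uses CompletableFuture and System.nanoTime as stand-ins for the Kafka call and the Micrometer timer; asyncCall, nanosToCreate and nanosToComplete are hypothetical names for illustration, not part of any library.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class AsyncTimingDemo {

    // Hypothetical stand-in for an async call such as sendAndReceive:
    // returns a future immediately, which completes about delayMillis later.
    static CompletableFuture<String> asyncCall(long delayMillis) {
        return CompletableFuture.supplyAsync(
                () -> "reply",
                CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS));
    }

    // Times only the synchronous part: creating and returning the future.
    static long nanosToCreate(long delayMillis) {
        long start = System.nanoTime();
        asyncCall(delayMillis);
        return System.nanoTime() - start;
    }

    // Times until the future completes by stopping the clock in a completion callback.
    static long nanosToComplete(long delayMillis) {
        long start = System.nanoTime();
        AtomicLong elapsed = new AtomicLong();
        asyncCall(delayMillis)
                .whenComplete((value, error) -> elapsed.set(System.nanoTime() - start))
                .join(); // block here only so the demo can return the measured value
        return elapsed.get();
    }

    public static void main(String[] args) {
        System.out.printf("create:   %d ms%n",
                TimeUnit.NANOSECONDS.toMillis(nanosToCreate(200)));
        System.out.printf("complete: %d ms%n",
                TimeUnit.NANOSECONDS.toMillis(nanosToComplete(200)));
    }
}
```

The first measurement corresponds to wrapping the call in record(...); the second is the kind of measurement I am after, taken when the result actually arrives.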

Mark asked Mar 16 '18 00:03


3 Answers

You can use metrics() from Mono/Flux (see metrics() in the Flux API docs: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html). Then you can do something like:

public Mono<String> sendRequest(Mono<String> request) {
    return request
        .map(r -> new ProducerRecord<String, String>(requestsTopic, r))
        .map(pr -> {
            pr.headers()
                    .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC,
                            "reply-topic".getBytes()));
            return pr;
        })
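        // Caveat: map only emits the future returned by sendAndReceive; to include the reply
        // wait in the measurement, the future must be flattened into the chain first.
        .map(pr -> replyingKafkaTemplate.sendAndReceive(pr))
        .name("my-metricsname")
        .metrics()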

In Graphite, for example, you will then see the latency measured for this call. (For more, see: How to use Micrometer timer together with webflux endpoints.)

m.a.tomsik answered Nov 09 '22 05:11


You could use reactor.util.context.Context to carry a Timer.Sample from subscription until the value is emitted:

import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import static org.hamcrest.Matchers.is;

public class TestMonoTimer {
    private static final Logger LOG = LoggerFactory.getLogger(TestMonoTimer.class);

    private static final String TIMER_SAMPLE = "TIMER_SAMPLE";
    private static final Timer TIMER = new SimpleMeterRegistry().timer("test");
    private static final AtomicBoolean EXECUTION_FLAG = new AtomicBoolean();

    @Test
    public void testMonoTimer() {
        Mono.fromCallable(() -> {
            Thread.sleep(1234);
            return true;
        }).transform(timerTransformer(TIMER))
                .subscribeOn(Schedulers.parallel())
                .subscribe(EXECUTION_FLAG::set);

        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAtomic(EXECUTION_FLAG, is(true));
        Assert.assertTrue(TIMER.totalTime(TimeUnit.SECONDS) > 1);
    }

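    // Starts a Timer.Sample at subscription time, stores it in the Reactor Context,
    // and stops it against the given timer when the Mono emits its value.
    private static <T> Function<Mono<T>, Publisher<T>> timerTransformer(Timer timer) {
        return mono -> mono
                .flatMap(t -> Mono.subscriberContext()
                        // stop() records the elapsed time on the timer and returns it in nanoseconds
                        .flatMap(context -> Mono.just(context.<Timer.Sample>get(TIMER_SAMPLE).stop(timer))
                                .doOnNext(duration -> LOG.info("Execution time is [{}] seconds",
                                        duration / 1000000000D))
                                .map(ignored -> t)))
                // evaluated once per subscription, so each subscriber gets its own sample
                .subscriberContext(context -> context.put(TIMER_SAMPLE, Timer.start(Clock.SYSTEM)));
    }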
}

Alexander Pankin answered Nov 09 '22 05:11


You could do something like the following:

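// Mono<Something> mono = ...
// Note: the sample starts here, when the pipeline is assembled, not when it is subscribed.
Timer.Sample sample = Timer.start(Clock.SYSTEM); // or use the registry's clock
// Stops the sample when the Mono emits its value; an error or empty completion is not recorded.
return mono.doOnNext(x -> sample.stop(timer));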

See here for Sample documentation: http://micrometer.io/docs/concepts#_storing_start_state_in_code_timer_sample_code

For a nicer approach, you could also have a look at resilience4j; they decorate the Mono via transform: https://github.com/resilience4j/resilience4j/tree/master/resilience4j-reactor
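
The decorator idea can be sketched without Reactor or resilience4j. The following is only an illustration under stated assumptions: it needs Java 9+, a Supplier<CompletableFuture<T>> stands in for a lazy Mono, a LongConsumer stands in for the Micrometer timer, and timed and measure are hypothetical names. The point it shows is that the clock starts when the work is triggered, not when the wrapper is built.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

public class DeferredTimingDemo {

    // Wraps a lazy async task so that the clock starts when the task is triggered
    // (get() is called), not when the wrapper is built.
    static <T> Supplier<CompletableFuture<T>> timed(
            Supplier<CompletableFuture<T>> task, LongConsumer recordNanos) {
        return () -> {
            long start = System.nanoTime(); // taken once per invocation
            return task.get().whenComplete(
                    (value, error) -> recordNanos.accept(System.nanoTime() - start));
        };
    }

    // Builds the wrapper, idles for idleMillis before triggering it,
    // and returns the recorded duration in nanoseconds.
    static long measure(long workMillis, long idleMillis) {
        AtomicLong recorded = new AtomicLong();
        // Hypothetical async work that completes about workMillis after being triggered.
        Supplier<CompletableFuture<String>> task = () -> CompletableFuture.supplyAsync(
                () -> "reply",
                CompletableFuture.delayedExecutor(workMillis, TimeUnit.MILLISECONDS));
        Supplier<CompletableFuture<String>> timedTask = timed(task, recorded::set);
        try {
            Thread.sleep(idleMillis); // gap between building and triggering
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        timedTask.get().join(); // trigger the work and wait for the callback to run
        return recorded.get();
    }

    public static void main(String[] args) {
        long nanos = measure(100, 600);
        System.out.printf("recorded: %d ms%n", TimeUnit.NANOSECONDS.toMillis(nanos));
    }
}
```

Because the callback runs on both success and failure, failed calls are recorded too. With Reactor, a comparable effect can be obtained by creating the sample lazily, for example inside Mono.defer or in the Context as in the answer above.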

mio answered Nov 09 '22 05:11