Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Create an extra trace representing the "time spent inside Kafka" using Spring Kafka / OpenTelemetry

I'm working with a Spring Boot Kafka consumer that uses OpenTelemetry for tracing.

Currently, I have a single span that covers the entire message processing, but I want to create a separate, an extra span, dedicated span specifically for measuring the time the message was inside Kafka before being processed.

Here's my current implementation:

package org.example;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.api.OpenTelemetry;
import org.springframework.beans.factory.annotation.Autowired;

import java.time.Instant;
import java.util.Map;
import java.util.HashMap;

@Service
public class ConsumerService {

    @Autowired
    private Tracer tracer;

    @Autowired
    private OpenTelemetry openTelemetry;

    @KafkaListener(topics = "sample-topic", groupId = "group-2")
    public void consumeLinksWithRecord(ConsumerRecord<String, String> record) {
        String word = record.value();

        String traceparent = record.headers().lastHeader("traceparent") != null
                ? new String(record.headers().lastHeader("traceparent").value())
                : "No traceparentid header found";

        // Create a span for Kafka message processing with ConsumerRecord
        Span span = tracer.spanBuilder("inside-kafka")
                .setAttribute("service.name", "insidekafkaservice")
                .setAttribute("kafka.topic", record.topic())
                .setAttribute("kafka.partition", record.partition())
                .setAttribute("kafka.offset", record.offset())
                .setAttribute("kafka.key", record.key() != null ? record.key() : "null")
                .setAttribute("kafka.message", word)
                .setAttribute("kafka.traceparent", traceparent)
                .setStartTimestamp(Instant.ofEpochMilli(record.timestamp()))
                .startSpan();

        try (var scope = span.makeCurrent()) {

                extractTraceContext(traceparent, span);

            System.out.println("Received Message: " + word + " from partition: " + record.partition());
            System.out.println("Trace Parent ID: " + traceparent);

            // Add custom attributes to the span
            span.setAttribute("message.length", word.length());
            span.setAttribute("header.count", record.headers().spliterator().getExactSizeIfKnown());

            try {
                Thread.sleep(80);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                span.setStatus(StatusCode.ERROR, "Processing interrupted");
                return;
            }
            
            span.setStatus(StatusCode.OK);
            
        } catch (Exception e) {
            span.setStatus(StatusCode.ERROR, e.getMessage());
            span.recordException(e);
            throw e;
        } finally {
            span.end();
        }
    }

    private void extractTraceContext(String traceparentId, Span span) {
        try {
            // Create a carrier map with the traceparentid
            Map<String, String> carrier = new HashMap<>();
            carrier.put("traceparent", traceparentId);
            
            // Extract the context using the W3C trace context propagator
            TextMapPropagator propagator = openTelemetry.getPropagators().getTextMapPropagator();
            Context extractedContext = propagator.extract(Context.current(), carrier, new TextMapGetter<Map<String, String>>() {
                @Override
                public String get(Map<String, String> carrier, String key) {
                    return carrier.get(key);
                }
                
                @Override
                public Iterable<String> keys(Map<String, String> carrier) {
                    return carrier.keySet();
                }
            });
            
            // Link the extracted context to the current span
            if (extractedContext != Context.current()) {
                span.addLink(Span.fromContext(extractedContext).getSpanContext());
            }
            
        } catch (Exception e) {
            // Log the error but don't fail the processing
            System.err.println("Failed to extract trace context: " + e.getMessage());
        }
    }
}

For instance, if:

  • the producer starts their logic at 00:00
  • the producer finishes their logic and puts the message inside Kafka at 00:01
  • the consumer picked up the message at 00:04
  • the consumer finished their business logic on the message at 00:05

that would mean the message stayed inside Kafka from 00:01 to 00:04.

I would expect to see traces like this:

enter image description here

But instead, I am currently seeing this:

enter image description here

How to create the extra trace representing the time spent in Kafka?

like image 867
PatPatPat Avatar asked Dec 07 '25 03:12

PatPatPat


1 Answers

@Service
public class ConsumerService {

    @Autowired
    private Tracer tracer;

    @Autowired
    private OpenTelemetry openTelemetry;

    @KafkaListener(topics = "sample-topic", groupId = "group-2")
    public void consume(ConsumerRecord<String, String> record) {
        String message = record.value();
        long kafkaTimestamp = record.timestamp();
        long now = System.currentTimeMillis();

        String traceparent = getHeader(record, "traceparent");
        Context parentContext = extractTraceContext(traceparent);

        Span kafkaDelaySpan = tracer.spanBuilder("kafka.queue.delay")
                .setParent(parentContext)
                .setStartTimestamp(Instant.ofEpochMilli(kafkaTimestamp))
                .startSpan();
        kafkaDelaySpan.setAttribute("service.name", "kafka-delay-service"); //added
        kafkaDelaySpan.setAttribute("kafka.topic", record.topic());
        kafkaDelaySpan.setAttribute("kafka.partition", record.partition());
        kafkaDelaySpan.setAttribute("kafka.offset", record.offset());
        kafkaDelaySpan.end(Instant.ofEpochMilli(now));

        Span processingSpan = tracer.spanBuilder("kafka.consumer.process")
                .setParent(parentContext)
                .startSpan();

        processingSpan.setAttribute("service.name", "consumer-service");  //added

        try (Scope scope = processingSpan.makeCurrent()) {
            Thread.sleep(80);
            processingSpan.setAttribute("message.length", message.length());
            processingSpan.setStatus(StatusCode.OK);
        } catch (Exception e) {
            processingSpan.setStatus(StatusCode.ERROR, e.getMessage());
            processingSpan.recordException(e);
        } finally {
            processingSpan.end();
        }
    }

    private String getHeader(ConsumerRecord<String, String> record, String key) {
        if (record.headers().lastHeader(key) != null) {
            return new String(record.headers().lastHeader(key).value());
        }
        return "";
    }

    private Context extractTraceContext(String traceparentId) {
        try {
            Map<String, String> carrier = new HashMap<>();
            carrier.put("traceparent", traceparentId);

            TextMapPropagator propagator = openTelemetry.getPropagators().getTextMapPropagator();
            return propagator.extract(Context.current(), carrier, new TextMapGetter<>() {
                @Override
                public String get(Map<String, String> carrier, String key) {
                    return carrier.get(key);
                }

                @Override
                public Iterable<String> keys(Map<String, String> carrier) {
                    return carrier.keySet();
                }
            });
        } catch (Exception e) {
            return Context.current();
        }
    }
}
like image 117
Idris Olokunola Avatar answered Dec 08 '25 18:12

Idris Olokunola



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!