I'm working with a Spring Boot Kafka consumer that uses OpenTelemetry for tracing.
Currently, I have a single span that covers the entire message processing, but I want to create a separate, an extra span, dedicated span specifically for measuring the time the message was inside Kafka before being processed.
Here's my current implementation:
package org.example;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.api.OpenTelemetry;
import org.springframework.beans.factory.annotation.Autowired;
import java.time.Instant;
import java.util.Map;
import java.util.HashMap;
@Service
public class ConsumerService {
@Autowired
private Tracer tracer;
@Autowired
private OpenTelemetry openTelemetry;
@KafkaListener(topics = "sample-topic", groupId = "group-2")
public void consumeLinksWithRecord(ConsumerRecord<String, String> record) {
String word = record.value();
String traceparent = record.headers().lastHeader("traceparent") != null
? new String(record.headers().lastHeader("traceparent").value())
: "No traceparentid header found";
// Create a span for Kafka message processing with ConsumerRecord
Span span = tracer.spanBuilder("inside-kafka")
.setAttribute("service.name", "insidekafkaservice")
.setAttribute("kafka.topic", record.topic())
.setAttribute("kafka.partition", record.partition())
.setAttribute("kafka.offset", record.offset())
.setAttribute("kafka.key", record.key() != null ? record.key() : "null")
.setAttribute("kafka.message", word)
.setAttribute("kafka.traceparent", traceparent)
.setStartTimestamp(Instant.ofEpochMilli(record.timestamp()))
.startSpan();
try (var scope = span.makeCurrent()) {
extractTraceContext(traceparent, span);
System.out.println("Received Message: " + word + " from partition: " + record.partition());
System.out.println("Trace Parent ID: " + traceparent);
// Add custom attributes to the span
span.setAttribute("message.length", word.length());
span.setAttribute("header.count", record.headers().spliterator().getExactSizeIfKnown());
try {
Thread.sleep(80);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
span.setStatus(StatusCode.ERROR, "Processing interrupted");
return;
}
span.setStatus(StatusCode.OK);
} catch (Exception e) {
span.setStatus(StatusCode.ERROR, e.getMessage());
span.recordException(e);
throw e;
} finally {
span.end();
}
}
private void extractTraceContext(String traceparentId, Span span) {
try {
// Create a carrier map with the traceparentid
Map<String, String> carrier = new HashMap<>();
carrier.put("traceparent", traceparentId);
// Extract the context using the W3C trace context propagator
TextMapPropagator propagator = openTelemetry.getPropagators().getTextMapPropagator();
Context extractedContext = propagator.extract(Context.current(), carrier, new TextMapGetter<Map<String, String>>() {
@Override
public String get(Map<String, String> carrier, String key) {
return carrier.get(key);
}
@Override
public Iterable<String> keys(Map<String, String> carrier) {
return carrier.keySet();
}
});
// Link the extracted context to the current span
if (extractedContext != Context.current()) {
span.addLink(Span.fromContext(extractedContext).getSpanContext());
}
} catch (Exception e) {
// Log the error but don't fail the processing
System.err.println("Failed to extract trace context: " + e.getMessage());
}
}
}
For instance, if:
that would mean the message stayed inside Kafka from 00:01 to 00:04.
I would expect to see traces like this:

But instead, I am currently seeing this:

How to create the extra trace representing the time spent in Kafka?
@Service
public class ConsumerService {
@Autowired
private Tracer tracer;
@Autowired
private OpenTelemetry openTelemetry;
@KafkaListener(topics = "sample-topic", groupId = "group-2")
public void consume(ConsumerRecord<String, String> record) {
String message = record.value();
long kafkaTimestamp = record.timestamp();
long now = System.currentTimeMillis();
String traceparent = getHeader(record, "traceparent");
Context parentContext = extractTraceContext(traceparent);
Span kafkaDelaySpan = tracer.spanBuilder("kafka.queue.delay")
.setParent(parentContext)
.setStartTimestamp(Instant.ofEpochMilli(kafkaTimestamp))
.startSpan();
kafkaDelaySpan.setAttribute("service.name", "kafka-delay-service"); //added
kafkaDelaySpan.setAttribute("kafka.topic", record.topic());
kafkaDelaySpan.setAttribute("kafka.partition", record.partition());
kafkaDelaySpan.setAttribute("kafka.offset", record.offset());
kafkaDelaySpan.end(Instant.ofEpochMilli(now));
Span processingSpan = tracer.spanBuilder("kafka.consumer.process")
.setParent(parentContext)
.startSpan();
processingSpan.setAttribute("service.name", "consumer-service"); //added
try (Scope scope = processingSpan.makeCurrent()) {
Thread.sleep(80);
processingSpan.setAttribute("message.length", message.length());
processingSpan.setStatus(StatusCode.OK);
} catch (Exception e) {
processingSpan.setStatus(StatusCode.ERROR, e.getMessage());
processingSpan.recordException(e);
} finally {
processingSpan.end();
}
}
private String getHeader(ConsumerRecord<String, String> record, String key) {
if (record.headers().lastHeader(key) != null) {
return new String(record.headers().lastHeader(key).value());
}
return "";
}
private Context extractTraceContext(String traceparentId) {
try {
Map<String, String> carrier = new HashMap<>();
carrier.put("traceparent", traceparentId);
TextMapPropagator propagator = openTelemetry.getPropagators().getTextMapPropagator();
return propagator.extract(Context.current(), carrier, new TextMapGetter<>() {
@Override
public String get(Map<String, String> carrier, String key) {
return carrier.get(key);
}
@Override
public Iterable<String> keys(Map<String, String> carrier) {
return carrier.keySet();
}
});
} catch (Exception e) {
return Context.current();
}
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With