I have two kinds of WebFlux applications: annotation-based and route-based. These applications are called with a set of headers, some of which (OpenTracing) I need to propagate in downstream calls made with WebClient.
If these were normal Spring WebMvc applications, I would use a Filter to keep the selected headers in a ThreadLocal, read them in a RestTemplate interceptor to send them to subsequent services, and then clear the ThreadLocal.
What's the proper way to replicate this behaviour in WebFlux applications?
WebClient is part of Spring WebFlux, the reactive web framework introduced in Spring 5. To use WebClient, you need to include the spring-webflux module in your project, for example by generating the project at http://start.spring.io.
It can take time to get used to Reactive APIs, but the WebClient has interesting features and can also be used in traditional Spring MVC applications. You can use WebClient to communicate with non-reactive, blocking services, too.
I solved it with Project Reactor's Context: a WebFilter stores the selected headers in the Context, and the WebClient's ExchangeFilterFunction reads them back and adds them to the outgoing request. Here's the whole solution:
WebFilter
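import org.slf4j.LoggerFactory
import org.springframework.web.server.ServerWebExchange
import org.springframework.web.server.WebFilter
import org.springframework.web.server.WebFilterChain
import reactor.core.publisher.Mono

class OpenTracingFilter(private val openTracingHeaders: Set<String>) : WebFilter {

    private val logger = LoggerFactory.getLogger(javaClass)

    override fun filter(exchange: ServerWebExchange, chain: WebFilterChain): Mono<Void> {
        return chain.filter(exchange)
            // On Reactor 3.4+ use contextWrite; subscriberContext was deprecated in 3.4 and removed in 3.5.
            .subscriberContext { ctx ->
                var updatedContext = ctx
                exchange.request.headers.forEach {
                    // Header names are case-insensitive. Store them lowercase so that the
                    // lookup by configured name in the exchange filter finds them.
                    val key = it.key.toLowerCase()
                    if (openTracingHeaders.contains(key)) {
                        logger.debug("Found OpenTracing Header - key {} - value {}", key, it.value[0])
                        updatedContext = updatedContext.put(key, it.value[0])
                    }
                }
                updatedContext
            }
    }
}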
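The filter matches header names case-insensitively by lowercasing the incoming name, and the exchange filter later looks values up by the configured names, so it helps if the Context keys are lowercase as well. As a minimal sketch of just that selection step, here is a pure function that can be checked without a running server. The name selectHeaders is hypothetical and not part of the solution above, and the sketch assumes Kotlin 1.5 or newer for lowercase().

```kotlin
// Hypothetical helper, not part of the original solution.
// Picks the configured headers out of the incoming request headers,
// matching names case-insensitively and returning them under lowercase keys.
fun selectHeaders(incoming: Map<String, List<String>>, wanted: Set<String>): Map<String, String> {
    val wantedLower = wanted.map { it.lowercase() }.toSet()
    val selected = mutableMapOf<String, String>()
    for ((name, values) in incoming) {
        val key = name.lowercase()
        // Only the first value is kept, as in the filter; headers without a value are skipped.
        val first = values.firstOrNull()
        if (key in wantedLower && first != null) {
            selected[key] = first
        }
    }
    return selected
}
```

For example, calling it with "X-Request-Id" mapped to listOf("abc-123") and a wanted set containing "x-request-id" yields a map with the single entry "x-request-id" to "abc-123", which could then be copied into the Context with put.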
OpenTracingExchangeFilterFunction
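import org.slf4j.LoggerFactory
import org.springframework.web.reactive.function.client.ClientRequest
import org.springframework.web.reactive.function.client.ClientResponse
import org.springframework.web.reactive.function.client.ExchangeFilterFunction
import org.springframework.web.reactive.function.client.ExchangeFunction
import reactor.core.publisher.Mono

class OpenTracingExchangeFilterFunction(private val headers: Set<String>) : ExchangeFilterFunction {

    private val logger = LoggerFactory.getLogger(javaClass)

    override fun filter(request: ClientRequest, next: ExchangeFunction): Mono<ClientResponse> {
        logger.debug("OpenTracingExchangeFilterFunction - filter()")
        return OpenTracingClientResponseMono(request, next, headers)
    }
}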
OpenTracingClientResponseMono
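import org.slf4j.LoggerFactory
import org.springframework.web.reactive.function.client.ClientRequest
import org.springframework.web.reactive.function.client.ClientResponse
import org.springframework.web.reactive.function.client.ExchangeFunction
import reactor.core.CoreSubscriber
import reactor.core.publisher.Mono

class OpenTracingClientResponseMono(
    private val request: ClientRequest,
    private val next: ExchangeFunction,
    private val headersToPropagate: Set<String>
) : Mono<ClientResponse>() {

    private val logger = LoggerFactory.getLogger(javaClass)

    override fun subscribe(subscriber: CoreSubscriber<in ClientResponse>) {
        // The Context is only available at subscription time, through the subscriber.
        val context = subscriber.currentContext()
        val requestBuilder = ClientRequest.from(request)
        requestBuilder.headers { httpHeaders ->
            headersToPropagate.forEach {
                if (context.hasKey(it)) {
                    logger.debug("Propagating header key {} - value {}", it, context.get<String>(it))
                    httpHeaders[it] = context.get<String>(it)
                }
            }
        }
        // Send the copy of the request that carries the propagated headers.
        val mutatedRequest = requestBuilder.build()
        next.exchange(mutatedRequest).subscribe(subscriber)
    }
}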
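The custom Mono subclass above exists only to reach the subscriber's Context. On Reactor 3.4 or newer the same lookup can probably be written without a subclass by using Mono.deferContextual. The sketch below is an alternative under that assumption, not the code from the solution. The class name ContextHeaderExchangeFilterFunction is hypothetical, and it expects the Context keys to match the configured header names exactly.

```kotlin
import org.springframework.web.reactive.function.client.ClientRequest
import org.springframework.web.reactive.function.client.ClientResponse
import org.springframework.web.reactive.function.client.ExchangeFilterFunction
import org.springframework.web.reactive.function.client.ExchangeFunction
import reactor.core.publisher.Mono

// Hypothetical alternative to OpenTracingExchangeFilterFunction plus OpenTracingClientResponseMono.
// Assumes Reactor 3.4 or newer, where Mono.deferContextual is available.
class ContextHeaderExchangeFilterFunction(
    private val headersToPropagate: Set<String>
) : ExchangeFilterFunction {

    override fun filter(request: ClientRequest, next: ExchangeFunction): Mono<ClientResponse> =
        // deferContextual runs the lambda at subscription time and hands it a read-only view of the Context.
        Mono.deferContextual { context ->
            val mutatedRequest = ClientRequest.from(request)
                .headers { httpHeaders ->
                    headersToPropagate.forEach { name ->
                        // Copy the header only when the Context holds a value under this name.
                        context.getOrEmpty<String>(name)
                            .ifPresent { value -> httpHeaders.set(name, value) }
                    }
                }
                .build()
            next.exchange(mutatedRequest)
        }
}
```

With either version, the headers are only visible if the WebClient call is part of the reactive chain returned from the handler or controller. A call that is subscribed to separately, or blocked on, does not see the Context written by the WebFilter.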
OpenTracingConfiguration
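import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.server.WebFilter

@Configuration
class OpenTracingConfiguration(private val openTracingConfigurationProperties: OpenTracingConfigurationProperties) {

    // WebClient that propagates the selected headers on every outgoing call.
    @Bean
    fun webClient(): WebClient {
        return WebClient.builder().filter(openTracingExchangeFilterFunction()).build()
    }

    // Server-side filter that copies the selected incoming headers into the Context.
    @Bean
    fun openTracingFilter(): WebFilter {
        return OpenTracingFilter(openTracingConfigurationProperties.headers)
    }

    @Bean
    fun openTracingExchangeFilterFunction(): OpenTracingExchangeFilterFunction {
        return OpenTracingExchangeFilterFunction(openTracingConfigurationProperties.headers)
    }
}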
OpenTracingConfigurationProperties
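import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.context.annotation.Configuration

@Configuration
@ConfigurationProperties("opentracing")
class OpenTracingConfigurationProperties {
    // Bound from the opentracing.headers list in application.yml.
    lateinit var headers: Set<String>
}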
application.yml
opentracing:
  headers:
    - x-request-id
    - x-b3-traceid
    - x-b3-spanid
    - x-b3-parentspanid
    - x-b3-sampled
    - x-b3-flags
    - x-ot-span-context