In this post, we’ll look at how to combine the MDC feature of current logging frameworks with Reactor's own Context, in order to make contextualized logging possible within a reactive application.
⚠️ Update: see the note at the bottom for a bug with doOnEach to be aware of in 3.1.3.RELEASE and below.
The need
Let’s imagine that we have an API that lists restaurants in your area, under a requested price. It simply interrogates a database and logs the found restaurants in the console.
The only problem is that whenever two customers request the API at the same time, their logs are sometimes mixed and you have no way to find out who got what.
To simulate such a situation we can issue two HTTPie [1] calls:
$ http :8080/byPrice maxPrice==12 & http :8080/byPrice maxPrice==11
Usually, one solution would be to use the MDC feature of logging frameworks like Logback (SLF4J has an MDC abstraction that delegates to the equivalent in the underlying framework, if it has such a feature). You can put key-value pairs in the MDC and refer to the keys in your logging appender’s pattern, and the values will magically appear in the logs.
For instance, to solve our logging issue we could ask that each user put their user id in an HTTP request header, and then use that id as a differentiator in the MDC:
logging.pattern.console=%magenta([%thread]) %cyan([%X{apiID}]) - %highlight(%-5level) %logger{36}.%M - %msg%n (1)
1 | notice the %X{apiID}, that’s our MDC key |
The issue
This is all well and good in e.g. a Spring MVC app, where the Servlet model results in one Thread per request, because the MDC implementations are based on ThreadLocal.
Uh-oh. If you’ve done some reactive programming before, you’ll probably start to feel a bit uneasy at this point 😅
In a reactive programming library like Reactor, ThreadLocal is not ideal because execution of an asynchronous pipeline can easily hop threads, and threads can also easily be shared between unrelated pipelines.
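Both failure modes can be reproduced without any logging framework. The following is a minimal sketch using only the JDK, in which a plain ThreadLocal stands in for one MDC entry (an assumption made for illustration; the class and method names are hypothetical and not part of the restaurant application):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadLocalHopDemo {
    // Stand-in for a single MDC entry (assumption: one value per thread).
    static final ThreadLocal<String> API_ID = new ThreadLocal<>();

    // Runs a task on the given executor and waits for its result.
    static String runOn(ExecutorService executor, Callable<String> task) {
        try {
            return executor.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // Problem 1: a value set on the calling thread is invisible after a thread hop.
    static String valueSeenAfterHop(String apiId) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        API_ID.set(apiId);
        try {
            return runOn(worker, API_ID::get);
        } finally {
            API_ID.remove();
            worker.shutdown();
        }
    }

    // Problem 2: a value left behind on a shared thread leaks into the next, unrelated task.
    static String valueLeakedToNextTask(String apiId) {
        ExecutorService shared = Executors.newSingleThreadExecutor();
        try {
            // "Request A" sets the value on the shared thread and never clears it.
            runOn(shared, () -> { API_ID.set(apiId); return apiId; });
            // "Request B" runs on the same thread and reads A's value.
            return runOn(shared, API_ID::get);
        } finally {
            shared.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("after hop: " + valueSeenAfterHop("foo"));     // prints "after hop: null"
        System.out.println("next task: " + valueLeakedToNextTask("foo")); // prints "next task: foo"
    }
}
```

The first method mirrors a pipeline that continues on another thread, the second a worker thread reused by two unrelated requests.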
If we implement the restaurant API as a Spring WebFlux application like this:
@GetMapping("/byPrice")
public Flux<Restaurant> byPrice(@RequestParam Double maxPrice,
        @RequestHeader(required = false, name = "X-UserId") String uid) { (1)
    String apiId = uid == null ? "" : uid;
    MDC.put("apiID", apiId); (2)
    return restaurantService.byPrice(maxPrice)
            .doOnNext(r -> LOG.debug("found restaurant {} for ${}", r.getName(), r.getPricePerPerson())); (3)
}
1 | We get the user id from the headers |
2 | We attempt to put it in the MDC… |
3 | …and log the results |
We issue the two HTTPie calls again, this time with the X-UserId header:
$ http :8080/byPrice maxPrice==12 X-UserId:foo & http :8080/byPrice maxPrice==11 X-UserId:bar
But we get the following (disappointing) logs:
[reactor-http-nio-6] [bar] - DEBUG apiIdLogger.byPrice - finding restaurants for under $11.0 for %bar
[reactor-http-nio-7] [foo] - DEBUG apiIdLogger.byPrice - finding restaurants for under $12.0 for %foo
[byPrice-1] [] - DEBUG apiIdLogger.lambda$byPrice$3 - found restaurant Burger for $11.0
[byPrice-1] [] - DEBUG apiIdLogger.lambda$byPrice$3 - found restaurant DoMONOs for $10.0
[byPrice-1] [] - DEBUG apiIdLogger.lambda$byPrice$3 - found restaurant Cheesecake ProxyFactory for $9.5
[byPrice-1] [] - DEBUG apiIdLogger.lambda$byPrice$3 - found restaurant DoMONOs for $10.0
[byPrice-1] [] - DEBUG apiIdLogger.lambda$byPrice$3 - found restaurant Cheesecake ProxyFactory for $9.5
Oh snap!
- The MDC was taken into account for the first log statement but not the others 🤔
- The service seems to offset the work on another thread 😞
- The work is done on a single thread, different from the one where user ids are logged 💥
- Worse, it’s the same thread for both requests 💥😱💥
The solution
Reactor comes with a Context in which one can put data to be associated with a particular Flux rather than a Thread. Maybe we can somehow use this 🤔
In reactive terms, log statements are usually considered as something to be executed on the side of the pipeline, since they don’t change the sequence’s data. We recommend isolating these "side effects" in the doOn* family of operators.
One such operator is a particularly good candidate for our contextual logging need: doOnEach lets you consume a Signal object that materializes each reactive event, onNext data events and onComplete/onError terminal events alike.
What’s more interesting is that since version 3.1.3, it also provides access to the current Context 👍 🎉
So the idea would be to isolate log statements in doOnEach and write a bit of boilerplate code [2] to fill the MDC, do the logging and clear the MDC (we want to be good citizens here):
private static <T> Consumer<Signal<T>> logOnNext(Consumer<T> logStatement) {
    return signal -> {
        if (!signal.isOnNext()) return; (1)
        Optional<String> apiIDMaybe = signal.getContext().getOrEmpty("apiID"); (2)
        apiIDMaybe.ifPresentOrElse(apiID -> {
            try (MDC.MDCCloseable closeable = MDC.putCloseable("apiID", apiID)) { (3)
                logStatement.accept(signal.get()); (4)
            }
        }, () -> logStatement.accept(signal.get())); (4)
    };
}
1 | We only care to log for onNext events |
2 | We’ll extract the interesting value from the Reactor Context |
3 | If present, we’ll put it in the MDC and automatically clear it once the line below is done |
4 | We execute the log statement provided as a consumer of the onNext value |
With that boilerplate logging function, we can rewrite our controller to reactively log the request results:
@GetMapping("/byPrice")
public Flux<Restaurant> byPrice(@RequestParam Double maxPrice, @RequestHeader(required = false, name = "X-UserId") String userId) {
    String apiId = userId == null ? "" : userId;
    (3)
    return restaurantService.byPrice(maxPrice)
            .doOnEach(logOnNext(r -> LOG.debug("found restaurant {} for ${}", r.getName(), r.getPricePerPerson()))) (2)
            .subscriberContext(Context.of("apiID", apiId)); (1)
}
1 | The subscriberContext method is used at the bottom of the chain to enrich the Context (here with apiId) |
2 | Every operator above it will see the Context, including our boilerplate logging doOnEach |
3 | ⚠️ something is missing here. |
The missing part above is the initial log. We could put the value in the MDC and then call the LOG directly like before, but let’s be reactive all the way:
@GetMapping("/byPrice")
public Flux<Restaurant> byPrice(@RequestParam Double maxPrice, @RequestHeader(required = false, name = "X-UserId") String userId) {
    String apiId = userId == null ? "" : userId;
    return Mono.just(String.format("finding restaurants for under $%.2f for %s", maxPrice, apiId)) (1)
            .doOnEach(logOnNext(msg -> LOG.debug(msg))) (2)
            .thenMany(restaurantService.byPrice(maxPrice)) (3)
            .doOnEach(logOnNext(r -> LOG.debug("found restaurant {} for ${}", r.getName(), r.getPricePerPerson())))
            .subscriberContext(Context.of("apiID", apiId));
}
1 | Generate an initial message as the starting point |
2 | Use our boilerplate logging method (doOnEach sees our prepared message as the onNext value to log) |
3 | then replace and continue the sequence with the Many elements from the restaurantService call using thenMany |
And voilà the result:
[reactor-http-nio-6] [bar] - DEBUG apiIdLogger.lambda$byPrice$3 - finding restaurants for under $11.00 for bar
[reactor-http-nio-7] [foo] - DEBUG apiIdLogger.lambda$byPrice$3 - finding restaurants for under $12.00 for foo
[byPrice-1] [foo] - DEBUG apiIdLogger.lambda$byPrice$5 - found restaurant Burger for $11.0
[byPrice-1] [bar] - DEBUG apiIdLogger.lambda$byPrice$5 - found restaurant DoMONOs for $10.0
[byPrice-1] [bar] - DEBUG apiIdLogger.lambda$byPrice$5 - found restaurant Cheesecake ProxyFactory for $9.5
[byPrice-1] [foo] - DEBUG apiIdLogger.lambda$byPrice$5 - found restaurant DoMONOs for $10.0
[byPrice-1] [foo] - DEBUG apiIdLogger.lambda$byPrice$5 - found restaurant Cheesecake ProxyFactory for $9.5
Results from the two requests are interleaved in the logs, coming from the same Thread, but we can still identify who got what. WIN 😁
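The reason this works can be sketched without Reactor or SLF4J: the id travels with the work item rather than with the thread, and is copied into the thread-bound slot only for the duration of the log statement. In the minimal sketch below, a ThreadLocal stands in for the MDC, a list stands in for the console, and withApiId is a hypothetical analogue of logOnNext; none of these names come from the application code above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class FillLogClearSketch {
    // Stand-in for the MDC entry read by %X{apiID} (assumption, not SLF4J itself).
    static final ThreadLocal<String> API_ID = new ThreadLocal<>();
    // Stand-in for the console appender.
    static final List<String> LINES = Collections.synchronizedList(new ArrayList<>());

    // Formats a line the way the pattern would: "[apiID] message".
    static void log(String msg) {
        LINES.add("[" + Optional.ofNullable(API_ID.get()).orElse("") + "] " + msg);
    }

    // Hypothetical analogue of logOnNext: fill the slot, run the log statement, clear the slot.
    static <T> Consumer<T> withApiId(Optional<String> apiIdMaybe, Consumer<T> logStatement) {
        return value -> {
            if (!apiIdMaybe.isPresent()) {
                logStatement.accept(value);
                return;
            }
            API_ID.set(apiIdMaybe.get());
            try {
                logStatement.accept(value);
            } finally {
                API_ID.remove(); // always clear, even if the log statement throws
            }
        };
    }

    // Two "requests" share one worker thread, followed by unrelated work on that thread.
    static List<String> demo() {
        LINES.clear();
        ExecutorService shared = Executors.newSingleThreadExecutor();
        Consumer<String> foo = withApiId(Optional.of("foo"), r -> log("found restaurant " + r));
        Consumer<String> bar = withApiId(Optional.of("bar"), r -> log("found restaurant " + r));
        shared.execute(() -> foo.accept("Burger"));
        shared.execute(() -> bar.accept("DoMONOs"));
        shared.execute(() -> log("unrelated work"));
        shared.shutdown();
        try {
            shared.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(LINES);
    }

    public static void main(String[] args) {
        // prints "[foo] found restaurant Burger", "[bar] found restaurant DoMONOs", "[] unrelated work"
        demo().forEach(System.out::println);
    }
}
```

The last line comes out untagged, which shows that clearing the slot after each statement keeps one request's id from leaking into later work on the shared thread.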
Following the writing of this post, an issue with doOnEach was discovered that leads to the operator being bypassed in some configurations (asynchronous fusion). This will be fixed in 3.1.4.RELEASE, and in the meantime you can put a .hide() just before the .doOnEach(…).
See https://github.com/reactor/reactor-core/issues/1056