I'd like to use use AsyncLocalStorage in a NestJs Interceptor:
export interface CallHandler<T = any> {
handle(): Observable<T>;
}
export interface NestInterceptor<T = any, R = any> {
intercept(context: ExecutionContext, next: CallHandler<T>): Observable<R> | Promise<Observable<R>>;
}
The interceptor function gets a next
CallHandler
that returns an Observable
.
I cannot use run in this case (the run callback will exit immediately before the callHandler.handle()
observable has finished):
intercept(context: ExecutionContext, callHandler: CallHandler): Observable<any> | Promise<Observable<any>> {
const asyncLocalStorage = new AsyncLocalStorage();
const myStore = { some: 'data'};
return asyncLocalStorage.run(myStore, () => callHandler.handle());
}
See broken replit-example
The solution I came up with is this:
const localStorage = new AsyncLocalStorage();
export class MyInterceptor implements NestInterceptor {
intercept(context: ExecutionContext, callHandler: CallHandler): Observable<any> | Promise<Observable<any>> {
const resource = new AsyncResource('AsyncLocalStorage', { requireManualDestroy: true });
const myStore = { some: 'data' };
localStorage.enterWith(myStore);
return callHandler.handle().pipe(
finalize(() => resource.emitDestroy())
);
}
}
See working replit example
This seems to work fine, but I am not sure if this is really correct - and it looks messy and error-prone. So I wonder:
Below is the solution I came up with. My understanding of the problem is that you need the run
function to receive a callback function that will fully encapsulate the execution of the handler, however, the intercept
function is expected to return an observable that has not yet been triggered. This means that if you encapsulate the observable itself in the run
callback function, it will not have been triggered yet.
My solution, below, is to return a new observable that, when triggered, will be responsible for triggering (i.e. subscribing to) the call handler itself. As a result, the promise we create in the run
call can fully encapsulate the handle function and it's async callbacks.
Here is the general functionality in a stand-alone function so that you can see it all together:
intercept(context: ExecutionContext, next: CallHandler<any>): Observable<any> {
return new Observable((subscribe) => {
asyncStorage.run({}, () => new Promise(resolve => {
next.handle().subscribe(
result => {
subscribe.next(result);
subscribe.complete();
resolve();
},
error => {
subscribe.error(err);
resolve();
}
);
}));
});
}
Next, I took that concept and integrated it into my interceptor below.
export class RequestContextInterceptor implements NestInterceptor {
constructor(
private readonly requestContext: RequestContext,
private readonly localStorage: AsyncLocalStorage<RequestContextData>
) {}
intercept(context: ExecutionContext, next: CallHandler<any>): Observable<any> {
const contextData = this.requestContext.buildContextData(context);
return new Observable((subscribe) => {
void this.localStorage.run(contextData, () => this.runHandler(next, subscribe));
});
}
private runHandler(next: CallHandler<any>, subscribe: Subscriber<any>): Promise<void> {
return new Promise<void>((resolve) => {
next.handle().subscribe(
(result) => {
subscribe.next(result);
subscribe.complete();
resolve();
},
(err) => {
subscribe.error(err);
resolve();
}
);
});
}
}
It's worth noting that the Promise
that is created during the run
call does not have a rejection path. This is intentional. The error is passed on to the observable that is wrapping the promise. This means that the outer observable will still succeed or error depending upon what the inner observable does, however, the promise that wraps the inner observable will always resolve regardless.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With