Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to use AsyncLocalStorage for an Observable?

I'd like to use use AsyncLocalStorage in a NestJs Interceptor:

export interface CallHandler<T = any> {
    handle(): Observable<T>;
}
export interface NestInterceptor<T = any, R = any> {
    intercept(context: ExecutionContext, next: CallHandler<T>): Observable<R> | Promise<Observable<R>>;
}

The interceptor function gets a next CallHandler that returns an Observable.

I cannot use run in this case (the run callback will exit immediately before the callHandler.handle() observable has finished):

  intercept(context: ExecutionContext, callHandler: CallHandler): Observable<any> | Promise<Observable<any>> {
    const asyncLocalStorage = new AsyncLocalStorage();
    const myStore = {  some: 'data'};
    return asyncLocalStorage.run(myStore, () => callHandler.handle());
  }

See broken replit-example

The solution I came up with is this:

const localStorage = new AsyncLocalStorage();

export class MyInterceptor implements NestInterceptor {
  intercept(context: ExecutionContext, callHandler: CallHandler): Observable<any> | Promise<Observable<any>> {
    const resource = new AsyncResource('AsyncLocalStorage', { requireManualDestroy: true });
    const myStore = { some: 'data' };

    localStorage.enterWith(myStore);
    return callHandler.handle().pipe(
      finalize(() => resource.emitDestroy())
    );
  }
}

See working replit example

This seems to work fine, but I am not sure if this is really correct - and it looks messy and error-prone. So I wonder:

  1. Is this correct at all?
  2. Is there a better/cleaner way to handle this?
like image 288
TmTron Avatar asked Nov 26 '22 22:11

TmTron


1 Answers

Below is the solution I came up with. My understanding of the problem is that you need the run function to receive a callback function that will fully encapsulate the execution of the handler, however, the intercept function is expected to return an observable that has not yet been triggered. This means that if you encapsulate the observable itself in the run callback function, it will not have been triggered yet.

My solution, below, is to return a new observable that, when triggered, will be responsible for triggering (i.e. subscribing to) the call handler itself. As a result, the promise we create in the run call can fully encapsulate the handle function and it's async callbacks.

Here is the general functionality in a stand-alone function so that you can see it all together:

intercept(context: ExecutionContext, next: CallHandler<any>): Observable<any> {
    return new Observable((subscribe) => {
        asyncStorage.run({}, () => new Promise(resolve => {
            next.handle().subscribe(
                result => {
                    subscribe.next(result);
                    subscribe.complete();
                    resolve();
                },
                error => {
                    subscribe.error(err);
                    resolve();
                }
            );
        }));
    });
}

Next, I took that concept and integrated it into my interceptor below.

export class RequestContextInterceptor implements NestInterceptor {
    constructor(
        private readonly requestContext: RequestContext,
        private readonly localStorage: AsyncLocalStorage<RequestContextData>
    ) {}

    intercept(context: ExecutionContext, next: CallHandler<any>): Observable<any> {
        const contextData = this.requestContext.buildContextData(context);
        return new Observable((subscribe) => {
            void this.localStorage.run(contextData, () => this.runHandler(next, subscribe));
        });
    }

    private runHandler(next: CallHandler<any>, subscribe: Subscriber<any>): Promise<void> {
        return new Promise<void>((resolve) => {
            next.handle().subscribe(
                (result) => {
                    subscribe.next(result);
                    subscribe.complete();
                    resolve();
                },
                (err) => {
                    subscribe.error(err);
                    resolve();
                }
            );
        });
    }
}

It's worth noting that the Promise that is created during the run call does not have a rejection path. This is intentional. The error is passed on to the observable that is wrapping the promise. This means that the outer observable will still succeed or error depending upon what the inner observable does, however, the promise that wraps the inner observable will always resolve regardless.

like image 181
ajb Avatar answered Dec 05 '22 14:12

ajb