Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to pass results between chained observables

Abstract problem: Every time a source Observable emits and event, a sequence of API calls and Angular services need to be triggered. Some of those invocations are depending on previous results.

In my example, the source Observable startUpload$ triggers a series of depending invocations.

Using destructuring this can be written like this:

this.startUploadEvent$.pipe(
      concatMap(event => this.getAuthenticationHeaders(event)),
      map(({ event, headers }) => this.generateUploadId(event, headers)),
      tap(({ event, headers, id }) => this.emitUploadStartEvent(id, event)),
      concatMap(({ event, headers, id }) => this.createPdfDocument(event, headers, id)),
      concatMap(({ event, headers, id, pdfId }) => this.uploadBilderForPdf(event, pdfId, headers, id)),
      mergeMap(({ event, headers, id, pdfId, cloudId }) => this.closePdf(cloudId, event, headers, id, pdfId)),
      tap(({ event, headers, id, pdfId, cloudId }) => this.emitUploadDoneEvent(id, event, cloudId)),
).subscribe()

It almost reads like an imperative approach. But it has certain problems:

  • The destructuring chain is repeated over the code and gets longer and longer { event, headers, id, pdfId, cloudId }
  • Methods (like generateUploadId(event, headers)) are required to receive all previous values so that they are able to pass them to the next pipe, even if the method itself doesn't require it
  • Inner Observables (within the methods) are required to map the values so that further pipe stages can destruct them:

_

private closePdf(cloudId, event, headers, id, pdfId) {
    return this.httpClient.post(..., { headers } )
        .pipe(
             //...,
             map(() => ({ event, headers, id, pdfId, cloudId }))
        )
}

It would be nice if the compiler could take care of the boilerplate (like with async await) to write the code that reads like this (with none of the problems mentioned above):

private startUpload(event: StartUploadEvent) {
    const headers = this.getAuthenticationHeaders(event)
    const id = this.generateUploadId()

    this.emitUploadStartEvent(id, event)

    const pdfId = this.createPdfDocument(event, headers, id)
    this.uploadBilderForPdf(event, pdfId, headers, id)

    const cloudId = this.closePdf(headers, pdfId)
    this.emitUploadDoneEvent(id, event, cloudId)

    return cloudId
  }

How to pass results between chained observables without the problems i've mentioned? Is there a rxjs concept i've missed?

like image 843
d0x Avatar asked Aug 13 '20 00:08

d0x


3 Answers

You certainly shouldn't have your methods take in params that don't concern them!

To your main question:

How to pass results between chained observables without the problems i've mentioned?

Use a single scope (nested pipes)

The code below is equivalent to your sample code, with no need to pass the unnecessary properties. The previously returned values are accessible by function calls further down the chain:

1   startUploadEvent$.pipe(
2     concatMap(event => getAuthenticationHeaders(event).pipe(
3       map(headers => generateUploadId(event, headers).pipe(
4         tap(id => emitUploadStartEvent(id, event)),
5         concatMap(id => createPdfDocument(event, headers, id)),
6         concatMap(pdfId => uploadBilderForPdf(event, pdfId)),
7         tap(cloudId => closePdf(cloudId, event))
8       ))
9     ))
10  ).subscribe();

Notice how event and headers are accessible downstream. They do not need to be passed into functions that don't require them.

Is there a rxjs concept i've missed?

Maybe.? Not really... :-)

The trick is to tack on a .pipe to effectively group operators so they all have access to the input params.

Usually, we try to keep the code flat inside the .pipe:

1   const greeting$ = userId$.pipe(
2     switchMap(id => http.get(`/users/${id}`)),
3     map(response => response.data.userName),
4     map(name => `Hello ${name}!`),
5     tap(greeting => console.log(greeting))
6   );

but that code is really no different than:

1   const greeting$ = userId$.pipe(
2     switchMap(id => http.get(`/users/${id}`).pipe(
3       map(response => response.data.userName),
4       map(name => `Hello ${name}! (aka User #${id})`)
5     )),
6     tap(greeting => console.log(greeting))
7   );

But, in the second case, line #4 has access to the name and the id, whereas in the first case it only has access to name.

Notice the signature of the first is userId$.pipe(switchMap(), map(), map(), tap())

The second is: userId$.pipe(switchMap(), tap()).

like image 76
BizzyBob Avatar answered Oct 28 '22 01:10

BizzyBob


Your methods definitely shouldn't be coupled to the context as well as not to think about mapping result to the specific shape.

RxJS is all about functional programming. And in functional programming there is a pattern like Adapting Arguments to Parametersref

It allows us to decouple methods signature from context.

In order to achieve this you can write context depending version of map, contentMap, mergMap operators so that the final solution looks like:

this.startUploadEvent$.pipe(
      map(withKey('event')),
      concatMap_(({event}) => this.getAuthenticationHeaders(event), 'headers'),
      map_(({ headers }) => this.generateUploadId(headers), 'id'),
      tap(({ event, id }) => this.emitUploadStartEvent(id, event)),
      concatMap_(({ id }) => this.createPdfDocument(id), 'pdfId'),
      concatMap_(({ pdfId }) => this.uploadBuilderForPdf(pdfId), 'cloudId'),
      mergeMap_(({ cloudId }) => this.closePdf(cloudId)),
      tap(({id, event, cloudId}) => this.emitUploadDoneEvent(id, event, cloudId)),
    ).subscribe(console.log);

Note _ after those operators.

Stackblitz Example

The goal of those custom operators if to take parameters object go through projection function and add result of projection to the original parameters object.

function map_<K extends string, P, V>(project: (params: P) => V): OperatorFunction<P, P>;
function map_<K extends string, P, V>(project: (params: P) => V, key: K): OperatorFunction<P, P & Record<K, V>>;
function map_<K extends string, P, V>(project: (params: P) => V, key?: K): OperatorFunction<P, P> {
  return map(gatherParams(project, key));
}

function concatMap_<K extends string, P, V>(projection: (params: P) => Observable<V>): OperatorFunction<P, P>;
function concatMap_<K extends string, P, V>(projection: (params: P) => Observable<V>, key: K): OperatorFunction<P, P & Record<K, V>>;
function concatMap_<K extends string, P, V>(projection: (params: P) => Observable<V>, key?: K): OperatorFunction<P, P> {
  return concatMap(gatherParamsOperator(projection, key));
}

function mergeMap_<K extends string, P, V>(projection: (params: P) => Observable<V>): OperatorFunction<P, P>;
function mergeMap_<K extends string, P, V>(projection: (params: P) => Observable<V>, key: K): OperatorFunction<P, P & Record<K, V>>;
function mergeMap_<K extends string, P, V>(projection: (params: P) => Observable<V>, key?: K): OperatorFunction<P, P> {
  return mergeMap(gatherParamsOperator(projection, key));
}

// https://github.com/Microsoft/TypeScript/wiki/FAQ#why-am-i-getting-supplied-parameters-do-not-match-any-signature-error
function gatherParams<K extends string, P, V>(fn: (params: P) => V): (params: P) => P;
function gatherParams<K extends string, P, V>(fn: (params: P) => V, key: K): (params: P) => P & Record<K, V>;
function gatherParams<K extends string, P, V>(fn: (params: P) => V, key?: K): (params: P) => P {
  return (params: P) => {
    if (typeof key === 'string') {
      return Object.assign({}, params, { [key]: fn(params) } as Record<K, V>);
    }

    return params;
  };
}

function gatherParamsOperator<K extends string, P, V>(fn: (params: P) => Observable<V>): (params: P) => Observable<P>;
function gatherParamsOperator<K extends string, P, V>(fn: (params: P) => Observable<V>, key: K): (params: P) => Observable<P & Record<K, V>>;
function gatherParamsOperator<K extends string, P, V>(fn: (params: P) => Observable<V>, key?: K): (params: P) => Observable<P> {
  return (params: P) => {
    return fn(params).pipe(map(value => gatherParams((_: P) => value, key)(params)));
  };
}

function withKey<K extends string, V>(key: K): (value: V) => Record<K, V> {
  return (value: V) => ({ [key]: value } as Record<K, V>);
}

I used function overloads here because somethimes we don't need to add additional key to parameters. Parameters should only pass through it in case of this.closePdf(...) method.

As a result you're getting decoupled version of the same you had before with type safety:

enter image description here

Doesn't it look like over-engineering?

In most cases you should follow YAGNI(You aren't gonna need it) principle. And it would be better not to add more complexity to existing code. For such scenario you should stick to some simple implementation of sharing parameters between operators as follows:

ngOnInit() {
  const params: Partial<Params> = {};
  this.startUploadEvent$.pipe(
    concatMap(event => (params.event = event) && this.getAuthenticationHeaders(event)),
    map(headers => (params.headers = headers) && this.generateUploadId(headers)),
    tap(id => (params.uploadId = id) && this.emitUploadStartEvent(id, event)),
    concatMap(id => this.createPdfDocument(id)),
    concatMap(pdfId => (params.pdfId = pdfId) && this.uploadBuilderForPdf(pdfId)),
    mergeMap(cloudId => (params.cloudId = cloudId) && this.closePdf(cloudId)),
    tap(() => this.emitUploadDoneEvent(params.pdfId, params.cloudId, params.event)),
  ).subscribe(() => {
    console.log(params)
  });

where Params type is:

interface Params {
  event: any;
  headers: any;
  uploadId: any;
  pdfId: any;
  cloudId: any;
}

Please do note parentheses I used in assignments (params.cloudId = cloudId).

Stackblitz Example


There are also lots of other methods but they require to change your flow of using rxjs operators:

  • https://medium.com/@snorredanielsen/rxjs-accessing-a-previous-value-further-down-the-pipe-chain-b881026701c1
like image 11
yurzui Avatar answered Oct 28 '22 03:10

yurzui


You can:

  • assign the result of each action to an observable

  • chain subsequent function calls based on earlier results

  • those results can be reused in later action calls via withLatestFrom

  • shareReplay is used to prevent the later withLatestFrom subscriptions causing earlier functions to re-execute

    function startUpload(event$: Observable<string>) {
      const headers$ = event$.pipe(
        concatMap(event => getAuthenticationHeaders(event)),
        shareReplay()
        );
    
      const id$ = headers$.pipe(
        map(() => generateUploadId()),
        shareReplay()
        );
    
      const emitUploadEvent$ = id$.pipe(
        withLatestFrom(event$),   // use earlier result
        map(([id, event]) => emitUploadStartEvent(id, event)),
        shareReplay()
        );
    
       // etc
    }
    

As above, the functions only take the parameters they require and there is no pass-through.

Demo: https://stackblitz.com/edit/so-rxjs-chaining-1?file=index.ts

This pattern can be simplified by use of an rxjs custom operator(note this could be refined further, including typing):

function call<T, R, TArgs extends any[], OArgs extends Observable<any>[]>(
  operator: (func: ((a: TArgs) => R)) => OperatorFunction<TArgs,R>,
  action: (...args: any[]) => R,
  ignoreInput: boolean,
  ...observableArgs: OArgs
): (args: Observable<T>) => Observable<R> {
  return (input: Observable<T>) => input.pipe(
    withLatestFrom(...observableArgs),
    operator((args: any[]) => action(...args.slice(ignoreInput ? 1: 0))),
    shareReplay(1)
  );
}

Which can be used like:

function startUpload(event$: Observable<string>) {
  const headers$ = event$.pipe(
    call(concatMap, getAuthenticationHeaders, true)
  );

  const id$ = headers$.pipe(
    call(map, generateUploadId, false)
  );

  const startEmitted$ = id$.pipe(
    call(map, emitUploadStartEvent, true, event$)
  );

  const pdfId$ = startEmitted$.pipe(
    call(map, createPdfDocument, false, event$, headers$, id$)
  );

  const uploaded$ = pdfId$.pipe(
    call(map, uploadBuilderForPdf, false, event$, pdfId$, headers$, id$)
  );

  const cloudId$ = uploaded$.pipe(
    call(map, closePdf, false, headers$, pdfId$)
  );

  const uploadDone$ = cloudId$.pipe(
    call(map, emitUploadDoneEvent, true, id$, event$)
  );

  // return cloudId$ instead of uploadDone$ but preserve observable chain
  return uploadDone$.pipe(concatMap(() => cloudId$));    
}

Demo: https://stackblitz.com/edit/so-rxjs-chaining-4?file=index.ts

like image 5
wlf Avatar answered Oct 28 '22 03:10

wlf