
How would you return an Observable<T> from a function

I have this function:

  getData(): Observable<ServerResponse> {
    const data = {} as ServerResponse;

    data.client = this.getClientData().pipe(
      map(response =>
        response.map(x => {
          return x.data.client;
        }),
      ),
    );

    data.otherData = this.otherData().pipe(
      map(response =>
        response.map(x => {
          return this.groupByTimePeriod(x.data.other, 'day', 'week');
        }),
      ),
    );
  }

The function needs to return data with all the properties that get assigned to it inside the function. How would you do that?

If I just do return data, it does not work, because this.getClientData() and this.otherData() have not yet completed.

asked Jan 14 '19 23:01 by sreginogemoh



1 Answer

Well, you have multiple questions/problems here. I'll start with the easiest: how do you get an observable from a plain value or object inside a function? The answer is the of creation function from RxJS:

return of(data);
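As a minimal sketch of what of does, assuming a simplified ServerResponse shape with two string fields (the field names and values here are illustrative, not taken from the question's real types):

```typescript
import { Observable, of } from 'rxjs';

// Hypothetical, simplified response shape used only for this illustration.
interface ServerResponse {
  client: string;
  otherData: string;
}

function getData(): Observable<ServerResponse> {
  const data: ServerResponse = { client: 'Client 1', otherData: 'Other' };
  // of() wraps an already available value in an observable that
  // emits it once on subscription and then completes.
  return of(data);
}

const received: ServerResponse[] = [];
let completed = false;

getData().subscribe({
  next: value => received.push(value),
  complete: () => { completed = true; },
});
```

This only helps when the value already exists at the time of the call, which is why it does not solve the timing problem on its own.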

But you alluded to a larger problem, which is: how do you defer returning data until the child observables have emitted their values? You are looking for forkJoin. From the docs:

forkJoin will wait for all passed Observables to complete and then it will emit an array with last values from corresponding Observables. So if you pass n Observables to the operator, resulting array will have n values, where first value is the last thing emitted by the first Observable, second value is the last thing emitted by the second Observable and so on. That means forkJoin will not emit more than once and it will complete after that.
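The behavior described in that quote can be seen in a small sketch. The two sources below are made up for illustration; each emits several values, and forkJoin reports only the last one from each:

```typescript
import { forkJoin, of } from 'rxjs';

// Collect every emission so the number of emissions can be inspected.
const results: Array<[number, string]> = [];

// The first source emits 1, 2, 3 and the second emits 'a', 'b'.
// forkJoin waits for both to complete, then emits one array
// holding the last value of each source.
forkJoin([of(1, 2, 3), of('a', 'b')]).subscribe(value => {
  results.push(value);
});

console.log(results); // [[3, 'b']]
```

Note that if any source never completes, forkJoin never emits, so it suits request-style observables such as HTTP calls.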

You also have several other problems. For example, you are never subscribing to this.getClientData() or this.otherData(). Observables are lazily executed. Code in your observable will not execute until something subscribes to it. From the docs:

The code inside Observable.create(function subscribe(observer) {...}) represents an "Observable execution", a lazy computation that only happens for each Observer that subscribes.
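A short sketch of that laziness follows. It uses the new Observable constructor rather than Observable.create, which newer RxJS releases mark as deprecated; the log array and messages are only there for illustration:

```typescript
import { Observable } from 'rxjs';

const log: string[] = [];

// Nothing inside this function runs when the observable is created.
const source = new Observable<number>(subscriber => {
  log.push('executing');
  subscriber.next(42);
  subscriber.complete();
});

log.push('created');

// Each subscribe() call triggers a separate execution.
source.subscribe(value => log.push(`first got ${value}`));
source.subscribe(value => log.push(`second got ${value}`));

console.log(log);
// ['created', 'executing', 'first got 42', 'executing', 'second got 42']
```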

It also appears as though you are using pipe/map in an attempt to set properties on your data object. But what ends up in data.client and data.otherData is the piped observable itself, not the values it will eventually emit, so the object never holds the actual data.

So, putting it all together, here's what your code might look like, with simulated server latency to show that forkJoin waits for completion of both observables:

import { Injectable } from '@angular/core';
import { Observable, of, forkJoin } from 'rxjs';
import { delay } from 'rxjs/operators';

@Injectable({
    providedIn: 'root'
})
export class TestService {
    getData(): Observable<ServerResponse> {
        const allOperations = forkJoin(
            this.getClientData(),
            this.getOtherData()
        );

        const observable = Observable.create(function subscribe(observer) {
            // Wait until all operations have completed.
            // Returning the inner subscription lets it be torn down on unsubscribe.
            return allOperations.subscribe(([clientData, otherData]) => {
                const data = new ServerResponse();
                // Update your ServerResponse with client and other data
                data.otherdata = otherData.other;
                data.client = clientData.client;
                // Now that data is 100% populated, emit to anything subscribed to getData().
                observer.next(data);
                observer.complete();
            }, err => observer.error(err)); // Forward errors instead of swallowing them
        });
        // We return the observable, with the code above to be executed only once it is subscribed to
        return observable;
    }

    getClientData() : Observable<any> {
        return of({ client: 'Client 1' });
    }
    getOtherData(): Observable<any> {
        // Fake server latency
        return of({ other: 'Other data that takes a while to return from server...' })
            .pipe(delay(2000));
    }
}

export class ServerResponse {
    client: string;
    otherdata: string;
}

If you call getData() and subscribe to the observable, you will see that forkJoin works as intended and we have to wait 2 seconds for both child observables to complete and our observable to emit a value:

this.testService.getData().subscribe(data => {
  console.log(data);
});
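As an alternative sketch, the same result can usually be reached without creating an observable by hand and subscribing inside it: pipe the forkJoin result through map and return that. The standalone functions and the interface below are simplified stand-ins for the service methods above, and the stubs emit immediately instead of simulating latency:

```typescript
import { Observable, forkJoin, of } from 'rxjs';
import { map } from 'rxjs/operators';

// Simplified stand-in for the ServerResponse class above.
interface ServerResponse {
  client: string;
  otherdata: string;
}

// Stub sources; a real service would return HTTP observables here.
function getClientData(): Observable<{ client: string }> {
  return of({ client: 'Client 1' });
}

function getOtherData(): Observable<{ other: string }> {
  return of({ other: 'Other data' });
}

function getData(): Observable<ServerResponse> {
  // forkJoin emits once both sources complete; map builds the response.
  // Errors and unsubscription propagate through the pipe automatically.
  return forkJoin([getClientData(), getOtherData()]).pipe(
    map(([clientData, otherData]) => ({
      client: clientData.client,
      otherdata: otherData.other,
    })),
  );
}

const emitted: ServerResponse[] = [];
getData().subscribe(data => emitted.push(data));
```

Callers subscribe exactly as before; a delayed source would only change when the single value arrives, not its shape.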

It seems that you might be new to RxJS / asynchronous programming. I suggest reading the excellent introduction to RxJS when you get a chance. It can be tricky at first, but with practice it will become second nature.

answered Nov 05 '22 06:11 by Dean