Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to create Observable from function?

I want to call a function (synchronously) and then use its return value as an initial emission (subsequently chaining some other operators on the resulting observable).

I want to invoke this function during subscription, so I can't just use Observable.of(() => getSomeValue()). I've seen bindCallback (previously fromCallback) but I don't think it can be used for this task (correct me if I'm wrong). I've seen start static operator in v4 docs but apparently it is not implemented in v5 (and no indication that its on the way). RxJava also has fromCallable operator that does exactly that afaik.

Only way I could think of is like this:

Observable.create((observer: Observer<void>) => {
  let val = getSomeValue();
  observer.next(val);
  observer.complete();
})

which I think does just that. But this just seems so complicated for a simple thing that should probably have been like Observable.fromFunction(() => getSomeValue()) And what if I want to run it asynchronously, like start operator does? How can I do this in the current version of RxJS?

like image 549
Titan Avatar asked Jan 02 '17 19:01

Titan


3 Answers

I tend to avoid any explicit use of Observable.create where ever possible, because generally it is a source of bugs to have to manage not just your event emission but also your teardown logic.

You can use Observable.defer instead. It accepts a function that returns an Observable or an Observable-like thing (read: Promise, Array, Iterators). So if you have a function that returns an async thing it is as easy as:

Observable.defer(() => doSomethingAsync());

If you want this to work with a synchronous result then do:

Observable.defer(() => Observable.of(doSomethingSync()));

Note: That like create this will rerun the function on each subscription. This is different then say the result of Observable.bindCallback which stores the function call result without re-executing the function. So if you need that sort of behavior you will need to use the appropriate multicasting operator.

like image 121
paulpdaniels Avatar answered Oct 20 '22 15:10

paulpdaniels


An implementation of a fromFunction$ that I used in my project:

function fromFunction$<T>(factory: () => T): Observable<T> {
    return Observable.create((observer: Subscriber<T>) => {
        try {
            observer.next(factory());
            observer.complete();
        } catch (error) {
            observer.error(error);
        }
    });
}

Used like:

fromFunction$(() => 0).subscribe((value) => console.log(`Value is '${value}'`), null, () => console.log('Completed'));
fromFunction$(() => [1, 2, 3]).subscribe((value) => console.log(`Value is '${value}'`), null, () => console.log('Completed'));
fromFunction$(() => { throw 'Something' }).subscribe(null, (error) => console.error(`Error: ${error}`));

Gives:

Value is '0'
Completed

Value is '1,2,3'
Completed

Error: Something

Until such implementation exists.

like image 23
Matt Avatar answered Oct 20 '22 15:10

Matt


Actually I think the best option is using Observable.create because it's the most universal solution for both synchronous and asynchronous initial values.

If you're sure you'll use a synchronous function you can use startWith() operator (this makes sence only if return value from getSomeValue() should be the same for all Observers).

Using Observable.bindCallback as a source Observable is of course doable however I personally recommend to avoid it because it makes your code very hard to understand and it's usually not necessary because you can use just Observable.create.

like image 3
martin Avatar answered Oct 20 '22 15:10

martin