Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to subscribe exactly once to an element from AsyncSubject (consumer pattern)

In rxjs5, I have an AsyncSubject and want to subscribe to it multiple times, but only ONE subscriber should ever receive the next() event. All others (if they are not yet unsubscribed) should immediately get the complete() event without next().

Example:

let fired = false;
let as = new AsyncSubject();

const setFired = () => {
    if (fired == true) throw new Error("Multiple subscriptions executed");
    fired = true;
}

let subscription1 = as.subscribe(setFired);
let subscription2 = as.subscribe(setFired);

// note that subscription1/2 could be unsubscribed from in the future
// and still only a single subscriber should be triggered

setTimeout(() => {
    as.next(undefined);
    as.complete();
}, 500);
like image 916
Dynalon Avatar asked Oct 27 '16 07:10

Dynalon


1 Answers

You can easily implement this by writing a small class that wraps the initial AsyncSubject

import {AsyncSubject, Subject, Observable, Subscription} from 'rxjs/RX'

class SingleSubscriberObservable<T> {
    private newSubscriberSubscribed = new Subject();

    constructor(private sourceObservable: Observable<T>) {}

    subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
        this.newSubscriberSubscribed.next();
        return this.sourceObservable.takeUntil(this.newSubscriberSubscribed).subscribe(next, error, complete);
    }
}

You can then try it out in your example:

const as = new AsyncSubject();
const single = new SingleSubscriberObservable(as)

let fired = false;

function setFired(label:string){
    return ()=>{
        if(fired == true) throw new Error("Multiple subscriptions executed");
        console.log("FIRED", label);
        fired = true;
    }
}

function logDone(label: string){
    return ()=>{
       console.log(`${label} Will stop subscribing to source observable`);
    }
}

const subscription1 = single.subscribe(setFired('First'), ()=>{}, logDone('First'));
const subscription2 = single.subscribe(setFired('Second'), ()=>{}, logDone('Second'));
const subscription3 = single.subscribe(setFired('Third'), ()=>{}, logDone('Third'));

setTimeout(()=>{
    as.next(undefined);
    as.complete();
}, 500)

The key here is this part:

subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
    this.newSubscriberSusbscribed.next();
    return this.sourceObservable.takeUntil(this.newSubscriberSubscribed).subscribe(next, error, complete);
}

Every time someone calls subscribe we will signal the newSubscriberSubscribed subject.

When we subscribe to the underlying Observable we use

takeUntil(this.newSubscriberSubscribed)

This means that when the next subscriber calls:

this.newSubscriberSubscribed.next()

The previously returned observable will complete.

So this will result in what you are asking which is that the previous subscription complete whenever a new subscription comes along.

The output of the application would be:

First Will stop subscribing to source observable
Second Will stop subscribing to source observable
FIRED Third
Third Will stop subscribing to source observable

EDIT:

If you want to do it where the first that subscribed stays subscribed and all future subscriptions receive immediate complete (so that while the earliest subscriber is still subscribed nobody else can subscribe). You can do it like this:

class SingleSubscriberObservable<T> {
    private isSubscribed: boolean = false;

    constructor(private sourceObservable: Observable<T>) {}

    subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
        if(this.isSubscribed){
            return Observable.empty().subscribe(next, error, complete);    
        }
        this.isSubscribed = true;
        var unsubscribe = this.sourceObservable.subscribe(next, error, complete);
        return new Subscription(()=>{
            unsubscribe.unsubscribe();
            this.isSubscribed = false;
        });
    }
}

We keep a flag this.isSusbscribed to keep track of whether there is someone currently subscribed. We also return a custom subscription that we can use to set this flag back to false when things unsubscribe.

Whenever someone tries to subscribe, if we instead susbscribe them to an empty Observable which would complete immediately. The output would look like:

Second Will stop subscribing to source observable
Third Will stop subscribing to source observable
FIRED First
First Will stop subscribing to source observable
like image 87
Daniel Tabuenca Avatar answered Oct 20 '22 20:10

Daniel Tabuenca