Create an Observable you can unsubscribe from in RxCpp

I'm porting some code from C# that relies heavily on Rx, and I'm having difficulty finding C++ equivalents for some of the most commonly used C# methods.

In particular, I want to create an observable from the subscription/unsubscription logic. In C#, I use the Observable.Create<TSource> Method (Func<IObserver<TSource>, Action>) overload to create an observable. For instance:

var observable = Observable.Create<int>(observer =>
{
    observers.Add(observer);
    return () =>
    {
        observers.Remove(observer);
    };
});

Is it possible to do the same with RxCpp? I think the answer lies in the rx::observable<>::create(OnSubscribe os) method, but I can't figure out how to use it to "register" an unsubscription lambda.

Falanwe asked Dec 04 '25 10:12


1 Answer

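In RxCpp and RxJava, .subscribe() takes a subscriber. A subscriber is a subscription and an observer bound together. To register unsubscription logic, call add() on the subscriber with a callable; it runs when that subscriber is unsubscribed.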

In RxCpp your example might look like this:

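#include <list>
#include <memory>
#include <rxcpp/rx.hpp>

// Shared list of the currently subscribed subscribers.
auto observers = std::make_shared<std::list<rxcpp::subscriber<int>>>();

auto observable = rxcpp::observable<>::create<int>([=](rxcpp::subscriber<int> out){
    // Store the subscriber and keep an iterator to its list entry.
    auto it = observers->insert(observers->end(), out);
    // Runs on unsubscribe: remove the entry from the list.
    it->add([=](){
        observers->erase(it);
    });
});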

NOTE: rxcpp::subscriber<int> is a type-forgetter: it hides the concrete type of the observer. This allows it to be stored in a collection, but it introduces virtual function calls for on_next, on_error and on_completed.
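
To make the "subscription and observer bound together" idea concrete, here is a minimal standard-library-only sketch of the same pattern. It does not use RxCpp: Subscription, Subscriber, DemoResult and run_demo are hypothetical stand-ins written for illustration, and they only model add(), unsubscribe() and on_next() in a simplified, single-threaded way.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <list>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in (not an RxCpp type): shared state of one subscription.
struct SubscriptionState {
    bool subscribed = true;
    std::vector<std::function<void()>> teardowns;
};

// Copyable handle; all copies share the same state.
class Subscription {
    std::shared_ptr<SubscriptionState> state_;
public:
    Subscription() : state_(std::make_shared<SubscriptionState>()) {}
    bool is_subscribed() const { return state_->subscribed; }
    // Register logic to run on unsubscribe (runs at once if already unsubscribed).
    void add(std::function<void()> teardown) const {
        if (state_->subscribed) state_->teardowns.push_back(std::move(teardown));
        else teardown();
    }
    void unsubscribe() const {
        auto state = state_;  // keep the state alive while teardowns run
        if (!state->subscribed) return;
        state->subscribed = false;
        auto pending = std::move(state->teardowns);
        state->teardowns.clear();
        for (auto& teardown : pending) teardown();
    }
};

// Hypothetical stand-in: an observer callback bound to a subscription.
template <typename T>
class Subscriber {
    Subscription lifetime_;
    std::function<void(const T&)> on_next_;
public:
    Subscriber(Subscription lifetime, std::function<void(const T&)> on_next)
        : lifetime_(std::move(lifetime)), on_next_(std::move(on_next)) {}
    void on_next(const T& value) const {
        if (lifetime_.is_subscribed()) on_next_(value);
    }
    void add(std::function<void()> teardown) const { lifetime_.add(std::move(teardown)); }
    void unsubscribe() const { lifetime_.unsubscribe(); }
};

struct DemoResult {
    std::vector<int> first;   // values seen by the first subscriber
    std::vector<int> second;  // values seen by the second subscriber
    std::size_t remaining;    // list size after the first one unsubscribed
};

DemoResult run_demo() {
    auto observers = std::make_shared<std::list<Subscriber<int>>>();
    // Same shape as the create() lambda: store the subscriber, then register
    // a teardown that erases its list entry.
    auto on_subscribe = [observers](Subscriber<int> out) {
        auto it = observers->insert(observers->end(), out);
        it->add([observers, it]() { observers->erase(it); });
    };
    auto broadcast = [observers](int value) {
        auto snapshot = *observers;  // copy so the list may change during delivery
        for (auto& o : snapshot) o.on_next(value);
    };

    DemoResult result;
    Subscriber<int> a(Subscription(), [&result](const int& v) { result.first.push_back(v); });
    Subscriber<int> b(Subscription(), [&result](const int& v) { result.second.push_back(v); });
    on_subscribe(a);
    on_subscribe(b);

    broadcast(1);
    a.unsubscribe();  // runs the teardown, removing a's entry from the list
    broadcast(2);
    result.remaining = observers->size();
    b.unsubscribe();  // also releases the list <-> teardown reference cycle
    assert(observers->empty());
    return result;
}
```

In this sketch the first subscriber sees only the value sent before it unsubscribed, while the second sees both. Note that each teardown captures the shared list, so a subscriber that is never unsubscribed keeps the list alive; the same capture pattern appears in the RxCpp snippet above.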

Kirk Shoop answered Dec 06 '25 23:12


