What is the proper way to wait until all the observers on_completed are called if the observers are using observe_on(rxcpp::observe_on_new_thread()):
For example:
{
Foo foo;
auto generator = [&](rxcpp::subscriber<int> s)
{
s.on_next(1);
// ...
s.on_completed();
};
auto values = rxcpp::observable<>::create<int>(generator).publish();
auto s1 = values.observe_on(rxcpp::observe_on_new_thread())
.subscribe([&](int) { slow_function(foo); }));
auto lifetime = rxcpp::composite_subscription();
lifetime.add([&](){ wrapper.log("unsubscribe"); });
auto s2 = values.ref_count().as_blocking().subscribe(lifetime);
// hope to call something here to wait for the completion of
// s1's on_completed function
}
// the program usually crashes here when foo goes out of scope because
// the slow_function(foo) is still working on foo. I also noticed that
// s1's on_completed never got called.
My question is how to wait until s1's on_completed is finished without having to set and poll some variables.
The motivation of using observe_on() is because there are usually multiple observers on values, and I would like each observer to run concurrently. Perhaps there are different ways to achieve the same goal, I am open to all your suggestions.
Merging the two will allow a single blocking subscribe to wait for both to finish.
{
Foo foo;
auto generator = [&](rxcpp::subscriber<int> s)
{
s.on_next(1);
s.on_next(2);
// ...
s.on_completed();
};
auto values = rxcpp::observable<>::create<int>(generator).publish();
auto work = values.
observe_on(rxcpp::observe_on_new_thread()).
tap([&](int c) {
slow_function(foo);
}).
finally([](){printf("s1 completed\n");}).
as_dynamic();
auto start = values.
ref_count().
finally([](){printf("s2 completed\n");}).
as_dynamic();
// wait for all to finish
rxcpp::observable<>::from(work, start).
merge(rxcpp::observe_on_new_thread()).
as_blocking().subscribe();
}
A few points.
the stream must return the same type for merge to work. if combining streams of different types, use combine_latest instead.
the order of the observables in observable<>::from() is important, the start stream has ref_count, so it must be called last so that the following merge will have subscribed to the work before starting the generator.
The merge has two threads calling it. This requires that a thread-safe coordination be used. rxcpp is pay-for-use. by default the operators assume that all the calls are from the same thread. any operator that gets calls from multiple threads needs to be given a thread-safe coordination which the operator uses to impose thread-safe state management and output calls.
If desired the same coordinator instance could be used for both.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With