I'm trying to figure out the scheduling model in the C++ version of Rx.
Coming from the C# version, where there is a simple interface with a single Schedule method, the C++ version seems rather complex, with concepts such as scheduler, worker, and coordination.
One major missing piece for me is an implementation of a thread pool scheduler. Does it exist under some other name? How would I implement it myself? Should I write it on top of PPL (Windows)? If I need a serialized (actor-like) observer on top of it, what should I use? Peeking here and here shows that this is not a trivial task.
It would really help to get some kind of overview of the subject, since the official documentation is auto-generated and still really sparse.
Yes, the generated docs are new and the scheduling is not yet documented.
The scheduler in rxcpp v2 is based on the scheduler and worker constructs that RxJava uses (Erik Meijer was involved). The docs for RxJava have an explanation of scheduler and worker. rxcpp adds schedulable, coordination and coordinator.
`scheduler` owns a timeline that is exposed by the `now()` method. `scheduler` is also a factory for `worker`s in that timeline. Since a scheduler owns a timeline, it is possible to build schedulers that time-travel. The virtual-scheduler is a base for the test-scheduler, which uses this to make multi-second tests complete in milliseconds.
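To make the time-travel idea concrete, here is a minimal sketch of a scheduler that owns a virtual clock. It is a toy model written against the standard library only; `toy_virtual_scheduler`, `schedule_after`, `run_all` and `demo_virtual_time` are hypothetical names chosen for illustration and are not rxcpp API. The point is only that when the scheduler owns `now()`, it can jump straight to the next due time instead of waiting.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <map>
#include <utility>
#include <vector>

// Toy model only: these names are hypothetical, not rxcpp API.
using virtual_ms = std::chrono::milliseconds;

class toy_virtual_scheduler {
public:
    // The scheduler owns the timeline; now() is the only way to read it.
    virtual_ms now() const { return clock_; }

    // Queue an action to run `delay` after the current virtual time.
    void schedule_after(virtual_ms delay, std::function<void()> action) {
        assert(delay.count() >= 0);  // the timeline never runs backwards
        pending_.emplace(clock_ + delay, std::move(action));
    }

    // Jump the clock from one due time to the next instead of sleeping.
    void run_all() {
        while (!pending_.empty()) {
            auto next = pending_.begin();
            clock_ = next->first;
            std::function<void()> action = std::move(next->second);
            pending_.erase(next);
            action();
        }
    }

private:
    virtual_ms clock_{0};
    std::multimap<virtual_ms, std::function<void()>> pending_;
};

// Ten virtual seconds pass without any real waiting.
long long demo_virtual_time() {
    toy_virtual_scheduler s;
    std::vector<long long> seen;
    s.schedule_after(virtual_ms(10000), [&] { seen.push_back(s.now().count()); });
    s.schedule_after(virtual_ms(2000), [&] { seen.push_back(s.now().count()); });
    s.run_all();
    // Returns the virtual time seen by the last action, or -1 if the order was wrong.
    return (seen.size() == 2 && seen[0] == 2000) ? seen[1] : -1;
}
```

Calling `demo_virtual_time()` returns 10000: the action due at 2000 ms runs first, then the clock jumps to 10000 ms. A test scheduler built along these lines can exercise long timeouts in a unit test that finishes immediately.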
`worker` owns a queue of pending `schedulable`s for the timeline and has a lifetime. When the time for a `schedulable` is reached, the `schedulable` is run. The queue maintains insertion order, so that when N `schedulable`s have the same target time they are run in the order in which they were inserted into the queue. The `worker` guarantees that each `schedulable` completes before the next `schedulable` is started. When the `worker`'s lifetime is unsubscribed, all pending `schedulable`s are discarded.
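The worker guarantees described above (ordering by target time, insertion order for ties, one item at a time, and discarding pending work on unsubscribe) can be sketched with a priority queue and an insertion counter. This is a toy model under my own assumptions, not how rxcpp implements its workers; `toy_item`, `later_first`, `toy_worker` and `demo_worker_order` are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <queue>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

// Toy model only: these names are hypothetical, not rxcpp API.
struct toy_item {
    long long due;              // target time on the timeline
    std::uint64_t seq;          // insertion counter, breaks ties between equal due times
    std::function<void()> run;  // the work to perform
};

struct later_first {
    // priority_queue pops the "largest" element, so later items compare as smaller priority.
    bool operator()(const toy_item& a, const toy_item& b) const {
        return std::tie(a.due, a.seq) > std::tie(b.due, b.seq);
    }
};

class toy_worker {
public:
    void schedule(long long due, std::function<void()> f) {
        assert(f);                 // an empty function cannot be run
        if (!subscribed_) return;  // a finished worker accepts no new work
        queue_.push(toy_item{due, next_seq_++, std::move(f)});
    }

    // Ending the lifetime discards everything that is still pending.
    void unsubscribe() {
        subscribed_ = false;
        while (!queue_.empty()) queue_.pop();
    }

    // Run items one at a time; each completes before the next starts.
    void drain() {
        while (subscribed_ && !queue_.empty()) {
            toy_item item = queue_.top();
            queue_.pop();
            item.run();
        }
    }

private:
    bool subscribed_ = true;
    std::uint64_t next_seq_ = 0;
    std::priority_queue<toy_item, std::vector<toy_item>, later_first> queue_;
};

std::string demo_worker_order() {
    std::string log;
    toy_worker w;
    w.schedule(5, [&] { log += "b"; });
    w.schedule(5, [&] { log += "c"; });  // same due time as "b", inserted later
    w.schedule(1, [&] { log += "a"; });
    w.schedule(9, [&] { log += "x"; w.unsubscribe(); });
    w.schedule(9, [&] { log += "never"; });  // discarded by the unsubscribe above
    w.drain();
    return log;
}
```

`demo_worker_order()` returns "abcx": the item due at time 1 runs first, the two items due at time 5 keep their insertion order, and the last item is discarded because the worker's lifetime ended before it ran.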
`schedulable` owns a function and has a worker and a lifetime. When the `schedulable`'s lifetime is unsubscribed, the `schedulable` function will not be called. The `schedulable` is passed to the function, which allows the function to reschedule itself or schedule something else on the same worker.
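As a rough illustration of a schedulable being handed to its own function, the sketch below models a schedulable as a function plus a shared lifetime flag plus a pointer to the queue it runs on. Everything here (`toy_schedulable`, `drain`, `demo_reschedule`, and the use of a plain `std::deque` as the worker's queue) is a hypothetical simplification, not rxcpp's types.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <memory>
#include <utility>

// Toy model only: these names are hypothetical, not rxcpp API.
class toy_schedulable {
public:
    using action = std::function<void(toy_schedulable&)>;

    toy_schedulable(std::deque<toy_schedulable>& worker_queue, action fn)
        : queue_(&worker_queue),
          fn_(std::move(fn)),
          subscribed_(std::make_shared<bool>(true)) {
        assert(fn_);  // a schedulable always owns a function
    }

    bool is_subscribed() const { return *subscribed_; }

    // The lifetime flag is shared by all copies of this schedulable.
    void unsubscribe() { *subscribed_ = false; }

    // Queue this schedulable again on the same worker queue.
    void schedule() {
        if (is_subscribed()) queue_->push_back(*this);
    }

    // Run the function, passing the schedulable itself, unless the lifetime ended.
    void operator()() {
        if (is_subscribed()) fn_(*this);
    }

private:
    std::deque<toy_schedulable>* queue_;
    action fn_;
    std::shared_ptr<bool> subscribed_;
};

// Stand-in for the worker loop: run queued schedulables in order.
void drain(std::deque<toy_schedulable>& queue) {
    while (!queue.empty()) {
        toy_schedulable next = queue.front();
        queue.pop_front();
        next();
    }
}

int demo_reschedule() {
    std::deque<toy_schedulable> queue;
    int calls = 0;
    toy_schedulable tick(queue, [&calls](toy_schedulable& self) {
        ++calls;
        if (calls == 3) self.unsubscribe();  // end the lifetime after three runs
        self.schedule();                     // no-op once unsubscribed
    });
    tick.schedule();
    drain(queue);
    return calls;
}
```

`demo_reschedule()` returns 3: the function reschedules itself twice through the `self` argument, then ends its own lifetime, after which it is never called again.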
The new concepts are coordination and coordinator. I added these to simplify operator implementations and to introduce pay-for-use in operator implementations. Specifically, in Rx.NET and RxJava, the operators use atomic operations and synchronization primitives to coordinate messages from multiple streams even when all the streams are on the same thread (like UI events). The `identity_...` coordinations in rxcpp are used by default and have no overhead. The `synchronize_...` and `observe_on_...` coordinations use a mutex and queue-onto-a-worker respectively, to interleave multiple streams safely.
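The pay-for-use idea can be sketched as follows: an operator asks its coordinator to wrap each incoming callback, and the coordinator decides whether that wrapping costs anything. This is a toy model with hypothetical names (`toy_identity_coordinator`, `toy_synchronize_coordinator`, `toy_sum_two_streams`), not rxcpp's coordination types. The identity variant hands the callback back unchanged, while the synchronizing variant takes a mutex around each call.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <mutex>
#include <vector>

// Toy model only: these names are hypothetical, not rxcpp API.
using on_next_t = std::function<void(int)>;

// Identity: the caller promises everything is already on one thread.
struct toy_identity_coordinator {
    on_next_t in(on_next_t f) const {
        assert(f);
        return f;  // handed back unchanged, no locking added
    }
};

// Synchronize: every wrapped callback takes the same mutex.
struct toy_synchronize_coordinator {
    std::shared_ptr<std::mutex> lock = std::make_shared<std::mutex>();

    on_next_t in(on_next_t f) const {
        assert(f);
        std::shared_ptr<std::mutex> l = lock;
        return [l, f](int v) {
            std::lock_guard<std::mutex> guard(*l);
            f(v);
        };
    }
};

// An operator written once against "some coordinator"; it never mentions a mutex.
template <class Coordinator>
int toy_sum_two_streams(const Coordinator& c,
                        const std::vector<int>& a,
                        const std::vector<int>& b) {
    int sum = 0;
    on_next_t from_a = c.in([&sum](int v) { sum += v; });
    on_next_t from_b = c.in([&sum](int v) { sum += v; });
    // Both streams are driven from this thread here to keep the sketch simple;
    // the synchronize variant matters when they arrive from different threads.
    for (int v : a) from_a(v);
    for (int v : b) from_b(v);
    return sum;
}

int demo_identity() {
    return toy_sum_two_streams(toy_identity_coordinator{}, {1, 2, 3}, {10, 20});
}

int demo_synchronize() {
    return toy_sum_two_streams(toy_synchronize_coordinator{}, {1, 2, 3}, {10, 20});
}
```

Both demos return 36. The operator code is identical in the two cases; only the coordinator passed in decides whether a lock is paid for, which is the design choice the paragraph above describes.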
`coordination` is a factory for `coordinator`s and has a `scheduler`.
`coordinator` has a `worker`, and is a factory for coordinated `observable`s, `subscriber`s and `schedulable` functions.
All the operators that take multiple streams or deal in time (even subscribe_on and observe_on) take a coordination parameter, not scheduler.
Here are some supplied functions that will produce a coordination using a particular scheduler.
There is no thread-pool scheduler yet. A thread-pool scheduler requires taking a dependency on a thread-pool implementation, since I do not wish to write a thread-pool. My plan is to make a scheduler for the Windows thread-pool, the Apple thread-pool and the Boost.Asio executor pool. One question to answer is whether these platform-specific constructs should live in the rxcpp repo or have platform-specific repos.
Contributions, opinions and ideas are welcome!