TL;DR I'm looking for help to implement the marble diagram below. The intention is to sort the non-sorted values to the extent possible without waiting time between scan executions.
I'm not asking for a full implementation. Any guidance will be welcome. I have an asynchronous slow (forced for testing purpose) scan of a infinite hot observable. This is the relevant code:
thread_1_scheduler = ThreadPoolScheduler(1)
thread = ExternalDummyService()
external_obs = thread.subject.publish()
external_obs \
.flat_map(lambda msg: Observable.just(msg).subscribe_on(thread_1_scheduler)) \
.scan(seed=State(0, None), accumulator=slow_scan_msg) \
.subscribe(log, print, lambda: print("SLOW FINISHED"))
external_obs.connect()
thread.start()
def slow_scan_msg(state, msg):
sleep(0.4)
return state \
._replace(count = state.count + 1) \
._replace(last_msg = msg)
This is the full version: https://pyfiddle.io/fiddle/781a9b29-c541-4cd2-88ba-ef90610f5dbd
And this is the current output (values are random generated):
emitting Msg(count=0, timestamp=14.139175415039062)
emitting Msg(count=1, timestamp=6.937265396118164)
emitting Msg(count=2, timestamp=11.461257934570312)
emitting Msg(count=3, timestamp=13.222932815551758)
emitting Msg(count=4, timestamp=5.713462829589844)
SLOW st.count=0 last_msg.counter=0 ts=14.14
SLOW st.count=1 last_msg.counter=1 ts=6.94
SLOW st.count=2 last_msg.counter=2 ts=11.46
SLOW st.count=3 last_msg.counter=3 ts=13.22
SLOW st.count=4 last_msg.counter=4 ts=5.71
SLOW FINISHED
I would like to sort the pending messages between scan executions. So the first emitted message will be always the first consumed one, but the next consumed message would be the min(value) of the emitted and non-consumed messages until that point (all of them in the current version because instant emitting). And so on an so forth... I think the marble diagram is better than my explanations.
Note that the scan is not waiting for the complete event, the only reason it doesn't start after the last message is emitted is because the sleep. Here you have another version in which the sleep has been removed from scan and put in ExternalDummyService. You can see that the values are consumed in the moment they are emitted. This is also shown in the marble diagram.
I tried with to_sorted_list, the only sorting method I found in RxPy, but I couldn't make it work.
What I'm looking for is something like this:
external_obs \
.flat_map(lambda msg: Observable.just(msg).subscribe_on(thread_1_scheduler)) \
############ buffered_sort() does not exist
.buffered_sort(lambda msg: msg.timestamp) \
############
.scan(seed=State("SLOW", 0, None), accumulator=slow_scan_msg) \
.subscribe(log, print, lambda: print("SLOW FINISHED"))
Thanks
If you want to use to_sorted_list
you need to remap the list you obtain in a single element. Changing your main
function to:
def main():
thread_1_scheduler = ThreadPoolScheduler(1)
thread = ExternalDummyService()
external_obs = thread.subject.publish()
external_obs \
.flat_map(lambda msg: Observable.just(msg).subscribe_on(thread_1_scheduler)) \
.to_sorted_list(key_selector=lambda msg: msg.timestamp) \
.flat_map(lambda msglist: Observable.from_iterable(msglist)) \
.scan(seed=State(0, None), accumulator=slow_scan_msg) \
.subscribe(log, print, lambda: print("SLOW FINISHED"))
external_obs.connect()
thread.start()
gives:
>emitting Msg(count=0, timestamp=18.924474716186523)
>emitting Msg(count=1, timestamp=4.669189453125)
>emitting Msg(count=2, timestamp=18.633127212524414)
>emitting Msg(count=3, timestamp=15.151262283325195)
>emitting Msg(count=4, timestamp=14.705896377563477)
>SLOW st.count=0 last_msg.counter=1 ts=4.67
>SLOW st.count=1 last_msg.counter=4 ts=14.71
>SLOW st.count=2 last_msg.counter=3 ts=15.15
>SLOW st.count=3 last_msg.counter=2 ts=18.63
>SLOW st.count=4 last_msg.counter=0 ts=18.92
>SLOW FINISHED
Note that the to_sorted_list
method will wait for the end of the subject stream to start scanning, so you cannot use it to implement your marble diagram as it shown in the question.
To properly implement it, I think you would need something like onBackpressureBuffer
which is implemented in RxJava but not in RxPy.
This wouldn't solve entirely the issue as the buffer is FIFO (first in first out) and you want a custom way of chosing which message goes out first. This may require a tweak on how the request to the buffer are handled.
You may find a better way to progress towards a solution with a extension of RxPy called rxbackpressure, particularly with its class dequeuablebuffer.py that you may be able to adapt to suit your needs.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With