I'm trying to get my head around the rxpy
library for functional reactive programming (FRP) and I've already hit a roadblock. I'm writing a small program that expects data to be streamed in via standard input (sys.stdin
).
My question is therefore simple: how can I create an rx.Observable
instance that will asynchronously read from stdin? Are there built-in mechanisms to create Observable
instances from streams?
I've never used RxPy
, but I have a bit of familiarity with RxJS
.
RxPy
has a number of built-in methods that you could likely use for this purpose, but I'm inclined to create an Observable factory. Taking ObservableCreation.from_array
as our guide, let's try that now. (Note: I haven't run this code, but it should get you most of the way there)
from rx.observable import Observable, ObservableMeta
from rx.anonymousobservable import AnonymousObservable
from rx.concurrency import current_thread_scheduler
class ObservableFile(Observable, metaclass=ObservableMeta):
@classmethod
def from_file(cls, readableFile, scheduler=None):
scheduler = scheduler or current_thread_scheduler
def subscribe(observer):
def action(action1, state=None):
try:
observer.on_next(readableFile.next())
action1(action)
except StopIteration: # EOF
observer.on_completed()
return scheduler.schedule_recursive(action)
return AnonymousObservable(subscribe)
Then just use it like this:
res = rx.Observable.from_file(sys.stdin)
This will create an observable over each line of stdin until EOF. It's blocking, but there are ways around that. It can also be tuned with a different scheduler.
I've just been playing with this today and
d = rx.Observable.from_(sys.stdin).subscribe(print)
appears to work (echos lines to stdout). from_
is an alias for from_iterable
.
d
is a Disposable to unsubscribe.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With