Using RxPY for illustration purposes.
I want to create an observable from a function, but that function must take parameters. This particular example must return, at random intervals, one of many pre-defined tickers which I want to send to it. My solution thus far is to use a closure:
from __future__ import print_function
from rx import Observable
import random
import string
import time

def make_tickers(n = 300, s = 123):
    """ generates up to n unique 3-letter strings, each made up of uppercase letters"""
    random.seed(s)
    tickers = [''.join(random.choice(string.ascii_uppercase) for _ in range(3)) for y in range(n)]
    tickers = list(set(tickers)) # unique
    print(len(tickers))
    return(tickers)

def spawn_prices_fn(tickers):
    """ returns a function that takes an observer parameter and sends it
    a random element out of tickers every 20-100 ms """
    def spawner(observer):
        while True:
            next_tick = random.choice(tickers)
            observer.on_next(next_tick)
            time.sleep(random.randint(20, 100)/1000.0)
    return(spawner)

if __name__ == "__main__":
    spawned = spawn_prices_fn(make_tickers())
    xx = Observable.create(spawned)
    xx.subscribe(lambda s: print(s))
Is there a simpler way? Can further parameters be passed to the function given as Observable.create's first parameter without requiring a closure? What is the canonical advice?
It can be done in numerous ways; here is one solution that doesn't change your code too much.
Note that the ticker generation could also be broken up into a function generating a single string, combined with some rx magic, to be more rx-like.
I also slightly adjusted the code to make flake8 happy.
from __future__ import print_function
import random
import string
import time
from rx import Observable


def make_tickers(n=300, s=123):
    """
    Generates up to n unique 3-letter strings each made up of uppercase letters
    """
    random.seed(s)
    tickers = [''.join(random.choice(string.ascii_uppercase) for _ in range(3))
               for y in range(n)]
    tickers = list(set(tickers))  # unique
    print(len(tickers))
    return(tickers)


def random_picker(tickers):
    ticker = random.choice(tickers)
    time.sleep(random.randint(20, 100) / 1000.0)
    return ticker


if __name__ == "__main__":
    xx = Observable\
        .repeat(make_tickers())\
        .map(random_picker)\
        .subscribe(lambda s: print(s))
Or a solution without make_tickers:
from __future__ import print_function
import random
import string
import time
from rx import Observable


def random_picker(tickers):
    ticker = random.choice(tickers)
    time.sleep(random.randint(20, 100) / 1000.0)
    return ticker


if __name__ == "__main__":
    random.seed(123)
    Observable.range(1, 300)\
        .map(lambda _: ''.join(random.choice(string.ascii_uppercase)
                               for _ in range(3)))\
        .reduce(lambda x, y: x + [y], [])\
        .do_while(lambda _: True)\
        .map(random_picker)\
        .subscribe(lambda s: print(s))
The time.sleep call could be moved out of random_picker, but the code would become a bit trickier.
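On the original question of passing parameters without writing a closure by hand, one more option is functools.partial from the standard library, which binds the leading arguments and leaves the observer as the last one. The following is only a minimal sketch: spawn_prices, its min_ms, max_ms and count parameters, and the CollectingObserver stand-in are hypothetical names introduced for illustration (count exists only so the example terminates), and it assumes, as in the question's code, that Observable.create calls the function it is given with the observer as its single argument.

```python
from __future__ import print_function

import random
import time
from functools import partial


def spawn_prices(tickers, min_ms, max_ms, count, observer):
    """Send `count` random tickers to `observer`, sleeping between sends.

    Hypothetical helper: the observer comes last so that every other
    argument can be bound up front with functools.partial.
    """
    for _ in range(count):
        observer.on_next(random.choice(tickers))
        time.sleep(random.randint(min_ms, max_ms) / 1000.0)
    observer.on_completed()


class CollectingObserver(object):
    """Stand-in observer so the sketch runs without RxPY installed."""

    def __init__(self):
        self.items = []
        self.completed = False

    def on_next(self, value):
        self.items.append(value)

    def on_error(self, error):
        raise error

    def on_completed(self):
        self.completed = True


# Bind everything except the trailing observer argument.
subscribe_fn = partial(spawn_prices, ['AAA', 'BBB', 'CCC'], 1, 2, 5)

observer = CollectingObserver()
subscribe_fn(observer)  # the library would make this call on subscription
print(observer.items)

# Assumed usage with the RxPY version from the question (not run here):
#     Observable.create(subscribe_fn).subscribe(lambda s: print(s))
```

A partial object is still a closure in spirit, since it carries the bound arguments around, but it avoids the nested function definition and keeps spawn_prices testable on its own.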