I am trying to reproduce the reactive extensions "shared" observable concept with Python generators.
Say I have an API that gives me an infinite stream that I can use like this:
def my_generator():
    for elem in the_infinite_stream():
        yield elem
I could use this generator multiple times like so:
stream1 = my_generator()
stream2 = my_generator()
And the_infinite_stream() will be called twice (once for each generator).
Now say that the_infinite_stream() is an expensive operation. Is there a way to "share" the generator between multiple clients? It seems like itertools.tee would do that, but I have to know in advance how many independent generators I want.
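For reference, here is a minimal sketch of the tee behaviour described above. It assumes a stand-in the_infinite_stream() that simply counts upward, since the real stream is not shown; the calls counter is only there to show how often the source is opened.

```python
import itertools

calls = 0  # how many times the (stand-in) expensive source has been opened


def the_infinite_stream():
    """Hypothetical replacement for the expensive stream in the question."""
    global calls
    calls += 1
    return itertools.count(1)


# tee opens the source only once, but the number of copies (2) is fixed here.
stream1, stream2 = itertools.tee(the_infinite_stream(), 2)

first = [next(stream1) for _ in range(3)]
second = [next(stream2) for _ in range(3)]
print(first, second, calls)  # [1, 2, 3] [1, 2, 3] 1
```

Both copies see the same values and the source is opened once, but the count passed to tee has to be chosen up front.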
The idea is that in other languages (Java, Swift) using the reactive extensions (RxJava, RxSwift) "shared" streams, I can conveniently duplicate the stream on the client side. I am wondering how to do that in Python.
Note: I am using asyncio
It's not thread-safe: simultaneous calls may interleave and corrupt the local variables. The common approach is to use the master-slave pattern (now called the farmer-worker pattern in PC).
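A minimal sketch of the farmer-worker idea, assuming the concern is a generator being consumed from several threads. Only one thread (the farmer) ever touches the generator, and the workers receive items through a queue.Queue. The names numbers, farmer, worker and _DONE are hypothetical and used only for illustration.

```python
import queue
import threading

_DONE = object()  # sentinel that tells a worker to stop


def numbers():
    # Hypothetical source generator; only the farmer thread iterates it.
    yield from range(10)


def farmer(source, tasks, n_workers):
    # Single consumer of the generator: hands each item to the task queue.
    for item in source:
        tasks.put(item)
    for _ in range(n_workers):
        tasks.put(_DONE)  # one stop signal per worker


def worker(tasks, results):
    while True:
        item = tasks.get()
        if item is _DONE:
            break
        results.put(item * item)


tasks, results = queue.Queue(), queue.Queue()
n_workers = 3
threads = [threading.Thread(target=worker, args=(tasks, results))
           for _ in range(n_workers)]
threads.append(threading.Thread(target=farmer, args=(numbers(), tasks, n_workers)))
for t in threads:
    t.start()
for t in threads:
    t.join()

# Workers finish in no particular order, so sort before comparing.
squares = sorted(results.get() for _ in range(10))
print(squares)
```

Because the generator is advanced by a single thread, no locking around next() is needed; the queues handle the synchronisation.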
Generators in Python provide an efficient way of producing numbers or objects as and when they are needed, without having to store all the values in memory beforehand.
Python generators are a simple way of creating iterators: the bookkeeping an iterator normally requires is handled automatically. Simply put, a generator function is a function that returns an iterator object which we can iterate over one value at a time.
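As a small illustration of that laziness, here is a sketch with a hypothetical squares() generator function. Nothing is computed until next() asks for a value, so an infinite sequence is not a problem.

```python
def squares():
    """Generator function: calling it returns an iterator; no body code runs yet."""
    n = 0
    while True:
        yield n * n  # execution pauses here until the next value is requested
        n += 1


it = squares()
first_three = [next(it) for _ in range(3)]
print(first_three)  # [0, 1, 4]
```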
I took the tee implementation and modified it so that you can create any number of generators from infinite_stream:
import collections

def generators_factory(iterable):
    it = iter(iterable)
    deques = []        # one pending-values deque per generator handed out
    already_gone = []  # every value fetched so far, replayed to new generators
    def new_generator():
        new_deque = collections.deque()
        new_deque.extend(already_gone)
        deques.append(new_deque)
        def gen(mydeque):
            while True:
                if not mydeque:             # when the local deque is empty
                    try:
                        newval = next(it)   # fetch a new value and
                    except StopIteration:   # stop cleanly when the source ends
                        return
                    already_gone.append(newval)
                    for d in deques:        # load it to all the deques
                        d.append(newval)
                yield mydeque.popleft()
        return gen(new_deque)
    return new_generator
# test it:
infinite_stream = [1, 2, 3, 4, 5]
factory = generators_factory(infinite_stream)
gen1 = factory()
gen2 = factory()
print(next(gen1)) # 1
print(next(gen2)) # 1 even after it was produced by gen1
print(list(gen1)) # [2, 3, 4, 5] # the rest after 1
To cache only a limited number of values, change already_gone = [] into already_gone = collections.deque(maxlen=size) and add a size=None parameter to generators_factory.
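Put together, the bounded variant might look like the sketch below. It is the same factory with the size parameter added; it also returns cleanly when the source is exhausted, which matters for the finite test list used here. The names head, late and tail are only for the demonstration.

```python
import collections


def generators_factory(iterable, size=None):
    it = iter(iterable)
    deques = []
    # size=None keeps every value; otherwise only the last `size` are replayed.
    already_gone = collections.deque(maxlen=size)

    def new_generator():
        # Plain (unbounded) copy of the cache for this consumer.
        new_deque = collections.deque(already_gone)
        deques.append(new_deque)

        def gen(mydeque):
            while True:
                if not mydeque:
                    try:
                        newval = next(it)
                    except StopIteration:
                        return  # source exhausted
                    already_gone.append(newval)
                    for d in deques:
                        d.append(newval)
                yield mydeque.popleft()

        return gen(new_deque)

    return new_generator


factory = generators_factory([1, 2, 3, 4, 5], size=2)
gen1 = factory()
head = [next(gen1) for _ in range(4)]  # gen1 consumes 1, 2, 3, 4
late = factory()                       # created late: starts from the cached 3, 4
tail = list(late)
print(head, tail)  # [1, 2, 3, 4] [3, 4, 5]
```

Note that the factory keeps a reference to every deque it has handed out, so a generator that is created but never consumed will keep accumulating values.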