
Shared Python generator

I am trying to reproduce the reactive extensions "shared" observable concept with Python generators.

Say I have an API that gives me an infinite stream that I can use like this:

def my_generator():
    for elem in the_infinite_stream():
        yield elem

I could use this generator multiple times like so:

stream1 = my_generator()
stream2 = my_generator()

And the_infinite_stream() will be called twice (once for each generator).

Now say that the_infinite_stream() is an expensive operation. Is there a way to "share" the generator between multiple clients? It seems like itertools.tee could do this, but it requires knowing in advance how many independent generators I want.
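
For reference, here is roughly how tee would be used, with the number of copies fixed up front:

import itertools

stream1, stream2 = itertools.tee(my_generator(), 2)
# n=2 is fixed at the call site; there is no way to ask
# for a third copy of the same stream later on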

The idea is that in other languages (Java, Swift), the reactive extensions (RxJava, RxSwift) offer "shared" streams that let me conveniently duplicate a stream on the client side. I am wondering how to do that in Python.

Note: I am using asyncio

asked Apr 14 '19 by JonasVautherin

People also ask

Are Python generators thread-safe?

No: simultaneous next() calls from multiple threads may interleave and corrupt the generator's internal state. The common approach is the master/worker (also called farmer/worker) pattern, where a single thread drives the generator and hands the values out.
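
If all you need is serialized access rather than fan-out, one common workaround is to guard the iterator with a lock. A minimal sketch (the LockedIterator wrapper below is illustrative, not a standard-library class):

import threading

class LockedIterator:
    """Serialize concurrent next() calls on a single underlying iterator."""

    def __init__(self, iterable):
        self._it = iter(iterable)
        self._lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):
        with self._lock:   # only one thread advances the iterator at a time
            return next(self._it)

# usage: every worker thread pulls from the same locked stream
# safe_stream = LockedIterator(my_generator())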

Are Python generators efficient?

Generators in Python provide a memory-efficient way of producing values on demand, without having to store all of them in memory beforehand.
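
A quick way to see the difference is to compare the memory footprint of a list against an equivalent generator expression:

import sys

squares_list = [x * x for x in range(1_000_000)]   # materializes every value now
squares_gen  = (x * x for x in range(1_000_000))   # computes each value on demand

print(sys.getsizeof(squares_list))  # several megabytes
print(sys.getsizeof(squares_gen))   # ~100 bytes, regardless of the range size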

What is a generator in Python, with an example?

Python generators are a simple way of creating iterators; the iterator-protocol bookkeeping is handled automatically. Simply put, a generator is a function that returns an object (an iterator) which we can iterate over, one value at a time.
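
For instance, a minimal generator that lazily counts up to n:

def count_up_to(n):
    i = 1
    while i <= n:
        yield i        # hand back one value, then pause until asked again
        i += 1

print(list(count_up_to(3)))  # [1, 2, 3]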


1 Answer

I took the implementation of itertools.tee and modified it so that you can create any number of generators from the_infinite_stream:

import collections

def generators_factory(iterable):
    it = iter(iterable)
    deques = []           # one pending-values buffer per generator
    already_gone = []     # full history so far, replayed to generators created later

    def new_generator():
        new_deque = collections.deque()
        new_deque.extend(already_gone)   # catch the new generator up on history
        deques.append(new_deque)

        def gen(mydeque):
            while True:
                if not mydeque:                  # when the local deque is empty
                    try:
                        newval = next(it)        # fetch a new value and
                    except StopIteration:        # (PEP 479: StopIteration must not
                        return                   #  leak out of a generator body)
                    already_gone.append(newval)
                    for d in deques:             # load it to all the deques
                        d.append(newval)
                yield mydeque.popleft()

        return gen(new_deque)

    return new_generator

# test it:
infinite_stream = [1, 2, 3, 4, 5]
factory = generators_factory(infinite_stream)
gen1 = factory()
gen2 = factory()
print(next(gen1)) # 1
print(next(gen2)) # 1 even after it was produced by gen1
print(list(gen1)) # [2, 3, 4, 5], the rest after the 1

To cache only a bounded number of values you can change already_gone = [] into already_gone = collections.deque(maxlen=size) and add a size=None parameter to generators_factory, as sketched below.
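
A minimal sketch of that variant. Note that with a bounded cache, a generator created after more than size values have been consumed is only caught up on the most recent size values:

import collections

def generators_factory(iterable, size=None):
    it = iter(iterable)
    deques = []
    # keep at most `size` past values; size=None keeps the full history
    already_gone = collections.deque(maxlen=size)

    def new_generator():
        new_deque = collections.deque()
        new_deque.extend(already_gone)   # replay only the cached tail
        deques.append(new_deque)

        def gen(mydeque):
            while True:
                if not mydeque:
                    try:
                        newval = next(it)
                    except StopIteration:
                        return
                    already_gone.append(newval)
                    for d in deques:
                        d.append(newval)
                yield mydeque.popleft()

        return gen(new_deque)

    return new_generator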

answered Sep 18 '22 by sanyassh