 

Turn functions with a callback into Python generators?

The Scipy minimization function (just to use as an example) has the option of adding a callback function, called at each step. So I can do something like,

def my_callback(x):
    print x

scipy.optimize.fmin(func, x0, callback=my_callback)

Is there a way to use the callback function to create a generator version of fmin, so that I could do,

for x in my_fmin(func, x0):
    print x

It seems like it might be possible with some combination of yields and sends, but I can't quite think of anything.

Asked Apr 01 '12 by marius




2 Answers

Generator as coroutine (no threading)

Let's have a FakeFtp class with a retrbinary method that calls a callback with each successfully read chunk of data:

class FakeFtp(object):
    def __init__(self):
        self.data = iter(["aaa", "bbb", "ccc", "ddd"])

    def login(self, user, password):
        self.user = user
        self.password = password

    def retrbinary(self, cmd, cb):
        for chunk in self.data:
            cb(chunk)

Using a plain callback function has the disadvantage that it is called repeatedly and cannot easily keep context between calls.
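
To make the contrast concrete, here is a minimal sketch (not part of the original answer; the names cb and counter are made up) of the plain-callback approach, where the callback has to keep its state in a mutable container outside itself:

processed = []
counter = [0]  # mutable container, so the callback can update the count between calls

def cb(chunk):
    i = counter[0]
    print("plain callback, processing chunk:", i, chunk)
    processed.append("processed({0}): {1}".format(i, chunk))
    counter[0] = i + 1

ftp = FakeFtp()
ftp.retrbinary("RETR binary", cb)
print("processed result", processed)

Every piece of state the callback needs between calls has to live in an enclosing or global scope like this; the coroutine defined below keeps it all inside one function body instead.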

The following code defines a process_chunks generator, which can receive chunks of data one by one and process them. In contrast to a plain callback, here we are able to keep all of the processing within one function without losing context.

from contextlib import closing
from itertools import count


def main():
    processed = []

    def process_chunks():
        for i in count():
            try:
                # (repeatedly) get the chunk to process
                chunk = yield
            except GeneratorExit:
                # finish_up
                print("Finishing up.")
                return
            else:
                # Here process the chunk as you like
                print("inside coroutine, processing chunk:", i, chunk)
                product = "processed({i}): {chunk}".format(i=i, chunk=chunk)
                processed.append(product)

    with closing(process_chunks()) as coroutine:
        # Get the coroutine to the first yield
        coroutine.next()
        ftp = FakeFtp()
        # next line repeatedly calls `coroutine.send(data)`
        ftp.retrbinary("RETR binary", cb=coroutine.send)
        # each callback "jumps" to `yield` line in `process_chunks`

    print("processed result", processed)
    print("DONE")

To see the code in action, put the FakeFtp class, the code shown above, and the following line:

main() 

into one file and call it:

$ python headsandtails.py
('inside coroutine, processing chunk:', 0, 'aaa')
('inside coroutine, processing chunk:', 1, 'bbb')
('inside coroutine, processing chunk:', 2, 'ccc')
('inside coroutine, processing chunk:', 3, 'ddd')
Finishing up.
('processed result', ['processed(0): aaa', 'processed(1): bbb', 'processed(2): ccc', 'processed(3): ddd'])
DONE

How it works

processed = [] is here just to show that the process_chunks generator has no problem cooperating with its external context. Everything is wrapped in def main(): to prove that there is no need to use global variables.

def process_chunks() is the core of the solution. It might have one-shot input parameters (not used here), but the main point where it receives input is each yield line, which returns whatever anyone sends into this generator instance via .send(data). One could call coroutine.send(chunk) directly, but in this example it is done via the callback, which refers to coroutine.send.
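
As a tiny, hypothetical illustration of the .send() mechanics (the echo coroutine below is not part of the answer):

def echo():
    while True:
        received = yield           # `yield` returns whatever was sent in
        print("got:", received)

e = echo()
e.next()            # advance to the first `yield` (use next(e) on Python 3)
e.send("hello")     # resumes the coroutine; prints: got: hello
e.send("world")     # prints: got: world
e.close()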

Note that in a real solution there is no problem having multiple yields in the code; they are processed one by one. This might be used, e.g., to read (and ignore) the header of a CSV file and then continue processing the data records, as sketched below.
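
For example, a hypothetical sketch (not from the answer) of a coroutine with two distinct yield points: the first receives and discards the header line, the loop then processes the data records:

def process_csv_lines():
    header = yield                     # the first line sent in is the header
    print("ignoring header:", header)
    while True:
        try:
            row = yield                # every following line is a data record
        except GeneratorExit:
            print("done with rows")
            return
        print("processing row:", row)

csv_coroutine = process_csv_lines()
csv_coroutine.next()                   # advance to the header `yield`
for line in ["id;name", "1;foo", "2;bar"]:
    csv_coroutine.send(line)
csv_coroutine.close()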

We could instantiate and use the generator as follows:

coroutine = process_chunks()
# Get the coroutine to the first yield
coroutine.next()

ftp = FakeFtp()
# next line repeatedly calls `coroutine.send(data)`
ftp.retrbinary("RETR binary", cb=coroutine.send)
# each callback "jumps" to `yield` line in `process_chunks`

# close the coroutine (will throw the `GeneratorExit` exception into the
# `process_chunks` coroutine).
coroutine.close()

The real code uses the contextlib closing context manager to ensure that coroutine.close() is always called.
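
Written out by hand, closing(...) guarantees roughly the following (a sketch using the same names as above; the version with closing is preferable):

coroutine = process_chunks()
try:
    coroutine.next()
    ftp = FakeFtp()
    ftp.retrbinary("RETR binary", cb=coroutine.send)
finally:
    coroutine.close()  # always runs, even if retrbinary raises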

Conclusions

This solution does not provide the sort of iterator that consumes data in the traditional style, "from outside". On the other hand, we are able to:

  • use the generator "from inside"
  • keep all iterative processing within one function without being interrupted between callbacks
  • optionally use external context
  • provide usable results to outside
  • all this can be done without using threading

Credits: The solution is heavily inspired by SO answer Python FTP “chunk” iterator (without loading entire file into memory) written by user2357112

Answered by Jan Vlcinsky


As pointed out in the comments, you could do it in a new thread, using a Queue. The drawback is that you'd still need some way to access the final result (what fmin returns at the end). My example below uses an optional callback to do something with it (another option would be to just yield it as well, though your calling code would have to differentiate between iteration results and the final result):

import scipy.optimize  # the function being wrapped, as in the question

from thread import start_new_thread
from Queue import Queue

def my_fmin(func, x0, end_callback=(lambda x: x), timeout=None):

    q = Queue()          # fmin produces, the generator consumes
    job_done = object()  # signals the processing is done

    # Producer
    def my_callback(x):
        q.put(x)

    def task():
        ret = scipy.optimize.fmin(func, x0, callback=my_callback)
        q.put(job_done)
        end_callback(ret)  # "Returns" the result of the main call

    # Starts fmin in a new thread
    start_new_thread(task, ())

    # Consumer
    while True:
        next_item = q.get(True, timeout)  # Blocks until an input is available
        if next_item is job_done:
            break
        yield next_item
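
For instance, a usage sketch (assuming scipy is installed; the objective function, starting point, and report_final are made up for this example):

def report_final(ret):
    # runs in the worker thread once fmin has returned
    print("final answer:", ret)

for x in my_fmin(lambda v: (v[0] - 3.0) ** 2, [0.0], end_callback=report_final):
    print("intermediate point:", x)    # one line per fmin iteration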

Update: to block the execution of the next iteration until the consumer has finished processing the last one, it's also necessary to use task_done and join.

    # Producer
    def my_callback(x):
        q.put(x)
        q.join()  # Blocks until task_done is called

    # Consumer
    while True:
        next_item = q.get(True, timeout)  # Blocks until an input is available
        if next_item is job_done:
            break
        yield next_item
        q.task_done()  # Unblocks the producer, so a new iteration can start

Note that maxsize=1 is not necessary, since no new item will be added to the queue until the last one is consumed.

Update 2: Also note that, unless all items are eventually retrieved by this generator, the created thread will deadlock (it will block forever and its resources will never be released). The producer is waiting on the queue, and since it stores a reference to that queue, it will never be reclaimed by the gc even if the consumer is. The queue will then become unreachable, so nobody will be able to release the lock.

I don't know of a clean solution for that, if one is possible at all (it would depend on the particular function used in place of fmin). A workaround could be made using timeout, having the producer raise an exception if put blocks for too long:

    q = Queue(maxsize=1)

    # Producer
    def my_callback(x):
        q.put(x)
        q.put("dummy", True, timeout)  # Blocks until the first result is retrieved
        q.join()  # Blocks again until task_done is called

    # Consumer
    while True:
        next_item = q.get(True, timeout)  # Blocks until an input is available
        q.task_done()                     # (one "task_done" per "get")
        if next_item is job_done:
            break
        yield next_item
        q.get()        # Retrieves the "dummy" object (must be after yield)
        q.task_done()  # Unblocks the producer, so a new iteration can start

Answered by mgibsonbr