Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Function local name binding from an outer scope

I need a way to "inject" names into a function from an outer code block, so they are accessible locally and they don't need to be specifically handled by the function's code (defined as function parameters, loaded from *args etc.)

The simplified scenario: providing a framework within which the users are able to define (with as little syntax as possible) custom functions to manipulate other objects of the framework (which are not necessarily global).

Ideally, the user defines

def user_func():
    Mouse.eat(Cheese)
    if Cat.find(Mouse):
        Cat.happy += 1

Here Cat, Mouse and Cheese are framework objects that, for good reasons, cannot be bounded to the global namespace.

I want to write a wrapper for this function to behave like this:

def framework_wrap(user_func):
    # this is a framework internal and has name bindings to Cat, Mouse and Cheese
    def f():
        inject(user_func, {'Cat': Cat, 'Mouse': Mouse, 'Cheese': Cheese})
        user_func()
    return f

Then this wrapper could be applied to all user-defined functions (as a decorator, by the user himself or automatically, although I plan to use a metaclass).

@framework_wrap
def user_func():

I am aware of the Python 3's nonlocal keyword, but I still consider ugly (from the framework's user perspective) to add an additional line:

nonlocal Cat, Mouse, Cheese

and to worry about adding every object he needs to this line.

Any suggestion is greatly appreciated.

like image 704
Lucian Boca Avatar asked Oct 11 '10 17:10

Lucian Boca


2 Answers

The more I mess around with the stack, the more I wish I hadn't. Don't hack globals to do what you want. Hack bytecode instead. There's two ways that I can think of to do this.

1) Add cells wrapping the references that you want into f.func_closure. You have to reassemble the bytecode of the function to use LOAD_DEREF instead of LOAD_GLOBAL and generate a cell for each value. You then pass a tuple of the cells and the new code object to types.FunctionType and get a function with the appropriate bindings. Different copies of the function can have different local bindings so it should be as thread safe as you want to make it.

2) Add arguments for your new locals at the end of the functions argument list. Replace appropriate occurrences of LOAD_GLOBAL with LOAD_FAST. Then construct a new function by using types.FunctionType and passing in the new code object and a tuple of the bindings that you want as the default option. This is limited in the sense that python limits function arguments to 255 and it can't be used on functions that use variable arguments. None the less it struck me as the more challenging of the two so that's the one that I implemented (plus there's other stuff that can be done with this one). Again, you can either make different copies of the function with different bindings or call the function with the bindings that you want from each call location. So it too can be as thread safe as you want to make it.

import types
import opcode

# Opcode constants used for comparison and replacecment
LOAD_FAST = opcode.opmap['LOAD_FAST']
LOAD_GLOBAL = opcode.opmap['LOAD_GLOBAL']
STORE_FAST = opcode.opmap['STORE_FAST']

DEBUGGING = True

def append_arguments(code_obj, new_locals):
    co_varnames = code_obj.co_varnames   # Old locals
    co_names = code_obj.co_names      # Old globals
    co_argcount = code_obj.co_argcount     # Argument count
    co_code = code_obj.co_code         # The actual bytecode as a string

    # Make one pass over the bytecode to identify names that should be
    # left in code_obj.co_names.
    not_removed = set(opcode.hasname) - set([LOAD_GLOBAL])
    saved_names = set()
    for inst in instructions(co_code):
        if inst[0] in not_removed:
            saved_names.add(co_names[inst[1]])

    # Build co_names for the new code object. This should consist of 
    # globals that were only accessed via LOAD_GLOBAL
    names = tuple(name for name in co_names
                  if name not in set(new_locals) - saved_names)

    # Build a dictionary that maps the indices of the entries in co_names
    # to their entry in the new co_names
    name_translations = dict((co_names.index(name), i)
                             for i, name in enumerate(names))

    # Build co_varnames for the new code object. This should consist of
    # the entirety of co_varnames with new_locals spliced in after the
    # arguments
    new_locals_len = len(new_locals)
    varnames = (co_varnames[:co_argcount] + new_locals +
                co_varnames[co_argcount:])

    # Build the dictionary that maps indices of entries in the old co_varnames
    # to their indices in the new co_varnames
    range1, range2 = xrange(co_argcount), xrange(co_argcount, len(co_varnames))
    varname_translations = dict((i, i) for i in range1)
    varname_translations.update((i, i + new_locals_len) for i in range2)

    # Build the dictionary that maps indices of deleted entries of co_names
    # to their indices in the new co_varnames
    names_to_varnames = dict((co_names.index(name), varnames.index(name))
                             for name in new_locals)

    if DEBUGGING:
        print "injecting: {0}".format(new_locals)
        print "names: {0} -> {1}".format(co_names, names)
        print "varnames: {0} -> {1}".format(co_varnames, varnames)
        print "names_to_varnames: {0}".format(names_to_varnames)
        print "varname_translations: {0}".format(varname_translations)
        print "name_translations: {0}".format(name_translations)


    # Now we modify the actual bytecode
    modified = []
    for inst in instructions(code_obj.co_code):
        # If the instruction is a LOAD_GLOBAL, we have to check to see if
        # it's one of the globals that we are replacing. Either way,
        # update its arg using the appropriate dict.
        if inst[0] == LOAD_GLOBAL:
            print "LOAD_GLOBAL: {0}".format(inst[1])
            if inst[1] in names_to_varnames:
                print "replacing with {0}: ".format(names_to_varnames[inst[1]])
                inst[0] = LOAD_FAST
                inst[1] = names_to_varnames[inst[1]]
            elif inst[1] in name_translations:    
                inst[1] = name_translations[inst[1]]
            else:
                raise ValueError("a name was lost in translation")
        # If it accesses co_varnames or co_names then update its argument.
        elif inst[0] in opcode.haslocal:
            inst[1] = varname_translations[inst[1]]
        elif inst[0] in opcode.hasname:
            inst[1] = name_translations[inst[1]]
        modified.extend(write_instruction(inst))

    code = ''.join(modified)
    # Done modifying codestring - make the code object

    return types.CodeType(co_argcount + new_locals_len,
                          code_obj.co_nlocals + new_locals_len,
                          code_obj.co_stacksize,
                          code_obj.co_flags,
                          code,
                          code_obj.co_consts,
                          names,
                          varnames,
                          code_obj.co_filename,
                          code_obj.co_name,
                          code_obj.co_firstlineno,
                          code_obj.co_lnotab)


def instructions(code):
    code = map(ord, code)
    i, L = 0, len(code)
    extended_arg = 0
    while i < L:
        op = code[i]
        i+= 1
        if op < opcode.HAVE_ARGUMENT:
            yield [op, None]
            continue
        oparg = code[i] + (code[i+1] << 8) + extended_arg
        extended_arg = 0
        i += 2
        if op == opcode.EXTENDED_ARG:
            extended_arg = oparg << 16
            continue
        yield [op, oparg]

def write_instruction(inst):
    op, oparg = inst
    if oparg is None:
        return [chr(op)]
    elif oparg <= 65536L:
        return [chr(op), chr(oparg & 255), chr((oparg >> 8) & 255)]
    elif oparg <= 4294967296L:
        return [chr(opcode.EXTENDED_ARG),
                chr((oparg >> 16) & 255),
                chr((oparg >> 24) & 255),
                chr(op),
                chr(oparg & 255),
                chr((oparg >> 8) & 255)]
    else:
        raise ValueError("Invalid oparg: {0} is too large".format(oparg))



if __name__=='__main__':
    import dis

    class Foo(object):
        y = 1

    z = 1
    def test(x):
        foo = Foo()
        foo.y = 1
        foo = x + y + z + foo.y
        print foo

    code_obj = append_arguments(test.func_code, ('y',))
    f = types.FunctionType(code_obj, test.func_globals, argdefs=(1,))
    if DEBUGGING:
        dis.dis(test)
        print '-'*20
        dis.dis(f)
    f(1)

Note that a whole branch of this code (that relating to EXTENDED_ARG) is untested but that for common cases, it seems to be pretty solid. I'll be hacking on it and am currently writing some code to validate the output. Then (when I get around to it) I'll run it against the whole standard library and fix any bugs.

I'll also probably be implementing the first option as well.

like image 84
aaronasterling Avatar answered Oct 21 '22 20:10

aaronasterling


Edited answer -- restores namespace dict after calling user_func()

Tested using Python 2.7.5 and 3.3.2

File framework.py:

# framework objects
class Cat: pass
class Mouse: pass
class Cheese: pass

_namespace = {'Cat':Cat, 'Mouse':Mouse, 'Cheese':Cheese } # names to be injected

# framework decorator
from functools import wraps
def wrap(f):
    func_globals = f.func_globals if hasattr(f,'func_globals') else f.__globals__
    @wraps(f)
    def wrapped(*args, **kwargs):
        # determine which names in framework's _namespace collide and don't
        preexistent = set(name for name in _namespace if name in func_globals)
        nonexistent = set(name for name in _namespace if name not in preexistent)
        # save any preexistent name's values
        f.globals_save = {name: func_globals[name] for name in preexistent}
        # temporarily inject framework's _namespace
        func_globals.update(_namespace)

        retval = f(*args, **kwargs) # call function and save return value

        # clean up function's namespace
        for name in nonexistent:
             del func_globals[name] # remove those that didn't exist
        # restore the values of any names that collided
        func_globals.update(f.globals_save)
        return retval

    return wrapped

Example usage:

from __future__ import print_function
import framework

class Cat: pass  # name that collides with framework object

@framework.wrap
def user_func():
    print('in user_func():')
    print('  Cat:', Cat)
    print('  Mouse:', Mouse)
    print('  Cheese:', Cheese)

user_func()

print()
print('after user_func():')
for name in framework._namespace:
    if name in globals():
        print('  {} restored to {}'.format(name, globals()[name]))
    else:
        print('  {} not restored, does not exist'.format(name))

Output:

in user_func():
  Cat: <class 'framework.Cat'>
  Mouse: <class 'framework.Mouse'>
  Cheese: <class 'framework.Cheese'>

after user_func():
  Cheese not restored, does not exist
  Mouse not restored, does not exist
  Cat restored to <class '__main__.Cat'>
like image 34
martineau Avatar answered Oct 21 '22 19:10

martineau