I need a way to "inject" names into a function from an outer code block, so they are accessible locally and they don't need to be specifically handled by the function's code (defined as function parameters, loaded from *args
etc.)
The simplified scenario: providing a framework within which the users are able to define (with as little syntax as possible) custom functions to manipulate other objects of the framework (which are not necessarily global
).
Ideally, the user defines
def user_func():
Mouse.eat(Cheese)
if Cat.find(Mouse):
Cat.happy += 1
Here Cat
, Mouse
and Cheese
are framework objects that, for good reasons, cannot be bounded to the global namespace.
I want to write a wrapper for this function to behave like this:
def framework_wrap(user_func):
# this is a framework internal and has name bindings to Cat, Mouse and Cheese
def f():
inject(user_func, {'Cat': Cat, 'Mouse': Mouse, 'Cheese': Cheese})
user_func()
return f
Then this wrapper could be applied to all user-defined functions (as a decorator, by the user himself or automatically, although I plan to use a metaclass).
@framework_wrap
def user_func():
I am aware of the Python 3's nonlocal
keyword, but I still consider ugly (from the framework's user perspective) to add an additional line:
nonlocal Cat, Mouse, Cheese
and to worry about adding every object he needs to this line.
Any suggestion is greatly appreciated.
The more I mess around with the stack, the more I wish I hadn't. Don't hack globals to do what you want. Hack bytecode instead. There's two ways that I can think of to do this.
1) Add cells wrapping the references that you want into f.func_closure
. You have to reassemble the bytecode of the function to use LOAD_DEREF
instead of LOAD_GLOBAL
and generate a cell for each value. You then pass a tuple of the cells and the new code object to types.FunctionType
and get a function with the appropriate bindings. Different copies of the function can have different local bindings so it should be as thread safe as you want to make it.
2) Add arguments for your new locals at the end of the functions argument list. Replace appropriate occurrences of LOAD_GLOBAL
with LOAD_FAST
. Then construct a new function by using types.FunctionType
and passing in the new code object and a tuple of the bindings that you want as the default option. This is limited in the sense that python limits function arguments to 255 and it can't be used on functions that use variable arguments. None the less it struck me as the more challenging of the two so that's the one that I implemented (plus there's other stuff that can be done with this one). Again, you can either make different copies of the function with different bindings or call the function with the bindings that you want from each call location. So it too can be as thread safe as you want to make it.
import types
import opcode
# Opcode constants used for comparison and replacecment
LOAD_FAST = opcode.opmap['LOAD_FAST']
LOAD_GLOBAL = opcode.opmap['LOAD_GLOBAL']
STORE_FAST = opcode.opmap['STORE_FAST']
DEBUGGING = True
def append_arguments(code_obj, new_locals):
co_varnames = code_obj.co_varnames # Old locals
co_names = code_obj.co_names # Old globals
co_argcount = code_obj.co_argcount # Argument count
co_code = code_obj.co_code # The actual bytecode as a string
# Make one pass over the bytecode to identify names that should be
# left in code_obj.co_names.
not_removed = set(opcode.hasname) - set([LOAD_GLOBAL])
saved_names = set()
for inst in instructions(co_code):
if inst[0] in not_removed:
saved_names.add(co_names[inst[1]])
# Build co_names for the new code object. This should consist of
# globals that were only accessed via LOAD_GLOBAL
names = tuple(name for name in co_names
if name not in set(new_locals) - saved_names)
# Build a dictionary that maps the indices of the entries in co_names
# to their entry in the new co_names
name_translations = dict((co_names.index(name), i)
for i, name in enumerate(names))
# Build co_varnames for the new code object. This should consist of
# the entirety of co_varnames with new_locals spliced in after the
# arguments
new_locals_len = len(new_locals)
varnames = (co_varnames[:co_argcount] + new_locals +
co_varnames[co_argcount:])
# Build the dictionary that maps indices of entries in the old co_varnames
# to their indices in the new co_varnames
range1, range2 = xrange(co_argcount), xrange(co_argcount, len(co_varnames))
varname_translations = dict((i, i) for i in range1)
varname_translations.update((i, i + new_locals_len) for i in range2)
# Build the dictionary that maps indices of deleted entries of co_names
# to their indices in the new co_varnames
names_to_varnames = dict((co_names.index(name), varnames.index(name))
for name in new_locals)
if DEBUGGING:
print "injecting: {0}".format(new_locals)
print "names: {0} -> {1}".format(co_names, names)
print "varnames: {0} -> {1}".format(co_varnames, varnames)
print "names_to_varnames: {0}".format(names_to_varnames)
print "varname_translations: {0}".format(varname_translations)
print "name_translations: {0}".format(name_translations)
# Now we modify the actual bytecode
modified = []
for inst in instructions(code_obj.co_code):
# If the instruction is a LOAD_GLOBAL, we have to check to see if
# it's one of the globals that we are replacing. Either way,
# update its arg using the appropriate dict.
if inst[0] == LOAD_GLOBAL:
print "LOAD_GLOBAL: {0}".format(inst[1])
if inst[1] in names_to_varnames:
print "replacing with {0}: ".format(names_to_varnames[inst[1]])
inst[0] = LOAD_FAST
inst[1] = names_to_varnames[inst[1]]
elif inst[1] in name_translations:
inst[1] = name_translations[inst[1]]
else:
raise ValueError("a name was lost in translation")
# If it accesses co_varnames or co_names then update its argument.
elif inst[0] in opcode.haslocal:
inst[1] = varname_translations[inst[1]]
elif inst[0] in opcode.hasname:
inst[1] = name_translations[inst[1]]
modified.extend(write_instruction(inst))
code = ''.join(modified)
# Done modifying codestring - make the code object
return types.CodeType(co_argcount + new_locals_len,
code_obj.co_nlocals + new_locals_len,
code_obj.co_stacksize,
code_obj.co_flags,
code,
code_obj.co_consts,
names,
varnames,
code_obj.co_filename,
code_obj.co_name,
code_obj.co_firstlineno,
code_obj.co_lnotab)
def instructions(code):
code = map(ord, code)
i, L = 0, len(code)
extended_arg = 0
while i < L:
op = code[i]
i+= 1
if op < opcode.HAVE_ARGUMENT:
yield [op, None]
continue
oparg = code[i] + (code[i+1] << 8) + extended_arg
extended_arg = 0
i += 2
if op == opcode.EXTENDED_ARG:
extended_arg = oparg << 16
continue
yield [op, oparg]
def write_instruction(inst):
op, oparg = inst
if oparg is None:
return [chr(op)]
elif oparg <= 65536L:
return [chr(op), chr(oparg & 255), chr((oparg >> 8) & 255)]
elif oparg <= 4294967296L:
return [chr(opcode.EXTENDED_ARG),
chr((oparg >> 16) & 255),
chr((oparg >> 24) & 255),
chr(op),
chr(oparg & 255),
chr((oparg >> 8) & 255)]
else:
raise ValueError("Invalid oparg: {0} is too large".format(oparg))
if __name__=='__main__':
import dis
class Foo(object):
y = 1
z = 1
def test(x):
foo = Foo()
foo.y = 1
foo = x + y + z + foo.y
print foo
code_obj = append_arguments(test.func_code, ('y',))
f = types.FunctionType(code_obj, test.func_globals, argdefs=(1,))
if DEBUGGING:
dis.dis(test)
print '-'*20
dis.dis(f)
f(1)
Note that a whole branch of this code (that relating to EXTENDED_ARG
) is untested but that for common cases, it seems to be pretty solid. I'll be hacking on it and am currently writing some code to validate the output. Then (when I get around to it) I'll run it against the whole standard library and fix any bugs.
I'll also probably be implementing the first option as well.
Edited answer -- restores namespace dict after calling user_func()
Tested using Python 2.7.5 and 3.3.2
File framework.py:
# framework objects
class Cat: pass
class Mouse: pass
class Cheese: pass
_namespace = {'Cat':Cat, 'Mouse':Mouse, 'Cheese':Cheese } # names to be injected
# framework decorator
from functools import wraps
def wrap(f):
func_globals = f.func_globals if hasattr(f,'func_globals') else f.__globals__
@wraps(f)
def wrapped(*args, **kwargs):
# determine which names in framework's _namespace collide and don't
preexistent = set(name for name in _namespace if name in func_globals)
nonexistent = set(name for name in _namespace if name not in preexistent)
# save any preexistent name's values
f.globals_save = {name: func_globals[name] for name in preexistent}
# temporarily inject framework's _namespace
func_globals.update(_namespace)
retval = f(*args, **kwargs) # call function and save return value
# clean up function's namespace
for name in nonexistent:
del func_globals[name] # remove those that didn't exist
# restore the values of any names that collided
func_globals.update(f.globals_save)
return retval
return wrapped
Example usage:
from __future__ import print_function
import framework
class Cat: pass # name that collides with framework object
@framework.wrap
def user_func():
print('in user_func():')
print(' Cat:', Cat)
print(' Mouse:', Mouse)
print(' Cheese:', Cheese)
user_func()
print()
print('after user_func():')
for name in framework._namespace:
if name in globals():
print(' {} restored to {}'.format(name, globals()[name]))
else:
print(' {} not restored, does not exist'.format(name))
Output:
in user_func():
Cat: <class 'framework.Cat'>
Mouse: <class 'framework.Mouse'>
Cheese: <class 'framework.Cheese'>
after user_func():
Cheese not restored, does not exist
Mouse not restored, does not exist
Cat restored to <class '__main__.Cat'>
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With