-
-
Save dhagrow/d3414e3c6ae25dfa606238355aea2ca5 to your computer and use it in GitHub Desktop.
| """ | |
| Python is a dynamic language, and it is relatively easy to dynamically create | |
| and modify things such as classes and objects. Functions, however, are quite | |
| challenging to create dynamically. | |
| One area where we might want to do this is in an RPC library, where a function | |
| defined on a server needs to be available remotely on a client. | |
| The naive solution is to simply pass arguments to a generic function that | |
| accepts `*args` and `**kwargs`. A lot of information is lost with this approach, | |
| however, in particular the number of arguments taken. Used in an RPC | |
| implementation, this also delays any error feedback until after the arguments | |
| have reached the server. | |
| If you search online, most practical solutions involve `exec()`. This is | |
| generally the approach chosen by many Python RPC libraries. This is, of course, | |
| a very insecure solution, one that opens any program up to malicious code | |
| execution. | |
| This experiment creates a real function at the highest layer available: the AST. | |
| There are several challenges to this approach. The most significant is that on | |
| the AST layer, function arguments must be defined according to their type. This | |
| greatly limits the flexibility allowed when defining a function with Python | |
| syntax. | |
| This experiment has a few requirements that introduce (and relieve) additional | |
| challenges: | |
| - Must return a representative function signature to the Python interpreter | |
| - Must support both Python 2 and 3 | |
| - Must allow serialization to JSON and/or MsgPack | |
| """ | |
| from __future__ import print_function | |
| import ast | |
| import sys | |
| import types | |
| import numbers | |
| import collections | |
| PY3 = sys.version_info.major >= 3 | |
| def create_function(name, signature, callback): | |
| """Dynamically creates a function that wraps a call to *callback*, based | |
| on the provided *signature*. | |
| Note that only default arguments with a value of `None` are supported. Any | |
| other value will raise a `TypeError`. | |
| """ | |
| # utils to set default values when creating a ast objects | |
| Loc = lambda cls, **kw: cls(annotation=None, lineno=1, col_offset=0, **kw) | |
| Name = lambda id, ctx=None: Loc(ast.Name, id=id, ctx=ctx or ast.Load()) | |
| # vars for the callback call | |
| call_args = [] | |
| call_keywords = [] # PY3 | |
| call_starargs = None # PY2 | |
| call_kwargs = None # PY2 | |
| # vars for the generated function signature | |
| func_args = [] | |
| func_defaults = [] | |
| vararg = None | |
| kwarg = None | |
| # vars for the args with default values | |
| defaults = [] | |
| # assign args based on *signature* | |
| for param in viewvalues(signature.parameters): | |
| if param.default is not param.empty: | |
| if isinstance(param.default, type(None)): | |
| # `ast.NameConstant` is used in PY3, but both support `ast.Name` | |
| func_defaults.append(Name('None')) | |
| elif isinstance(param.default, bool): | |
| # `ast.NameConstant` is used in PY3, but both support `ast.Name` | |
| typ = str if PY3 else bytes | |
| func_defaults.append(Name(typ(param.default))) | |
| elif isinstance(param.default, numbers.Number): | |
| func_defaults.append(Loc(ast.Num, n=param.default)) | |
| elif isinstance(param.default, str): | |
| func_defaults.append(Loc(ast.Str, s=param.default)) | |
| elif isinstance(param.default, bytes): | |
| typ = ast.Bytes if PY3 else ast.Str | |
| func_defaults.append(Loc(typ, s=param.default)) | |
| elif isinstance(param.default, list): | |
| func_defaults.append(Loc(ast.List, | |
| elts=param.default, ctx=ast.Load())) | |
| elif isinstance(param.default, tuple): | |
| func_defaults.append(Loc(ast.Tuple, | |
| elts=list(param.default), ctx=ast.Load())) | |
| elif isinstance(param.default, dict): | |
| func_defaults.append(Loc(ast.Dict, | |
| keys=list(viewkeys(param.default)), | |
| values=list(viewvalues(param.default)))) | |
| else: | |
| err = 'unsupported default argument type: {}' | |
| raise TypeError(err.format(type(param.default))) | |
| defaults.append(param.default) | |
| # func_defaults.append(Name('None')) | |
| # defaults.append(None) | |
| if param.kind in {param.POSITIONAL_ONLY, param.POSITIONAL_OR_KEYWORD}: | |
| call_args.append(Name(param.name)) | |
| if PY3: | |
| func_args.append(Loc(ast.arg, arg=param.name)) | |
| else: | |
| func_args.append(Name(param.name, ast.Param())) | |
| elif param.kind == param.VAR_POSITIONAL: | |
| if PY3: | |
| call_args.append(Loc(ast.Starred, | |
| value=Name(param.name), | |
| ctx=ast.Load())) | |
| vararg = Loc(ast.arg, arg=param.name) | |
| else: | |
| call_starargs = Name(param.name) | |
| vararg = param.name | |
| elif param.kind == param.KEYWORD_ONLY: | |
| err = 'TODO: KEYWORD_ONLY param support, param: {}' | |
| raise TypeError(err.format(param.name)) | |
| elif param.kind == param.VAR_KEYWORD: | |
| if PY3: | |
| call_keywords.append(Loc(ast.keyword, | |
| arg=None, value=Name(param.name))) | |
| kwarg = Loc(ast.arg, arg=param.name) | |
| else: | |
| call_kwargs = Name(param.name) | |
| kwarg = param.name | |
| # generate the ast for the *callback* call | |
| call_ast = Loc(ast.Call, | |
| func=Name(callback.__name__), | |
| args=call_args, keywords=call_keywords, | |
| starargs=call_starargs, kwargs=call_kwargs) | |
| # generate the function ast | |
| func_ast = Loc(ast.FunctionDef, name=to_func_name(name), | |
| args=ast.arguments( | |
| args=func_args, vararg=vararg, defaults=func_defaults, | |
| kwarg=kwarg, kwonlyargs=[], kw_defaults=[]), | |
| body=[Loc(ast.Return, value=call_ast)], | |
| decorator_list=[], returns=None) | |
| # compile the ast and get the function code | |
| mod_ast = ast.Module(body=[func_ast]) | |
| module_code = compile(mod_ast, '<generated-ast>', 'exec') | |
| func_code = [c for c in module_code.co_consts | |
| if isinstance(c, types.CodeType)][0] | |
| # return the generated function | |
| return types.FunctionType(func_code, {callback.__name__: callback}, | |
| argdefs=tuple(defaults)) | |
| ## | |
| ## support functions | |
| ## | |
| def viewitems(obj): | |
| return getattr(obj, "viewitems", obj.items)() | |
| def viewkeys(obj): | |
| return getattr(obj, "viewkeys", obj.keys)() | |
| def viewvalues(obj): | |
| return getattr(obj, "viewvalues", obj.values)() | |
| def to_func_name(name): | |
| # func.__name__ must be bytes in Python2 | |
| return to_unicode(name) if PY3 else to_bytes(name) | |
| def to_bytes(s, encoding='utf8'): | |
| if isinstance(s, bytes): | |
| pass | |
| elif isinstance(s, str): | |
| s = s.encode(encoding) | |
| return s | |
| def to_unicode(s, encoding='utf8'): | |
| if isinstance(s, bytes): | |
| s = s.decode(encoding) | |
| elif isinstance(s, str): | |
| pass | |
| elif isinstance(s, dict): | |
| s = {to_unicode(k): to_unicode(v) for k, v in viewitems(s)} | |
| elif isinstance(s, collections.Iterable): | |
| s = [to_unicode(x, encoding) for x in s] | |
| return s | |
| ## | |
| ## demo | |
| ## | |
| def main(): | |
| if PY3: | |
| from inspect import signature | |
| else: | |
| from funcsigs import signature | |
| # original function | |
| def original(a, b, *args, **kwargs): | |
| return a, b, args, kwargs | |
| sig = signature(original) | |
| print('original:', original) | |
| print('original signature:', sig) | |
| print('original ret:', original(1, 2, 4, borp='torp')) | |
| # cloned function | |
| def callback(*args, **kwargs): | |
| return args, kwargs | |
| cloned = create_function('clone', sig, callback) | |
| sig = signature(cloned) | |
| print('cloned:', cloned) | |
| print('cloned signature:', sig) | |
| print('cloned ret:', cloned(1, 2, 4, borp='torp')) | |
| if __name__ == '__main__': | |
| main() |
Thanks for this! That was very helpful!
I've added keyword-only support (python 3 only) here: https://gist.github.com/dsuess/1a5919b598f54d24010eb0a7a79e71a0
hello @dhagrow I wonder what is the license of this code in case I would want to use it.
For the record, I ultimately relented and opted for an exec()-based solution for my needs, which I believe can be implemented securely. I found that this dynamic version required too much maintenance, because the AST was changing even between minor versions of Python.
thanks! In the end I didn't end-up needing it. But I still found it very educational 👍
btw in my case I was just doing terrible hack so robustness sort of went out of window from get go. You can see it here https://github.com/turboMaCk/pytriarchy
For python 3.11, needs posonlyargs=[], in ast.arguments call, and type_ignores=[] in ast.Module call.
Output of the script ...
Python 2:
Note that the funcsigs package is required for Python 2.
Python 3: