Skip to content

Instantly share code, notes, and snippets.

@dsuess
Forked from dhagrow/create_function.py
Last active October 24, 2022 09:35
Show Gist options
  • Select an option

  • Save dsuess/1a5919b598f54d24010eb0a7a79e71a0 to your computer and use it in GitHub Desktop.

Select an option

Save dsuess/1a5919b598f54d24010eb0a7a79e71a0 to your computer and use it in GitHub Desktop.

Revisions

  1. dsuess revised this gist Apr 1, 2020. 1 changed file with 95 additions and 92 deletions.
    187 changes: 95 additions & 92 deletions create_function.py
    Original file line number Diff line number Diff line change
    @@ -2,181 +2,188 @@
    Python is a dynamic language, and it is relatively easy to dynamically create
    and modify things such as classes and objects. Functions, however, are quite
    challenging to create dynamically.
    One area where we might want to do this is in an RPC library, where a function
    defined on a server needs to be available remotely on a client.
    The naive solution is to simply pass arguments to a generic function that
    accepts `*args` and `**kwargs`. A lot of information is lost with this approach,
    however, in particular the number of arguments taken. Used in an RPC
    implementation, this also delays any error feedback until after the arguments
    have reached the server.
    If you search online, most practical solutions involve `exec()`. This is
    generally the approach chosen by many Python RPC libraries. This is, of course,
    a very insecure solution, one that opens any program up to malicious code
    execution.
    This experiment creates a real function at the highest layer available: the AST.
    There are several challenges to this approach. The most significant is that on
    the AST layer, function arguments must be defined according to their type. This
    greatly limits the flexibility allowed when defining a function with Python
    syntax.
    This experiment creates a real function at the highest layer available: the AST.
    There are several challenges to this approach. The most significant is that on
    the AST layer, function arguments must be defined according to their type. This
    greatly limits the flexibility allowed when defining a function with Python
    syntax.
    This experiment has a few requirements that introduce (and relieve) additional
    challenges:
    - Must return a representative function signature to the Python interpreter
    - Must support both Python 2 and 3
    - Must allow serialization to JSON and/or MsgPack
    Taken from https://gist.github.com/dhagrow/d3414e3c6ae25dfa606238355aea2ca5
    """

    from __future__ import print_function

    import ast
    import sys
    import types
    import numbers
    import collections

    PY3 = sys.version_info.major >= 3

    def create_function(name, signature, callback):
    """Dynamically creates a function that wraps a call to *callback*, based
    on the provided *signature*.
    Note that only default arguments with a value of `None` are supported. Any
    other value will raise a `TypeError`.
    """
    # utils to set default values when creating a ast objects
    Loc = lambda cls, **kw: cls(annotation=None, lineno=1, col_offset=0, **kw)
    Name = lambda id, ctx=None: Loc(ast.Name, id=id, ctx=ctx or ast.Load())

    # vars for the callback call
    call_args = []
    call_keywords = [] # PY3
    call_starargs = None # PY2
    call_kwargs = None # PY2
    call_keywords = []

    # vars for the generated function signature
    func_args = []
    func_kwargs = []
    func_defaults = []
    func_kwdefaults = []
    vararg = None
    kwarg = None

    # vars for the args with default values
    defaults = []
    kwdefaults = dict()

    # assign args based on *signature*
    for param in viewvalues(signature.parameters):
    if param.default is not param.empty:
    if param.kind in {param.POSITIONAL_ONLY, param.POSITIONAL_OR_KEYWORD}:
    add_to = func_defaults
    defaults.append(param.default)
    elif param.kind is param.KEYWORD_ONLY:
    add_to = func_kwdefaults
    kwdefaults[param.name] = param.default
    else:
    raise TypeError("Shouldnt have defaults for other types")

    if isinstance(param.default, type(None)):
    # `ast.NameConstant` is used in PY3, but both support `ast.Name`
    func_defaults.append(Name('None'))
    add_to.append(Name("None"))
    elif isinstance(param.default, bool):
    # `ast.NameConstant` is used in PY3, but both support `ast.Name`
    typ = str if PY3 else bytes
    func_defaults.append(Name(typ(param.default)))
    add_to.append(Name(str(param.default)))
    elif isinstance(param.default, numbers.Number):
    func_defaults.append(Loc(ast.Num, n=param.default))
    add_to.append(Loc(ast.Num, n=param.default))
    elif isinstance(param.default, str):
    func_defaults.append(Loc(ast.Str, s=param.default))
    add_to.append(Loc(ast.Str, s=param.default))
    elif isinstance(param.default, bytes):
    typ = ast.Bytes if PY3 else ast.Str
    func_defaults.append(Loc(typ, s=param.default))
    add_to.append(Loc(ast.Bytes, s=param.default))
    elif isinstance(param.default, list):
    func_defaults.append(Loc(ast.List,
    elts=param.default, ctx=ast.Load()))
    add_to.append(Loc(ast.List, elts=param.default, ctx=ast.Load()))
    elif isinstance(param.default, tuple):
    func_defaults.append(Loc(ast.Tuple,
    elts=list(param.default), ctx=ast.Load()))
    add_to.append(Loc(ast.Tuple, elts=list(param.default), ctx=ast.Load()))
    elif isinstance(param.default, dict):
    func_defaults.append(Loc(ast.Dict,
    keys=list(viewkeys(param.default)),
    values=list(viewvalues(param.default))))
    add_to.append(
    Loc(
    ast.Dict,
    keys=list(viewkeys(param.default)),
    values=list(viewvalues(param.default)),
    )
    )
    else:
    err = 'unsupported default argument type: {}'
    err = "unsupported default argument type: {}"
    raise TypeError(err.format(type(param.default)))
    defaults.append(param.default)
    # func_defaults.append(Name('None'))
    # defaults.append(None)
    elif param.kind is param.KEYWORD_ONLY:
    # If it's a keyword-only arugment, we need to add a None-default
    # value
    func_kwdefaults.append(None)

    if param.kind in {param.POSITIONAL_ONLY, param.POSITIONAL_OR_KEYWORD}:
    call_args.append(Name(param.name))
    if PY3:
    func_args.append(Loc(ast.arg, arg=param.name))
    else:
    func_args.append(Name(param.name, ast.Param()))
    func_args.append(Loc(ast.arg, arg=param.name))
    elif param.kind == param.VAR_POSITIONAL:
    if PY3:
    call_args.append(Loc(ast.Starred,
    value=Name(param.name),
    ctx=ast.Load()))
    vararg = Loc(ast.arg, arg=param.name)
    else:
    call_starargs = Name(param.name)
    vararg = param.name
    call_args.append(Loc(ast.Starred, value=Name(param.name), ctx=ast.Load()))
    vararg = Loc(ast.arg, arg=param.name)
    elif param.kind == param.KEYWORD_ONLY:
    err = 'TODO: KEYWORD_ONLY param support, param: {}'
    raise TypeError(err.format(param.name))
    call_keywords.append(
    Loc(ast.keyword, arg=param.name, value=Name(param.name))
    )
    func_kwargs.append(Loc(ast.arg, arg=param.name))
    elif param.kind == param.VAR_KEYWORD:
    if PY3:
    call_keywords.append(Loc(ast.keyword,
    arg=None, value=Name(param.name)))
    kwarg = Loc(ast.arg, arg=param.name)
    else:
    call_kwargs = Name(param.name)
    kwarg = param.name
    call_keywords.append(Loc(ast.keyword, arg=None, value=Name(param.name)))
    kwarg = Loc(ast.arg, arg=param.name)

    # generate the ast for the *callback* call
    call_ast = Loc(ast.Call,
    func=Name(callback.__name__),
    args=call_args, keywords=call_keywords,
    starargs=call_starargs, kwargs=call_kwargs)
    call_ast = Loc(
    ast.Call, func=Name(callback.__name__), args=call_args, keywords=call_keywords
    )

    # generate the function ast
    func_ast = Loc(ast.FunctionDef, name=to_func_name(name),
    func_ast = Loc(
    ast.FunctionDef,
    name=to_func_name(name),
    args=ast.arguments(
    args=func_args, vararg=vararg, defaults=func_defaults,
    kwarg=kwarg, kwonlyargs=[], kw_defaults=[]),
    args=func_args,
    vararg=vararg,
    defaults=func_defaults,
    kwarg=kwarg,
    kwonlyargs=func_kwargs,
    kw_defaults=func_kwdefaults,
    ),
    body=[Loc(ast.Return, value=call_ast)],
    decorator_list=[], returns=None)
    decorator_list=[],
    returns=None,
    )

    # compile the ast and get the function code
    mod_ast = ast.Module(body=[func_ast])
    module_code = compile(mod_ast, '<generated-ast>', 'exec')
    func_code = [c for c in module_code.co_consts
    if isinstance(c, types.CodeType)][0]

    module_code = compile(mod_ast, "<generated-ast>", "exec")
    func_code = [c for c in module_code.co_consts if isinstance(c, types.CodeType)][0]
    # return the generated function
    return types.FunctionType(func_code, {callback.__name__: callback},
    argdefs=tuple(defaults))
    func = types.FunctionType(
    func_code, {callback.__name__: callback}, argdefs=tuple(defaults)
    )
    func.__kwdefaults__ = kwdefaults
    return func


    ##
    ## support functions
    ##


    def viewitems(obj):
    return getattr(obj, "viewitems", obj.items)()


    def viewkeys(obj):
    return getattr(obj, "viewkeys", obj.keys)()


    def viewvalues(obj):
    return getattr(obj, "viewvalues", obj.values)()


    def to_func_name(name):
    # func.__name__ must be bytes in Python2
    return to_unicode(name) if PY3 else to_bytes(name)
    return to_unicode(name)

    def to_bytes(s, encoding='utf8'):

    def to_bytes(s, encoding="utf8"):
    if isinstance(s, bytes):
    pass
    elif isinstance(s, str):
    s = s.encode(encoding)
    return s

    def to_unicode(s, encoding='utf8'):

    def to_unicode(s, encoding="utf8"):
    if isinstance(s, bytes):
    s = s.decode(encoding)
    elif isinstance(s, str):
    @@ -187,34 +194,30 @@ def to_unicode(s, encoding='utf8'):
    s = [to_unicode(x, encoding) for x in s]
    return s

    ##
    ## demo
    ##

    def main():
    if PY3:
    from inspect import signature
    else:
    from funcsigs import signature
    from inspect import signature

    # original function
    def original(a, b, *args, **kwargs):
    return a, b, args, kwargs
    def original(a, b, *args, c, d=10, **kwargs):
    return a, b, args, c, kwargs

    sig = signature(original)
    print('original:', original)
    print('original signature:', sig)
    print('original ret:', original(1, 2, 4, borp='torp'))
    print("original:", original)
    print("original signature:", sig)
    print("original ret:", original(1, 2, 4, c=5, borp="torp"))

    # cloned function
    def callback(*args, **kwargs):
    return args, kwargs
    cloned = create_function('clone', sig, callback)

    cloned = create_function("clone", sig, callback)

    sig = signature(cloned)
    print('cloned:', cloned)
    print('cloned signature:', sig)
    print('cloned ret:', cloned(1, 2, 4, borp='torp'))
    print("cloned:", cloned)
    print("cloned signature:", sig)
    print("cloned ret:", cloned(1, 2, 4, c=5, borp="torp"))


    if __name__ == '__main__':
    main()
    if __name__ == "__main__":
    main()
  2. @dhagrow dhagrow revised this gist Mar 15, 2018. 1 changed file with 5 additions and 5 deletions.
    10 changes: 5 additions & 5 deletions create_function.py
    Original file line number Diff line number Diff line change
    @@ -17,11 +17,11 @@
    a very insecure solution, one that opens any program up to malicious code
    execution.
    This experiment creates a real function at the highest layer available:
    the AST. There are several challenges to this approach. The most significant is
    that on the AST layer, function arguments must be defined according to their
    type. This greatly limits the flexibility allowed when defining a function with
    Python syntax.
    This experiment creates a real function at the highest layer available: the AST.
    There are several challenges to this approach. The most significant is that on
    the AST layer, function arguments must be defined according to their type. This
    greatly limits the flexibility allowed when defining a function with Python
    syntax.
    This experiment has a few requirements that introduce (and relieve) additional
    challenges:
  3. @dhagrow dhagrow revised this gist Mar 15, 2018. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion create_function.py
    Original file line number Diff line number Diff line change
    @@ -17,7 +17,7 @@
    a very insecure solution, one that opens any program up to malicious code
    execution.
    This experiment creates a real function at the highest possible layer available:
    This experiment creates a real function at the highest layer available:
    the AST. There are several challenges to this approach. The most significant is
    that on the AST layer, function arguments must be defined according to their
    type. This greatly limits the flexibility allowed when defining a function with
  4. @dhagrow dhagrow revised this gist Mar 15, 2018. 1 changed file with 33 additions and 0 deletions.
    33 changes: 33 additions & 0 deletions create_function.py
    Original file line number Diff line number Diff line change
    @@ -1,3 +1,36 @@
    """
    Python is a dynamic language, and it is relatively easy to dynamically create
    and modify things such as classes and objects. Functions, however, are quite
    challenging to create dynamically.
    One area where we might want to do this is in an RPC library, where a function
    defined on a server needs to be available remotely on a client.
    The naive solution is to simply pass arguments to a generic function that
    accepts `*args` and `**kwargs`. A lot of information is lost with this approach,
    however, in particular the number of arguments taken. Used in an RPC
    implementation, this also delays any error feedback until after the arguments
    have reached the server.
    If you search online, most practical solutions involve `exec()`. This is
    generally the approach chosen by many Python RPC libraries. This is, of course,
    a very insecure solution, one that opens any program up to malicious code
    execution.
    This experiment creates a real function at the highest possible layer available:
    the AST. There are several challenges to this approach. The most significant is
    that on the AST layer, function arguments must be defined according to their
    type. This greatly limits the flexibility allowed when defining a function with
    Python syntax.
    This experiment has a few requirements that introduce (and relieve) additional
    challenges:
    - Must return a representative function signature to the Python interpreter
    - Must support both Python 2 and 3
    - Must allow serialization to JSON and/or MsgPack
    """

    from __future__ import print_function

    import ast
  5. @dhagrow dhagrow created this gist Mar 15, 2018.
    187 changes: 187 additions & 0 deletions create_function.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,187 @@
    from __future__ import print_function

    import ast
    import sys
    import types
    import numbers
    import collections

    PY3 = sys.version_info.major >= 3

    def create_function(name, signature, callback):
    """Dynamically creates a function that wraps a call to *callback*, based
    on the provided *signature*.
    Note that only default arguments with a value of `None` are supported. Any
    other value will raise a `TypeError`.
    """
    # utils to set default values when creating a ast objects
    Loc = lambda cls, **kw: cls(annotation=None, lineno=1, col_offset=0, **kw)
    Name = lambda id, ctx=None: Loc(ast.Name, id=id, ctx=ctx or ast.Load())

    # vars for the callback call
    call_args = []
    call_keywords = [] # PY3
    call_starargs = None # PY2
    call_kwargs = None # PY2

    # vars for the generated function signature
    func_args = []
    func_defaults = []
    vararg = None
    kwarg = None

    # vars for the args with default values
    defaults = []

    # assign args based on *signature*
    for param in viewvalues(signature.parameters):
    if param.default is not param.empty:
    if isinstance(param.default, type(None)):
    # `ast.NameConstant` is used in PY3, but both support `ast.Name`
    func_defaults.append(Name('None'))
    elif isinstance(param.default, bool):
    # `ast.NameConstant` is used in PY3, but both support `ast.Name`
    typ = str if PY3 else bytes
    func_defaults.append(Name(typ(param.default)))
    elif isinstance(param.default, numbers.Number):
    func_defaults.append(Loc(ast.Num, n=param.default))
    elif isinstance(param.default, str):
    func_defaults.append(Loc(ast.Str, s=param.default))
    elif isinstance(param.default, bytes):
    typ = ast.Bytes if PY3 else ast.Str
    func_defaults.append(Loc(typ, s=param.default))
    elif isinstance(param.default, list):
    func_defaults.append(Loc(ast.List,
    elts=param.default, ctx=ast.Load()))
    elif isinstance(param.default, tuple):
    func_defaults.append(Loc(ast.Tuple,
    elts=list(param.default), ctx=ast.Load()))
    elif isinstance(param.default, dict):
    func_defaults.append(Loc(ast.Dict,
    keys=list(viewkeys(param.default)),
    values=list(viewvalues(param.default))))
    else:
    err = 'unsupported default argument type: {}'
    raise TypeError(err.format(type(param.default)))
    defaults.append(param.default)
    # func_defaults.append(Name('None'))
    # defaults.append(None)

    if param.kind in {param.POSITIONAL_ONLY, param.POSITIONAL_OR_KEYWORD}:
    call_args.append(Name(param.name))
    if PY3:
    func_args.append(Loc(ast.arg, arg=param.name))
    else:
    func_args.append(Name(param.name, ast.Param()))
    elif param.kind == param.VAR_POSITIONAL:
    if PY3:
    call_args.append(Loc(ast.Starred,
    value=Name(param.name),
    ctx=ast.Load()))
    vararg = Loc(ast.arg, arg=param.name)
    else:
    call_starargs = Name(param.name)
    vararg = param.name
    elif param.kind == param.KEYWORD_ONLY:
    err = 'TODO: KEYWORD_ONLY param support, param: {}'
    raise TypeError(err.format(param.name))
    elif param.kind == param.VAR_KEYWORD:
    if PY3:
    call_keywords.append(Loc(ast.keyword,
    arg=None, value=Name(param.name)))
    kwarg = Loc(ast.arg, arg=param.name)
    else:
    call_kwargs = Name(param.name)
    kwarg = param.name

    # generate the ast for the *callback* call
    call_ast = Loc(ast.Call,
    func=Name(callback.__name__),
    args=call_args, keywords=call_keywords,
    starargs=call_starargs, kwargs=call_kwargs)

    # generate the function ast
    func_ast = Loc(ast.FunctionDef, name=to_func_name(name),
    args=ast.arguments(
    args=func_args, vararg=vararg, defaults=func_defaults,
    kwarg=kwarg, kwonlyargs=[], kw_defaults=[]),
    body=[Loc(ast.Return, value=call_ast)],
    decorator_list=[], returns=None)

    # compile the ast and get the function code
    mod_ast = ast.Module(body=[func_ast])
    module_code = compile(mod_ast, '<generated-ast>', 'exec')
    func_code = [c for c in module_code.co_consts
    if isinstance(c, types.CodeType)][0]

    # return the generated function
    return types.FunctionType(func_code, {callback.__name__: callback},
    argdefs=tuple(defaults))

    ##
    ## support functions
    ##

    def viewitems(obj):
    return getattr(obj, "viewitems", obj.items)()

    def viewkeys(obj):
    return getattr(obj, "viewkeys", obj.keys)()

    def viewvalues(obj):
    return getattr(obj, "viewvalues", obj.values)()

    def to_func_name(name):
    # func.__name__ must be bytes in Python2
    return to_unicode(name) if PY3 else to_bytes(name)

    def to_bytes(s, encoding='utf8'):
    if isinstance(s, bytes):
    pass
    elif isinstance(s, str):
    s = s.encode(encoding)
    return s

    def to_unicode(s, encoding='utf8'):
    if isinstance(s, bytes):
    s = s.decode(encoding)
    elif isinstance(s, str):
    pass
    elif isinstance(s, dict):
    s = {to_unicode(k): to_unicode(v) for k, v in viewitems(s)}
    elif isinstance(s, collections.Iterable):
    s = [to_unicode(x, encoding) for x in s]
    return s

    ##
    ## demo
    ##

    def main():
    if PY3:
    from inspect import signature
    else:
    from funcsigs import signature

    # original function
    def original(a, b, *args, **kwargs):
    return a, b, args, kwargs

    sig = signature(original)
    print('original:', original)
    print('original signature:', sig)
    print('original ret:', original(1, 2, 4, borp='torp'))

    # cloned function
    def callback(*args, **kwargs):
    return args, kwargs
    cloned = create_function('clone', sig, callback)

    sig = signature(cloned)
    print('cloned:', cloned)
    print('cloned signature:', sig)
    print('cloned ret:', cloned(1, 2, 4, borp='torp'))

    if __name__ == '__main__':
    main()