|  | import inspect | 
        
          |  | import injections | 
        
          |  | import functools | 
        
          |  | import textwrap | 
        
          |  |  | 
        
          |  |  | 
        
          |  | def inject_from(source): | 
        
          |  | """Decorator for injecting dependencies into standalone functions. | 
        
          |  |  | 
        
          |  | source argument is an instance of any object marked with ``injections.has`` | 
        
          |  |  | 
        
          |  | decorated function must use annotations for arguments which must be | 
        
          |  | replaced by dependency. | 
        
          |  |  | 
        
          |  | Usage:: | 
        
          |  |  | 
        
          |  | >>> @injections.has | 
        
          |  | ... class SomeDeps: | 
        
          |  | ...     redis = injections.depends(Redis) | 
        
          |  |  | 
        
          |  | >>> inst = SomeDeps() | 
        
          |  |  | 
        
          |  | >>> @inject_from(inst) | 
        
          |  | ... def get_key(key, redis: Redis): | 
        
          |  | ...     return redis.get(key) | 
        
          |  |  | 
        
          |  | >>> print(get_key('foo')) | 
        
          |  | """ | 
        
          |  |  | 
        
          |  | deps = {d.name: d.type for d in injections.dependencies(source).values()} | 
        
          |  | assert deps, "Object {!r} has no dependencies".format(source) | 
        
          |  |  | 
        
          |  | def wrapper(fun): | 
        
          |  | sig = inspect.signature(fun) | 
        
          |  | params = sig.parameters.copy() | 
        
          |  | allowed_kinds = (inspect.Parameter.POSITIONAL_OR_KEYWORD, | 
        
          |  | inspect.Parameter.KEYWORD_ONLY) | 
        
          |  | fun_args = [] | 
        
          |  |  | 
        
          |  | def push_fun_arg(param): | 
        
          |  | if param.kind is param.KEYWORD_ONLY: | 
        
          |  | fun_args.append('{0}={0}'.format(param.name)) | 
        
          |  | else: | 
        
          |  | fun_args.append(str(param.replace(annotation=param.empty, | 
        
          |  | default=param.empty))) | 
        
          |  |  | 
        
          |  | for name, param in list(params.items()): | 
        
          |  | ann = param.annotation | 
        
          |  | if ann is param.empty: | 
        
          |  | push_fun_arg(param) | 
        
          |  | continue | 
        
          |  | if isinstance(ann, injections.Dependency): | 
        
          |  | dep_type = ann.type | 
        
          |  | dep_name = ann.name or name | 
        
          |  | elif isinstance(ann, type) and name in deps: | 
        
          |  | dep_type = ann | 
        
          |  | dep_name = name | 
        
          |  | else: | 
        
          |  | push_fun_arg(param) | 
        
          |  | continue | 
        
          |  | assert param.kind in allowed_kinds, param | 
        
          |  | assert param.default is param.empty, ( | 
        
          |  | "Parameter {!r} has default value".format(name)) | 
        
          |  | assert dep_name in deps, ( | 
        
          |  | "Unknown dependency {!r}({!r}) {!r}" | 
        
          |  | .format(dep_name, dep_type, deps)) | 
        
          |  | assert issubclass(dep_type, deps[dep_name]), ( | 
        
          |  | "Dependencies missmatch {!r}({!r}) {!r}" | 
        
          |  | .format(dep_name, dep_type, deps[dep_name])) | 
        
          |  | params.pop(name) | 
        
          |  | if param.kind is param.KEYWORD_ONLY: | 
        
          |  | fun_args.append('{}=source.{}'.format(name, dep_name)) | 
        
          |  | else: | 
        
          |  | fun_args.append('source.{}'.format(dep_name)) | 
        
          |  | # FIXME: wrapper signature may contain custom annotations | 
        
          |  | #        which should be visible to exec | 
        
          |  | code = """ | 
        
          |  | def wrapper{sig}: | 
        
          |  | return {fun}({fun_args}) | 
        
          |  | """ | 
        
          |  | code = code.format(sig=sig.replace(parameters=params.values()), | 
        
          |  | fun=fun.__name__, fun_args=', '.join(fun_args)) | 
        
          |  | code = textwrap.dedent(code) | 
        
          |  | co = compile(code, '<string>', 'exec') | 
        
          |  | locals_ = {} | 
        
          |  | try: | 
        
          |  | exec(co, {'source': source, fun.__name__: fun}, locals_) | 
        
          |  | except NameError: | 
        
          |  | raise AssertionError( | 
        
          |  | "Possibly some dependencies were not resolved:\n{!s}" | 
        
          |  | .format(code)) | 
        
          |  | wrapper = locals_['wrapper'] | 
        
          |  | return functools.wraps(fun)(wrapper) | 
        
          |  |  | 
        
          |  | return wrapper |