#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""WORK IN PROGRESS!! USE AT YOUR OWN RISK!!

Subprocess as-a-service: a small asyncio client/server pair.  The client
sends one JSON-encoded ``ArgsRequest``; the server runs the requested command
if its path is in the allow-list and answers with one JSON-encoded
``ArgsResponse``.
"""
import os
import sys
from typing import Dict, List, Optional
from urllib.parse import urlparse, ParseResult
from functools import partial
import asyncio

from loguru import logger
import pydantic


class ArgsRequest(pydantic.BaseModel):
    """Structure representing a request"""

    path: str  # path is the path of the command to run
    args: List[str] = []  # arguments passed to the command
    addr: str  # URI of the server (unix://<path> or <scheme>://host:port)
    message: Optional[str] = None
    id: Optional[str] = None  # echoed back in the matching ArgsResponse
    bufsize: int = 4096  # maximum number of bytes fetched by a single read()
    # NOTE: the four fields below are part of the wire format but are not
    # read by any code in this file; run_subprocess() ignores them.
    shell: bool = False
    cwd: Optional[str] = None
    env: Dict[str, str] = {}
    timeout: Optional[float] = None


class ArgsResponse(pydantic.BaseModel):
    """Structure representing a response to ArgsRequest"""

    status: int  # HTTP-like code: 200 ok, 400 bad request, 405 forbidden, 500 failed
    message: Optional[str] = None
    id: Optional[str] = None  # id of the request being answered, if it had one


class CliArgs(ArgsRequest):
    """Command-line arguments, validated from the argparse namespace."""

    path: Optional[str] = None  # only supplied by the client sub-command
    verb: str  # "client" or "server", set by the chosen sub-command
    addr: str
    addrs: Optional[List[str]] = None  # extra addresses from repeated -A
    allowed_commands: Optional[List[str]] = None  # server-side allow-list
    message: Optional[str] = None

    class Config:
        # Namespace entries that are not fields of this model are dropped.
        extra = pydantic.Extra.ignore


def arg_parser():
    """Build the command-line parser with its ``c`` and ``s`` sub-commands.

    Each sub-command stores its role in the ``verb`` attribute of the parsed
    namespace ("client" or "server").
    """
    import argparse

    parser = argparse.ArgumentParser(
        description=(
            "Subprocess as-a-service. \n"
            "Very basic, only mirrors output to stdout/stderr"
        )
    )
    parser.add_argument(
        "-a",
        "--addr",
        default='http://localhost:7701',
        action="store",
        type=str,
        help="Main address to listen/talk on ",
    )
    parser.add_argument(
        "-A", "--addrs", default=[], action="append", help="list multiple addresses "
    )
    subparsers = parser.add_subparsers(help="sub-commands")

    parser_c = subparsers.add_parser("c", help="client")
    parser_c.set_defaults(verb="client")
    parser_c.add_argument(
        "path", type=str, help="Path of the command to be issued"
    )
    parser_c.add_argument(
        "args",
        nargs="*",
        type=str,
        default=[],
        help="Arguments to be passed to the command",
    )
    parser_c.add_argument(
        "-m",
        "--message",
        default="Hello, world!",
        action="store",
        type=str,
        help="message to send",
    )

    parser_s = subparsers.add_parser("s", help="server")
    parser_s.set_defaults(verb="server")
    parser_s.add_argument(
        "allowed_commands",
        nargs="*",
        type=str,
        help="Only run commands that match one of these filters",
    )
    return parser


def get_server_func(addr: str):
    """Return a partially-applied asyncio server factory for *addr*.

    ``unix://<path>`` maps to ``asyncio.start_unix_server``; any other URI
    that carries both a hostname and a port maps to ``asyncio.start_server``.
    The caller supplies the connection callback when invoking the result.

    Raises:
        ValueError: if *addr* is neither a unix URI nor has host and port.
    """
    serve_pars: ParseResult = urlparse(addr)
    if serve_pars.scheme == "unix":
        return partial(asyncio.start_unix_server, path=serve_pars.path)
    if serve_pars.port and serve_pars.hostname:
        return partial(
            asyncio.start_server, host=serve_pars.hostname, port=serve_pars.port
        )
    raise ValueError(f"cannot parse URI addr: {addr}")


def get_connection_func(addr):
    """Return a partially-applied asyncio connection opener for *addr*.

    Client-side counterpart of ``get_server_func`` with the same URI rules.

    Raises:
        ValueError: if *addr* is neither a unix URI nor has host and port.
    """
    pars = urlparse(addr)
    logger.debug(pars)
    if pars.scheme == "unix":
        return partial(asyncio.open_unix_connection, path=pars.path)
    if pars.port and pars.hostname:
        return partial(asyncio.open_connection, host=pars.hostname, port=pars.port)
    raise ValueError(f"cannot parse URI addr: {addr}")


async def tcp_subproc_client(args: ArgsRequest):
    """Send *args* as JSON to ``args.addr`` and log the server's reply.

    At most ``args.bufsize`` bytes of the reply are read.  The logged flag is
    True only when the reply parses as an ``ArgsResponse`` with status 200.
    """
    logger.info(args)
    message = args.json()
    open_connection = get_connection_func(args.addr)
    reader, writer = await open_connection()
    try:
        logger.info(f"Send: {message!r}")
        writer.write(message.encode())
        await writer.drain()

        data = await reader.read(args.bufsize)
        response = data.decode()
        try:
            ok = ArgsResponse.parse_raw(data).status == 200
        except ValueError:
            # pydantic's ValidationError is a ValueError; an empty or
            # malformed reply simply counts as a failure.
            ok = False
        logger.info(f"Received {ok}: {response!r}")
    finally:
        # Release the connection even when sending or receiving failed.
        logger.debug("Close the connection")
        writer.close()
        await writer.wait_closed()


class SubprocRunner(object):
    """Connection handler that runs allow-listed commands as subprocesses."""

    def __init__(self, args: CliArgs):
        self.args = args

    async def handle_run_subproc(self, reader, writer):
        """Serve one connection: read a request, run it, write the response.

        Exactly one ``ArgsResponse`` is written for every request that could
        be read, and the writer is closed even when an error occurs.
        """
        try:
            resp = await self._process_request(reader, writer)
            data = resp.json().encode()
            logger.debug(f"Send: {data!r}")
            writer.write(data)
            await writer.drain()
        finally:
            logger.debug("Close the connection")
            writer.close()

    async def _process_request(self, reader, writer) -> ArgsResponse:
        """Read and validate one request, run it and build the response."""
        data = await reader.read(self.args.bufsize)
        try:
            addr = writer.get_extra_info("peername")
        except Exception as exc:  # best effort: the address is only logged
            addr = "{}: {}".format(exc.__class__.__name__, exc)

        try:
            payload = ArgsRequest.parse_raw(data)
        except ValueError as exc:
            # pydantic's ValidationError is a ValueError; answer the peer
            # instead of letting the handler die without a reply.
            logger.warning(f"Malformed request from {addr!r}: {exc}")
            return ArgsResponse(status=400, message=f"malformed request: {exc}")
        logger.info(f"Received {payload!r} from {addr!r}")

        # allowed_commands may be None when the runner was not configured
        # from the server sub-command; treat that as "nothing is allowed".
        allowed = self.args.allowed_commands or []
        if payload.path not in allowed:
            return ArgsResponse(
                status=405,
                message=f"'{payload.path}' is a forbidden command",
                id=payload.id,
            )

        # for simplicity, we are just gonna wait for the response
        try:
            returncode = await self.run_subprocess(payload)
        except OSError as exc:
            # e.g. the executable does not exist or is not executable
            logger.error(f"Could not start '{payload.path}': {exc}")
            return ArgsResponse(
                status=500,
                message=f"'{payload.path}' could not be started: {exc}",
                id=payload.id,
            )
        if returncode != 0:
            return ArgsResponse(
                status=500,
                message=f"'{payload.path}' exited with code {returncode}",
                id=payload.id,
            )
        return ArgsResponse(
            status=200, message=f"'{payload.path}' success", id=payload.id
        )

    async def run_subprocess(self, cmd: ArgsRequest):
        """Run ``cmd.path`` with ``cmd.args`` and return its exit code.

        No stdin/stdout/stderr arguments are passed to the subprocess call.
        Only ``path`` and ``args`` of *cmd* are used.
        """
        logger.info("start: \n{} {}".format(cmd, " ".join(cmd.args)))
        process = await asyncio.create_subprocess_exec(
            cmd.path,
            *cmd.args,
        )
        return await process.wait()


async def spawn_server(addr, args: CliArgs):
    """Start a server on *addr* and serve connections until cancelled."""
    start_server = get_server_func(addr)
    runner = SubprocRunner(args)
    server = await start_server(runner.handle_run_subproc)
    myaddr = server.sockets[0].getsockname()
    logger.info(f"Serving on {myaddr}")
    async with server:
        await server.serve_forever()


async def main_server(args: CliArgs):
    """Schedule one server task per configured address.

    The addresses are ``args.addrs`` followed by ``args.addr``; duplicates
    are dropped because binding the same address twice would fail.
    ``args.addrs`` itself is left untouched.

    Returns:
        The list of created tasks, one per address.
    """
    candidates = list(args.addrs or [])
    if args.addr is not None:
        candidates.append(args.addr)
    addrs = list(dict.fromkeys(candidates))  # de-duplicate, keep order
    return [asyncio.create_task(spawn_server(addr, args)) for addr in addrs]


async def _serve_until_stopped(args: CliArgs) -> None:
    """Start all servers and wait on them so that failures are surfaced."""
    tasks = await main_server(args)
    if not tasks:
        logger.warning("no address to serve on")
        return
    # gather() re-raises the first server failure (e.g. a bind error)
    # instead of leaving it unobserved in a background task.
    await asyncio.gather(*tasks)


def main():
    """Command-line entry point: run either the client or the server."""
    parser = arg_parser()
    namespace = parser.parse_args()
    if not hasattr(namespace, "verb"):
        # No sub-command was given, so no `verb` default was set.
        parser.error("a sub-command is required: 'c' (client) or 's' (server)")
    args = CliArgs(**vars(namespace))
    logger.info(args)

    verb = args.verb
    if verb == "client":
        c_args = ArgsRequest.parse_obj(args)
        asyncio.run(tcp_subproc_client(c_args))
        return
    if verb == "server":
        asyncio.run(_serve_until_stopped(args))
        return
    raise ValueError(f"unknown command: {verb}")


if __name__ == "__main__":
    main()