"""A clean wrapper around Windows APIs allowing interprocess communication over named pipes Here is a clear, simple explanation of the incantation needed to do this correctly: * From the client, spawn the server. * The server should CreateNamedPipe, then ConnectNamedPipe * The client must poll for the pipe, then open with CreateFile * Then each can read/write from the pipe as needed. * Finally, the client should close the pipe. """ import io import time from types import TracebackType from typing import Optional, Type import _winapi import time from contextlib import ContextDecorator class NamedPipeBase(io.StringIO): handle = _winapi.NULL buffer_size = 2**16 def __init__(self, name: str) -> None: self.name = r'\\.\pipe\{}'.format(name) def read(self, size: Optional[int] = None) -> str: if self.handle != _winapi.NULL: msg, status = _winapi.ReadFile(self.handle, size if size else self.buffer_size) return msg.decode() else: raise WindowsError("Cannot read pipe") def write(self, msg: str) -> int: if self.handle != _winapi.NULL: return _winapi.WriteFile(self.handle, msg.encode(), len(msg)) else: raise WindowsError("Cannot read pipe") def __exit__(self, exc_ty: Optional[Type[BaseException]] = None, exc_val: Optional[BaseException] = None, exc_tb: Optional[TracebackType] = None, ) -> bool: if self.handle != _winapi.NULL: _winapi.CloseHandle(self.handle) return False class NamedPipeServer(NamedPipeBase): def __init__(self, name: str) -> None: super().__init__(name) self.handle = _winapi.CreateNamedPipe( self.name, _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE, _winapi.PIPE_READMODE_MESSAGE | _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_WAIT, 1, # one instance self.buffer_size, self.buffer_size, 0, _winapi.NULL, ) if _winapi.GetLastError() != 0: err = _winapi.GetLastError() raise WindowsError(f'Error creating pipe: {err}') def __enter__(self) -> 'NamedPipeServer': _winapi.ConnectNamedPipe(self.handle, _winapi.NULL) return self class NamedPipeClient(NamedPipeBase): def __init__(self, name: str) -> None: super().__init__(name) # Sadly we need to try several times until this works, or it times out # thanks Windows done = False while not done: try: self.handle = _winapi.CreateFile( self.name, _winapi.GENERIC_READ | _winapi.GENERIC_WRITE, 0, _winapi.NULL, _winapi.OPEN_EXISTING, 0, _winapi.NULL, ) _winapi.SetNamedPipeHandleState(self.handle, _winapi.PIPE_READMODE_MESSAGE, _winapi.NULL, _winapi.NULL) except FileNotFoundError: raise WindowsError( "Unable to open connection to pipe at {}".format(self.name) ) except WindowsError as e: if e.winerror not in (_winapi.ERROR_SEM_TIMEOUT, _winapi.ERROR_PIPE_BUSY): break else: time.sleep(1) break def __enter__(self) -> 'NamedPipeClient': return self