Last active
November 18, 2021 11:26
-
-
Save fxthomas/aa59e25df0bfa15d46a5bcb01b7017dd to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class JobObjectPopen(Popen):
    """Start a process in a new Win32 job object.

    This `subprocess.Popen` subclass takes the same arguments as Popen and
    behaves the same way. In addition to that, created processes will be
    assigned to a new anonymous Win32 job object on startup, which will
    guarantee that the processes will be terminated by the OS as soon as
    either the Popen object, job object handle or parent Python process are
    closed.

    NOTE: while the constructor runs, the module-global ``subprocess._winapi``
    is temporarily replaced by a proxy. This is not thread-safe: a ``Popen``
    created from another thread during that window goes through the same
    patched ``CreateProcess``.
    """

    class _winapijobhandler(object):
        """Proxy for ``subprocess._winapi`` that assigns new processes to a job.

        Every attribute except ``CreateProcess`` is forwarded unchanged to the
        wrapped API object.
        """

        def __init__(self, oldapi, job):
            """Wrap *oldapi* and remember the *job* handle to assign to."""
            self._oldapi = oldapi
            self._job = job

        def __getattr__(self, key):
            # __getattr__ only runs when normal lookup fails, so it is never
            # called for ``CreateProcess`` (a real method below); everything
            # that lands here belongs to the wrapped API.
            return getattr(self._oldapi, key)

        def CreateProcess(self, *args, **kwargs):
            """Create the process, add it to the job, then resume it.

            The caller is expected to have requested a suspended start, so the
            process cannot run (or spawn children) before it is in the job.

            Returns the ``(hp, ht, pid, tid)`` tuple produced by the wrapped
            ``CreateProcess``. If assigning or resuming fails, the new process
            is terminated, its handles are closed and the error is re-raised.
            """
            hp, ht, pid, tid = self._oldapi.CreateProcess(*args, **kwargs)
            try:
                win32job.AssignProcessToJobObject(self._job, hp)
                win32process.ResumeThread(ht)
            except BaseException:
                # Nobody else owns these raw handles yet: without this the
                # child would stay suspended forever and both handles leak.
                # Cleanup is best-effort so the original error is the one
                # that propagates.
                for cleanup, cleanup_args in (
                    (self._oldapi.TerminateProcess, (hp, 1)),
                    (self._oldapi.CloseHandle, (ht,)),
                    (self._oldapi.CloseHandle, (hp,)),
                ):
                    try:
                        cleanup(*cleanup_args)
                    except OSError:
                        pass
                raise
            return hp, ht, pid, tid

    def __init__(self, *args, **kwargs):
        """Create the job object and start the process inside it.

        Accepts the same arguments as ``Popen``; ``CREATE_SUSPENDED`` is OR-ed
        into the ``creationflags`` keyword argument.
        """
        # Define the attribute first so __del__ / _close_job_object are safe
        # even if creating the job object itself fails.
        self._win32_job = None

        # Create a new job object
        self._win32_job = self._create_job_object()

        # Temporarily patch the subprocess creation logic to assign created
        # processes to the new job, then resume execution normally.
        CREATE_SUSPENDED = 0x00000004
        kwargs["creationflags"] = kwargs.get("creationflags", 0) | CREATE_SUSPENDED

        _winapi = subprocess._winapi
        subprocess._winapi = JobObjectPopen._winapijobhandler(_winapi, self._win32_job)
        try:
            super(JobObjectPopen, self).__init__(*args, **kwargs)
        except BaseException:
            # No usable Popen object will exist; release the job right away
            # instead of waiting for garbage collection.
            self._close_job_object()
            raise
        finally:
            subprocess._winapi = _winapi

    def _create_job_object(self):
        """Create a new anonymous job object"""
        hjob = win32job.CreateJobObject(None, "")
        extended_info = win32job.QueryInformationJobObject(
            hjob, win32job.JobObjectExtendedLimitInformation
        )
        # OR the flag in so any limit flags reported by the query are kept.
        extended_info['BasicLimitInformation']['LimitFlags'] |= (
            win32job.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE
        )
        win32job.SetInformationJobObject(
            hjob, win32job.JobObjectExtendedLimitInformation, extended_info
        )
        return hjob

    def _close_job_object(self, hjob=None):
        """Close the handle to this instance's job object.

        *hjob* is accepted for backward compatibility only and is ignored; the
        handle stored on the instance is the one that gets closed. Calling
        this more than once, or before a job exists, is a no-op.
        """
        job = getattr(self, "_win32_job", None)
        if job:
            try:
                win32api.CloseHandle(job)
            finally:
                # Never retry the close from __del__, even if it failed.
                self._win32_job = None

    def __exit__(self, exc_type, value, traceback):
        """Run ``Popen.__exit__`` and then close the job object."""
        try:
            super(JobObjectPopen, self).__exit__(exc_type, value, traceback)
        finally:
            self._close_job_object()

    def __del__(self, *args, **kwargs):
        """Run ``Popen.__del__`` and then close the job object."""
        try:
            Popen.__del__(self, *args, **kwargs)
        finally:
            self._close_job_object()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment