Skip to content

Instantly share code, notes, and snippets.

@fxthomas
Last active November 18, 2021 11:26
Show Gist options
  • Save fxthomas/aa59e25df0bfa15d46a5bcb01b7017dd to your computer and use it in GitHub Desktop.
Save fxthomas/aa59e25df0bfa15d46a5bcb01b7017dd to your computer and use it in GitHub Desktop.

Revisions

  1. fxthomas revised this gist Feb 16, 2019. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion job_popen.py
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,4 @@
    #coding: utf-8
    # coding: utf-8

    from subprocess import Popen
    import subprocess
  2. fxthomas revised this gist Feb 15, 2019. 1 changed file with 14 additions and 5 deletions.
    19 changes: 14 additions & 5 deletions job_popen.py
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,13 @@
    class JobObjectPopen(Popen):
    #coding: utf-8

    from subprocess import Popen
    import subprocess
    import win32job
    import win32process
    import win32api


    class JobPopen(Popen):
    """Start a process in a new Win32 job object.
    This `subprocess.Popen` subclass takes the same arguments as Popen and
    @@ -47,8 +56,8 @@ def __init__(self, *args, **kwargs):
    _winapi = subprocess._subprocess # Python 2
    _winapi_key = "_subprocess"
    try:
    setattr(subprocess, _winapi_key, JobObjectPopen._winapijobhandler(_winapi, self._win32_job))
    super(JobObjectPopen, self).__init__(*args, **kwargs)
    setattr(subprocess, _winapi_key, JobPopen._winapijobhandler(_winapi, self._win32_job))
    super(JobPopen, self).__init__(*args, **kwargs)
    finally:
    setattr(subprocess, _winapi_key, _winapi)

    @@ -67,9 +76,9 @@ def _close_job_object(self, hjob):
    self._win32_job = None

    # This ensures that no remaining subprocesses are found when the process
    # exits from a `with JobObjectPopen(...)` block.
    # exits from a `with JobPopen(...)` block.
    def __exit__(self, exc_type, value, traceback):
    super(JobObjectPopen, self).__exit__(exc_type, value, traceback)
    super(JobPopen, self).__exit__(exc_type, value, traceback)
    self._close_job_object(self._win32_job)

    # Python does not keep a reference outside of the parent class when the
  3. fxthomas revised this gist Feb 15, 2019. 1 changed file with 20 additions and 8 deletions.
    28 changes: 20 additions & 8 deletions job_popen.py
    Original file line number Diff line number Diff line change
    @@ -10,22 +10,27 @@ class JobObjectPopen(Popen):
    """

    class _winapijobhandler(object):
    """Patches the subprocess._winapi module to automatically assign created threads in a job"""
    """Patches the native CreateProcess function in the subprocess module
    to assign created threads to the given job"""

    def __init__(self, oldapi, job):
    self._oldapi = oldapi
    self._job = job

    def __getattr__(self, key):
    if key != "CreateProcess":
    return getattr(self._oldapi, key)
    return getattr(self._oldapi, key) # Any other function is run as before
    else:
    return self.CreateProcess
    return self.CreateProcess # CreateProcess will call the function below

    def CreateProcess(self, *args, **kwargs):
    hp, ht, pid, tid = self._oldapi.CreateProcess(*args, **kwargs)
    win32job.AssignProcessToJobObject(self._job, hp)
    win32process.ResumeThread(ht)
    return hp, ht, pid, tid

    def __init__(self, *args, **kwargs):
    """Start a new process using an anonymous job object. Takes the same arguments as Popen"""

    # Create a new job object
    self._win32_job = self._create_job_object()
    @@ -37,13 +42,15 @@ def __init__(self, *args, **kwargs):
    kwargs["creationflags"] |= CREATE_SUSPENDED
    try:
    _winapi = subprocess._winapi # Python 3
    _winapi_key = "_winapi"
    except AttributeError:
    _winapi = subprocess._subprocess # Python 2
    _winapi_key = "_subprocess"
    try:
    subprocess._winapi = JobObjectPopen._winapijobhandler(_winapi, self._win32_job)
    setattr(subprocess, _winapi_key, JobObjectPopen._winapijobhandler(_winapi, self._win32_job))
    super(JobObjectPopen, self).__init__(*args, **kwargs)
    finally:
    subprocess._winapi = _winapi
    setattr(subprocess, _winapi_key, _winapi)

    def _create_job_object(self):
    """Create a new anonymous job object"""
    @@ -54,15 +61,20 @@ def _create_job_object(self):
    return hjob

    def _close_job_object(self, hjob):
    """Close the handle to a job object"""
    """Close the handle to a job object, terminating all processes inside it"""
    if self._win32_job:
    win32api.CloseHandle(self._win32_job)
    self._win32_job = None

    # This ensures that no remaining subprocesses are found when the process
    # exits from a `with JobObjectPopen(...)` block.
    def __exit__(self, exc_type, value, traceback):
    super(JobObjectPopen, self).__exit__(exc_type, value, traceback)
    self._close_job_object(self._win32_job)

    def __del__(self, *args, **kwargs):
    Popen.__del__(self, *args, **kwargs)
    # Python does not keep a reference outside of the parent class when the
    # interpreter exits, which is why we keep it here.
    _Popen = subprocess.Popen
    def __del__(self):
    self._Popen.__del__(self)
    self._close_job_object(self._win32_job)
  4. fxthomas revised this gist Feb 15, 2019. 1 changed file with 4 additions and 1 deletion.
    5 changes: 4 additions & 1 deletion job_popen.py
    Original file line number Diff line number Diff line change
    @@ -35,7 +35,10 @@ def __init__(self, *args, **kwargs):
    CREATE_SUSPENDED = 0x00000004
    kwargs.setdefault("creationflags", 0)
    kwargs["creationflags"] |= CREATE_SUSPENDED
    _winapi = subprocess._winapi
    try:
    _winapi = subprocess._winapi # Python 3
    except AttributeError:
    _winapi = subprocess._subprocess # Python 2
    try:
    subprocess._winapi = JobObjectPopen._winapijobhandler(_winapi, self._win32_job)
    super(JobObjectPopen, self).__init__(*args, **kwargs)
  5. fxthomas created this gist Feb 15, 2019.
    65 changes: 65 additions & 0 deletions job_popen.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,65 @@
    class JobObjectPopen(Popen):
    """Start a process in a new Win32 job object.
    This `subprocess.Popen` subclass takes the same arguments as Popen and
    behaves the same way. In addition to that, created processes will be
    assigned to a new anonymous Win32 job object on startup, which will
    guarantee that the processes will be terminated by the OS as soon as
    either the Popen object, job object handle or parent Python process are
    closed.
    """

    class _winapijobhandler(object):
    """Patches the subprocess._winapi module to automatically assign created threads in a job"""
    def __init__(self, oldapi, job):
    self._oldapi = oldapi
    self._job = job
    def __getattr__(self, key):
    if key != "CreateProcess":
    return getattr(self._oldapi, key)
    else:
    return self.CreateProcess
    def CreateProcess(self, *args, **kwargs):
    hp, ht, pid, tid = self._oldapi.CreateProcess(*args, **kwargs)
    win32job.AssignProcessToJobObject(self._job, hp)
    win32process.ResumeThread(ht)
    return hp, ht, pid, tid

    def __init__(self, *args, **kwargs):

    # Create a new job object
    self._win32_job = self._create_job_object()

    # Temporarily patch the subprocess creation logic to assign created
    # processes to the new job, then resume execution normally.
    CREATE_SUSPENDED = 0x00000004
    kwargs.setdefault("creationflags", 0)
    kwargs["creationflags"] |= CREATE_SUSPENDED
    _winapi = subprocess._winapi
    try:
    subprocess._winapi = JobObjectPopen._winapijobhandler(_winapi, self._win32_job)
    super(JobObjectPopen, self).__init__(*args, **kwargs)
    finally:
    subprocess._winapi = _winapi

    def _create_job_object(self):
    """Create a new anonymous job object"""
    hjob = win32job.CreateJobObject(None, "")
    extended_info = win32job.QueryInformationJobObject(hjob, win32job.JobObjectExtendedLimitInformation)
    extended_info['BasicLimitInformation']['LimitFlags'] = win32job.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE
    win32job.SetInformationJobObject(hjob, win32job.JobObjectExtendedLimitInformation, extended_info)
    return hjob

    def _close_job_object(self, hjob):
    """Close the handle to a job object"""
    if self._win32_job:
    win32api.CloseHandle(self._win32_job)
    self._win32_job = None

    def __exit__(self, exc_type, value, traceback):
    super(JobObjectPopen, self).__exit__(exc_type, value, traceback)
    self._close_job_object(self._win32_job)

    def __del__(self, *args, **kwargs):
    Popen.__del__(self, *args, **kwargs)
    self._close_job_object(self._win32_job)