Skip to content

Instantly share code, notes, and snippets.

@selwin
Created January 25, 2013 15:30
Show Gist options
  • Save selwin/4635260 to your computer and use it in GitHub Desktop.
Save selwin/4635260 to your computer and use it in GitHub Desktop.

Revisions

  1. selwin created this gist Jan 25, 2013.
    156 changes: 156 additions & 0 deletions new_workers.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,156 @@
    from gevent import monkey
    monkey.patch_socket()
    import gevent.pool

    import os
    import signal
    import random
    import time
    import datetime
    from multiprocessing import Semaphore, Array



    def _build_signames():
        """Build the reverse lookup table: signal number -> ``SIG*`` name.

        Only plain signal constants are considered: names that start with
        ``SIG`` and contain no underscore (this skips ``SIG_DFL``, ``SIG_IGN``
        and friends).

        Several signal numbers are exported under more than one name.
        ``dir()`` returns names in alphabetical order and the first name seen
        for a number is kept, so the common alias pairs resolve to the
        standard name (``SIGABRT`` rather than ``SIGIOT``, ``SIGCHLD`` rather
        than ``SIGCLD``, ``SIGIO`` rather than ``SIGPOLL``).
        """
        names = {}
        for signame in dir(signal):
            if signame.startswith('SIG') and '_' not in signame:
                # setdefault: never overwrite a name already recorded for this
                # number, so a later alias cannot replace it.
                names.setdefault(getattr(signal, signame), signame)
        return names


    # Signal number -> name, built once at import time for ``signal_name``.
    _signames = _build_signames()


    def signal_name(signum):
        """Return the symbolic name (e.g. ``'SIGINT'``) for a signal number.

        Returns ``'SIG_UNKNOWN'`` when the number does not correspond to any
        ``SIG*`` constant exported by the ``signal`` module.
        """
        # NOTE: Python 3.5+ offers ``signal.Signals(signum).name``; the table
        # lookup is kept because it also works on Python 2.
        return _signames.get(signum, 'SIG_UNKNOWN')


    class BaseWorker(object):
        """Skeleton worker: spawn children until asked to stop, then drain.

        Subclasses must implement ``spawn_child`` (start one unit of work) and
        ``has_active_horses`` (report whether any work is still running).
        """

        def __init__(self, num_processes=1):
            # ``num_processes`` is accepted for signature parity with
            # subclasses; the base class itself does not use it.
            self._is_stopped = False

        def work(self):
            """Run the main loop until a stop is requested, then call ``quit``."""
            self._install_signal_handlers()
            # ``_is_stopped`` is flipped by the signal handler installed above.
            while not self._is_stopped:
                self.spawn_child()
            self.quit()

        def quit(self):
            """ Enters a loop to wait for all children to finish and then quit """
            # Poll once per second until the subclass reports nothing running.
            while self.has_active_horses():
                time.sleep(1)

        def spawn_child(self):
            """Start one child/job. Must be provided by subclasses."""
            raise NotImplementedError('Implement this in a subclass.')

        def has_active_horses(self):
            """Return whether one or more jobs are currently running.

            Each worker class has to implement its own way of checking this.
            """
            raise NotImplementedError('Implement this in a subclass.')

        def handle_quit(self):
            """Print a marker line; nothing in this class calls it."""
            print('QUITTT')

        def fake_work(self):
            """Simulate a job by sleeping for a random time of up to 5 seconds."""
            # When working, children should ignore CTRL+C ...
            signal.signal(signal.SIGINT, signal.SIG_IGN)
            # ... while SIGTERM gets its default action back instead of the
            # handler installed by ``_install_signal_handlers``.
            signal.signal(signal.SIGTERM, signal.SIG_DFL)

            sleep_time = 5 * random.random()
            # Single-argument print(): identical output on Python 2 and 3.
            print('%s - Hello from %s - %.3fs'
                  % (datetime.datetime.now(), os.getpid(), sleep_time))
            time.sleep(sleep_time)
            print('%s - Done from %s' % (datetime.datetime.now(), os.getpid()))

        def _install_signal_handlers(self):
            """Installs signal handlers for handling SIGINT and SIGTERM
            gracefully.

            The first signal requests a warm shutdown; a second one forces a
            cold shutdown.
            """

            def request_force_stop(signum, frame):
                """Terminates the application (cold shutdown).
                """
                print('Cold shut down.')
                # TODO: call a ``handle_cold_shutdown`` hook implemented by
                # subclasses once such a hook exists.
                raise SystemExit()

            def request_stop(signum, frame):
                """Stops the current worker loop but waits for child processes to
                end gracefully (warm shutdown).
                """
                self._is_stopped = True
                print('%s Got signal %s.' % (os.getpid(), signal_name(signum)))
                # From now on a further SIGINT/SIGTERM escalates to a cold
                # shutdown.
                signal.signal(signal.SIGINT, request_force_stop)
                signal.signal(signal.SIGTERM, request_force_stop)
                print('Warm shut down requested.')

            signal.signal(signal.SIGINT, request_stop)
            signal.signal(signal.SIGTERM, request_stop)


    def process_is_alive(pid):
        """Return True if a process with the given PID exists.

        The check sends signal 0, which performs the kernel's existence and
        permission checks without delivering a signal.

        Note that a terminated child which has not been waited for yet (a
        zombie) still exists as far as this probe is concerned.
        """
        import errno  # local import: the module header does not import errno

        if pid <= 0:
            # 0 and negative values address process groups (or every
            # process), not a single process, so they are never "alive".
            return False
        try:
            os.kill(pid, 0)
        except OSError as exc:
            # EPERM: the process exists but we are not allowed to signal it,
            # so it is alive. Anything else (normally ESRCH) means it is gone.
            return exc.errno == errno.EPERM
        return True


    class ForkingWorker(BaseWorker):
        """Worker that runs every job in a forked child process.

        At most ``num_processes`` children run at the same time. State shared
        with the children:

        * ``_semaphore`` - free capacity; the parent acquires it before
          forking and the child releases it when its job is done.
        * ``_slots`` - one integer per possible child: ``_SLOT_FREE`` (0),
          ``_SLOT_RESERVED`` while a fork is in progress, otherwise the PID of
          the child occupying the slot.
        """

        _SLOT_FREE = 0
        # Claimed by the parent, child PID not recorded yet.
        _SLOT_RESERVED = -1

        def __init__(self, num_processes=1):
            super(ForkingWorker, self).__init__(num_processes)
            # Set up sync primitives, to communicate with the spawned children
            self._semaphore = Semaphore(num_processes)
            self._slots = Array('i', [self._SLOT_FREE] * num_processes)
            # Parent-only bookkeeping: PIDs forked but not yet waited for.
            self._unreaped = set()

        def spawn_child(self):
            """Forks and executes the job.

            Blocks until a slot is free. Returns without forking when a stop
            was requested while waiting.
            """
            # Collect children that already finished so they do not linger as
            # zombies, and win back capacity held by children that died
            # without cleaning up.
            self._reap_children()

            self._semaphore.acquire()  # responsible for the blocking
            # If CTRL+C is pressed while blocking here (which sets
            # self._is_stopped to True), return right away - and hand the
            # capacity we just took back, since no child will do it for us.
            if self._is_stopped:
                self._semaphore.release()
                return

            slot = self._reserve_slot()
            if slot is None:
                # Holding the semaphore should imply a free slot; refuse to
                # overwrite a slot that is in use if that ever breaks.
                self._semaphore.release()
                raise RuntimeError('No free worker slot available')

            # The usual hardcore forking action
            try:
                child_pid = os.fork()
            except OSError:
                # No child was created: undo the reservation.
                self._slots[slot] = self._SLOT_FREE
                self._semaphore.release()
                raise

            if child_pid == 0:
                # Within child
                random.seed()
                try:
                    self.fake_work()
                finally:
                    # We're in the child process. When all work is done, free
                    # the slot and the capacity so the parent can fork again,
                    # then leave without running the parent's cleanup code.
                    self._slots[slot] = self._SLOT_FREE
                    self._semaphore.release()
                    os._exit(0)
            else:
                # Within parent
                self._unreaped.add(child_pid)
                # Record the PID only if the slot is still reserved: a very
                # fast child may already have freed it, and writing the PID
                # afterwards would leave a stale entry behind. The array lock
                # makes the check-and-set atomic with respect to the child's
                # write.
                with self._slots.get_lock():
                    if self._slots[slot] == self._SLOT_RESERVED:
                        self._slots[slot] = child_pid

        def has_active_horses(self):
            """Return True while at least one forked child is still running."""
            # Reap first: a terminated but unreaped child would otherwise
            # still be reported as alive by the signal-0 probe.
            self._reap_children()
            for pid in self._slots:
                # Values <= 0 are "free"/"reserved" markers, not PIDs.
                if pid > 0 and process_is_alive(pid):
                    return True
            return False

        def _reserve_slot(self):
            """Mark the first free slot as reserved and return its index.

            Returns None when every slot is taken.
            """
            with self._slots.get_lock():
                for slot, value in enumerate(self._slots):
                    if value == self._SLOT_FREE:
                        self._slots[slot] = self._SLOT_RESERVED
                        return slot
            return None

        def _reap_children(self):
            """Wait (without blocking) for children that have terminated."""
            for pid in list(self._unreaped):
                try:
                    reaped_pid, _status = os.waitpid(pid, os.WNOHANG)
                except OSError:
                    # Cannot be waited for (e.g. already collected elsewhere):
                    # it is gone either way.
                    reaped_pid = pid
                if reaped_pid != pid:
                    continue  # still running
                self._unreaped.discard(pid)
                self._release_abandoned_slot(pid)

        def _release_abandoned_slot(self, pid):
            """Free the slot and capacity of a dead child that never cleaned up.

            A child clears its slot before releasing the semaphore, and the
            parent never writes a PID over a cleared slot. So a slot that still
            holds the PID of a terminated child means the child did neither.
            """
            for slot, value in enumerate(self._slots):
                if value == pid:
                    self._slots[slot] = self._SLOT_FREE
                    self._semaphore.release()
                    return