Created
January 25, 2013 15:30
-
-
Save selwin/4635260 to your computer and use it in GitHub Desktop.
Revisions
-
selwin created this gist
Jan 25, 2013 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,156 @@ from gevent import monkey monkey.patch_socket() import gevent.pool import os import signal import random import time import datetime from multiprocessing import Semaphore, Array _signames = dict((getattr(signal, signame), signame) \ for signame in dir(signal) \ if signame.startswith('SIG') and '_' not in signame) def signal_name(signum): # Hackety-hack-hack: is there really no better way to reverse lookup the # signal name? If you read this and know a way: please provide a patch :) try: return _signames[signum] except KeyError: return 'SIG_UNKNOWN' class BaseWorker(object): def __init__(self, num_processes=1): self._is_stopped = False def work(self): self._install_signal_handlers() while True: if self._is_stopped: break self.spawn_child() self.quit() def quit(self): """ Enters a loop to wait for all children to finish and then quit """ while True: if not self.has_active_horses(): break time.sleep(1) def spawn_child(self): raise NotImplementedError('Implement this in a subclass.') def has_active_horses(self): # Each worker class has to implement a way of checking whether it is # in the middle of running one or more jobs raise NotImplementedError('Implement this in a subclass.') def handle_quit(self): print 'QUITTT' def fake_work(self): # When working, children should ignore CTRL+C signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGTERM, signal.SIG_DFL) sleep_time = 5 * random.random() print datetime.datetime.now(), '- Hello from', os.getpid(), '- %.3fs' % sleep_time time.sleep(sleep_time) print datetime.datetime.now(), '- Done from', os.getpid() def _install_signal_handlers(self): """Installs signal handlers for handling SIGINT and SIGTERM gracefully. """ def request_force_stop(signum, frame): """Terminates the application (cold shutdown). """ print 'Cold shut down.' # Need to call ``handle_cold_shutdown`` implemented by subclasses #self.handle_cold_shutdown() raise SystemExit() def request_stop(signum, frame): """Stops the current worker loop but waits for child processes to end gracefully (warm shutdown). """ self._is_stopped = True print '%s Got signal %s.' % (os.getpid(), signal_name(signum)) signal.signal(signal.SIGINT, request_force_stop) signal.signal(signal.SIGTERM, request_force_stop) print 'Warm shut down requested.' signal.signal(signal.SIGINT, request_stop) signal.signal(signal.SIGTERM, request_stop) def process_is_alive(pid): # Check if a process is alive by sending it a signal that does nothing # If OSError is raised, it means the process is no longer alive try: os.kill(pid, 0) return True except OSError: return False class ForkingWorker(BaseWorker): def __init__(self, num_processes=1): super(ForkingWorker, self).__init__() # Set up sync primitives, to communicate with the spawned children self._semaphore = Semaphore(num_processes) self._slots = Array('i', [0] * num_processes) def spawn_child(self): """Forks and executes the job.""" self._semaphore.acquire() # responsible for the blocking # If CTRL+C is pressed when blocking to acquire children # (causes self._stopped to be set to True) return right away if self._is_stopped: return # Select an empty slot from self._slots (the first 0 value is picked) # The implementation guarantees there will always be at least one empty slot for slot, value in enumerate(self._slots): if value == 0: break # The usual hardcore forking action child_pid = os.fork() if child_pid == 0: random.seed() # Within child try: self.fake_work() finally: # This is the new stuff. Remember, we're in the child process # currently. When all work is done here, free up the current # slot (by writing a 0 in the slot position). This # communicates to the parent that the current child has died # (so can safely be forgotten about). self._slots[slot] = 0 self._semaphore.release() os._exit(0) else: # Within parent, keep track of the new child by writing its PID # into the first free slot index. self._slots[slot] = child_pid def has_active_horses(self): # If any of the worker slot is non zero, that means there's a job still running for pid in self._slots: if pid and process_is_alive(pid): return True return False