import fcntl
import time
import os
import sys
import shutil
import errno

import xapian as _x

# list of children to wait on
_pids = []
# database directory
_DB = "./test.db"
# number of forks
_FORKS = 10

# Make sure our test is clean
try:
    shutil.rmtree(_DB)
except OSError as e:
    if e.errno != errno.ENOENT:
        raise
os.mkdir(_DB)


def _safe_db_do(db, attr, *args, **kwargs):
    '''As the database may be modified at any time, we need to reopen, and
    trap DatabaseModifiedError for each operation, and retry until success'''
    while True:
        db.reopen()
        try:
            return getattr(db, attr)(*args, **kwargs)
        except _x.DatabaseModifiedError:
            pass


def _fork_term(fork_num):
    '''Wrapper to create our fork-specific term'''
    return "".join([
        "XHELO",
        str(fork_num),
    ])


def _writable():
    '''Blocking wait on lock, then return the lock fd and the writable DB'''
    # open the lock-file, in the DB dir to flock against
    lockfd = os.open(os.path.join(_DB, ".writelock"), os.O_CREAT, 0o666)
    try:
        fcntl.flock(lockfd, fcntl.LOCK_EX)
        return lockfd, _x.WritableDatabase(_DB, _x.DB_CREATE_OR_OPEN)
    except Exception:
        # closing the descriptor also drops the flock, if it was acquired
        os.close(lockfd)
        raise


for i in range(_FORKS):
    pid = os.fork()
    # child fork
    if 0 == pid:
        lockfd, writable = _writable()
        try:
            print("lock acquired by: %s" % os.getpid())
            readable = _x.Database(_DB)
            try:
                # sleep here only to exaggerate the serialization
                time.sleep(0.5)
                doc = _x.Document()
                term = _fork_term(i+2)
                doc.add_term(term)
                # add something new, and modify something contended
                writable.replace_document(i+2, doc)
                writable.replace_document(1, doc)
                # you must commit, or your changes may not be picked up by
                # other forks, when many are reading
                writable.commit()
                writable.close()
                for t in _safe_db_do(readable, 'get_document', i+2).termlist():
                    print("new: %s; should be: %s" % (t.term, term))
                for t in _safe_db_do(readable, 'get_document', 1).termlist():
                    print("contested: %s; should be: %s" % (t.term, term))
            finally:
                readable.close()
        finally:
            # close the writer before releasing the flock, so the next fork
            # never tries to open the DB while this one still holds it
            writable.close()
            os.close(lockfd)
        # cleanup -- Exit with the i number, to determine (cheaply) which
        # fork exited last
        os._exit(i)
    else:
        _pids.append(pid)

# wait for the forks to cycle out
last_exit = 0
while _pids:
    pid, last_exit = os.waitpid(-1, 0)
    if not os.WIFEXITED(last_exit):
        sys.stderr.write("abnormal termination of fork: %s\n" % pid)
    _pids.remove(pid)
last_exit = os.WEXITSTATUS(last_exit)

# demonstrate the contested value matches the last exiting fork
readable = _x.Database(_DB)
for i in range(_FORKS):
    for t in _safe_db_do(readable, 'get_document', i+2).termlist():
        print("uncontested: %s; should be: %s" % (t.term, _fork_term(i+2)))
for t in _safe_db_do(readable, 'get_document', 1).termlist():
    print("contested: %s; should be: %s" % (t.term, _fork_term(last_exit+2)))
sys.exit(0)