Skip to content

Instantly share code, notes, and snippets.

@ryankask
Forked from carljm/db.py
Last active August 29, 2015 14:02
Show Gist options
  • Select an option

  • Save ryankask/a07662002b738d60e98f to your computer and use it in GitHub Desktop.

Select an option

Save ryankask/a07662002b738d60e98f to your computer and use it in GitHub Desktop.
"""
SQLAlchemy, PostgreSQL, and autocommit
The goal:
- DB-level autocommit by default.
- No unnecessary transaction around reads.
- Explicit transactions (and nested transactions using savepoints) when desired.
The below seems to achieve that, though I'm not very good at SQLAlchemy so it might be terrible.
"""
from contextlib import contextmanager
from sqlalchemy import create_engine, event
from sqlalchemy.orm import sessionmaker, Session as BaseSession
class Session(BaseSession):
def __init__(self, *a, **kw):
super(Session, self).__init__(*a, **kw)
self._in_atomic = False
@contextmanager
def atomic(self):
"""Transaction context manager.
Will commit the transaction on successful completion of the block, or
roll it back on error.
Supports nested usage (via savepoints).
"""
nested = self._in_atomic
self.begin(nested=nested)
self._in_atomic = True
try:
yield
except:
self.rollback()
raise
else:
self.commit()
finally:
if not nested:
self._in_atomic = False
class Database(object):
def __init__(self, db_uri):
self.engine = create_engine(db_uri, isolation_level="AUTOCOMMIT")
self.Session = sessionmaker(bind=self.engine, class_=Session, autocommit=True)
# Keep track of which DBAPI connection(s) had autocommit turned off for
# a particular transaction object.
dconns_by_trans = {}
@event.listens_for(self.Parley, 'after_begin')
def receive_after_begin(session, transaction, connection):
"""When a (non-nested) transaction begins, turn autocommit off."""
dbapi_connection = connection.connection.connection
if transaction.nested:
assert not dbapi_connection.autocommit
return
assert dbapi_connection.autocommit
dbapi_connection.autocommit = False
dconns_by_trans.setdefault(transaction, set()).add(
dbapi_connection)
@event.listens_for(self.Parley, 'after_transaction_end')
def receive_after_transaction_end(session, transaction):
"""Restore autocommit anywhere this transaction turned it off."""
dconns = dconns_by_trans.get(transaction, set())
while dconns:
dbapi_connection = dconns.pop()
assert not dbapi_connection.autocommit
dbapi_connection.autocommit = True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment