import psycopg2
import psycopg2.extras
import psycopg2.extensions
import flask

# NOTE(review): `utils` is referenced throughout this module but no import
# for it is visible here -- confirm it is brought into scope elsewhere.


class MockConnectionBase(object):
    """Stand-in for the real connection base class.

    It is spliced into the MRO of ``RebasedPostgreSQLConnection`` (see
    ``ConnectionRebaser``) so that constructing a connection never opens a
    database connection and ``cursor()`` returns recognisable marker strings.
    """

    def __init__(self):
        # prevent connection setup
        self.registered_types = []

    def cursor(self, cursor_factory=None):
        """Return a marker string naming the kind of cursor requested.

        Only ``None`` and ``psycopg2.extras.RealDictCursor`` are accepted as
        ``cursor_factory``; anything else trips the assertion.
        """
        assert cursor_factory is None or \
            cursor_factory == psycopg2.extras.RealDictCursor
        if cursor_factory is None:
            return "stubbed cursor"
        return "stubbed dict cursor"


class ConnectionRebaser(type):
    """Metaclass that inserts ``MockConnectionBase`` into a class' ancestry.

    ``__new__`` appends the mock to the bases, and ``mro`` places it directly
    after ``utils.PostgreSQLConnection`` so the mock's methods are found
    before those of the remaining ancestors of ``PostgreSQLConnection``.
    """

    def __new__(mcs, name, bases, namespace):
        bases += (MockConnectionBase, )
        return type.__new__(mcs, name, bases, namespace)

    def mro(cls):
        return (cls, utils.PostgreSQLConnection, MockConnectionBase) + \
            utils.PostgreSQLConnection.__mro__[1:]


# Build the class by calling the metaclass directly rather than through a
# ``__metaclass__`` class attribute: the attribute form is only honoured by
# Python 2 and is silently ignored by Python 3, whereas the direct call
# behaves the same on both.
RebasedPostgreSQLConnection = ConnectionRebaser(
    "RebasedPostgreSQLConnection", (utils.PostgreSQLConnection, ), {})


class FakeExtensions(object):
    """Replacement for ``psycopg2.extensions`` that records type registration.

    ``UNICODE`` and ``UNICODEARRAY`` are captured from the real module when
    this class body runs, i.e. before any test swaps the module out.
    """

    UNICODE = psycopg2.extensions.UNICODE
    UNICODEARRAY = psycopg2.extensions.UNICODEARRAY

    @classmethod
    def register_type(cls, what, connection):
        """Append ``what`` to ``connection.registered_types``."""
        connection.registered_types.append(what)


class TestPostgreSQLConnection(object):
    """Tests for ``utils.PostgreSQLConnection`` using the rebased class."""

    def setup(self):
        # Swap psycopg2.extensions for the recording fake; the first assert
        # guards against a previous test having leaked its fake.
        assert psycopg2.extensions.__name__ == "psycopg2.extensions"
        self.fakes = FakeExtensions()
        self.original_extensions = psycopg2.extensions
        psycopg2.extensions = self.fakes

    def teardown(self):
        # Restore the real module saved by setup().
        assert isinstance(psycopg2.extensions, FakeExtensions)
        psycopg2.extensions = self.original_extensions

    def test_only_affects_cursor(self):
        # The mock only stubs cursor() (and __init__), so the class under
        # test must not define any other non-dunder attribute.
        public_names = [x for x in utils.PostgreSQLConnection.__dict__
                        if not x.startswith("__")]
        assert public_names == ["cursor"]

    def test_cursor(self):
        c = RebasedPostgreSQLConnection()
        assert c.cursor() == "stubbed cursor"
        assert c.cursor(False) == "stubbed cursor"
        assert c.cursor(True) == "stubbed dict cursor"

    def test_register_types(self):
        c = RebasedPostgreSQLConnection()
        assert c.registered_types == [psycopg2.extensions.UNICODE,
                                      psycopg2.extensions.UNICODEARRAY]


class FakePsycopg2(object):
    """Replacement for the ``psycopg2`` module as seen by ``utils``.

    ``connect`` counts how many connections were created and hands out
    ``connection`` fakes that tally their own commit/reset/close calls.
    """

    class connection(object):
        """Fake connection that counts calls instead of talking to a server."""

        class _cursor(object):
            """Fake cursor that records executed queries."""

            def __init__(self, connection, real_dict_cursor):
                self.queries = []
                self.connection = connection
                self.real_dict_cursor = real_dict_cursor

            def __enter__(self):
                return self

            def __exit__(self, *args):
                pass

            def execute(self, query, args=None):
                """Record ``(query, args)``; fail if the connection is closed.

                Raises ``psycopg2.OperationalError`` when ``close`` has been
                called on the owning connection at least once.
                """
                if self.connection.calls["close"] != 0:
                    raise psycopg2.OperationalError
                self.queries.append((query, args))

        # When set to True on an instance, close() raises after counting.
        close_error = False
        autocommit = False

        def __init__(self, **settings):
            self.settings = settings
            self.types = []
            self.cursors = []
            self.calls = {"commit": 0, "reset": 0, "close": 0}

        def cursor(self, real_dict_cursor=False):
            """Create, remember and return a new fake cursor."""
            c = self._cursor(self, real_dict_cursor)
            self.cursors.append(c)
            return c

        def commit(self):
            """Count a commit; committing after close is a test failure."""
            assert self.calls["close"] == 0
            self.calls["commit"] += 1

        def reset(self):
            """Count a reset, then raise if the connection was closed.

            The call is counted before the check, so a failed reset still
            shows up in ``calls``.
            """
            self.calls["reset"] += 1
            if self.calls["close"] != 0:
                raise psycopg2.OperationalError

        def close(self):
            """Count a close, then raise if ``close_error`` is set."""
            self.calls["close"] += 1
            if self.close_error:
                raise psycopg2.OperationalError

    # Number of connect() calls; the first increment on an instance shadows
    # this class-level zero, so each FakePsycopg2() counts independently.
    connections = 0

    def connect(self, **settings):
        """Count the call and return a new fake connection."""
        self.connections += 1
        return self.connection(**settings)

    OperationalError = psycopg2.OperationalError
    extras = psycopg2.extras


class TestPostgreSQL(object):
    """Tests for ``utils.PostgreSQL`` against ``FakePsycopg2``.

    The expected ``reset`` counts throughout encode when the tests expect a
    reset: once when a request's connection is torn down and once more when
    it is handed out again.
    """

    def setup(self):
        assert utils.psycopg2 is psycopg2
        self.fakes = utils.psycopg2 = FakePsycopg2()
        self.app = flask.Flask(__name__)
        self.app.config["POSTGRES"] = {"database": "mydb", "user": "steve"}
        self.postgres = utils.PostgreSQL(self.app)

    def teardown(self):
        assert isinstance(utils.psycopg2, FakePsycopg2)
        utils.psycopg2 = psycopg2

    def test_adds_hooks(self):
        assert self.app.after_request_funcs == {None: [self.postgres.commit]}
        assert self.app.teardown_appcontext_funcs == [self.postgres.teardown]

    def test_connect_new(self):
        with self.app.test_request_context("/"):
            c = self.postgres.connection
            assert isinstance(c, self.fakes.connection)
            assert c.settings == \
                {"database": "mydb", "user": "steve",
                 "connection_factory": utils.PostgreSQLConnection}
            assert c.calls == {"commit": 0, "reset": 0, "close": 0}
            assert c.autocommit is False

    def test_connect_once(self):
        with self.app.test_request_context("/"):
            c = self.postgres.connection
            d = self.postgres.connection
            assert c is d
            assert self.fakes.connections == 1
            assert c.calls == {"commit": 0, "reset": 0, "close": 0}

    def test_teardown_resets_before_store(self):
        with self.app.test_request_context("/"):
            c = self.postgres.connection
        # checked after the context has exited, i.e. after teardown ran
        assert c.calls == {"commit": 0, "reset": 1, "close": 0}

    def test_connect_from_pool(self):
        with self.app.test_request_context("/"):
            c = self.postgres.connection
        assert c.calls == {"commit": 0, "reset": 1, "close": 0}

        with self.app.test_request_context("/"):
            d = self.postgres.connection
            assert d is c
            assert d.calls == {"commit": 0, "reset": 2, "close": 0}
        assert d.calls == {"commit": 0, "reset": 3, "close": 0}

    def test_removes_from_pool(self):
        # put a connection in the pool
        with self.app.test_request_context("/"):
            c = self.postgres.connection

        # now get two connections from the pool
        # must explicitly create two app contexts.
        # in normal usage, flask promises to never share an app context
        # between requests. When testing, it will only create an app context
        # when test_request_context is __enter__'d and there is no existing
        # app context
        with self.app.app_context(), self.app.test_request_context("/1"):
            d = self.postgres.connection
            with self.app.app_context(), self.app.test_request_context("/2"):
                e = self.postgres.connection
                assert d is c
                assert e is not d

        assert d.calls == {"commit": 0, "reset": 3, "close": 0}
        assert e.calls == {"commit": 0, "reset": 1, "close": 0}

    def test_connect_from_pool_bad(self):
        # put two distinct connections in the pool
        with self.app.app_context(), self.app.test_request_context("/1"):
            c = self.postgres.connection
            with self.app.app_context(), self.app.test_request_context("/2"):
                d = self.postgres.connection

        assert c is not d
        assert c.calls == d.calls == {"commit": 0, "reset": 1, "close": 0}

        c.close()

        with self.app.test_request_context("/"):
            e = self.postgres.connection
            # it should try c.reset, which will fail, and then destroy the
            # pool by closing d as well
            # one close call by uut, one close call from above
            assert c.calls == {"commit": 0, "reset": 2, "close": 1 + 1}
            assert d.calls == {"commit": 0, "reset": 1, "close": 1}
            # e should be a new connection
            assert e is not c and e is not d
            assert e.calls == {"commit": 0, "reset": 0, "close": 0}
        assert e.calls == {"commit": 0, "reset": 1, "close": 0}

    def test_absorbs_close_errors(self):
        with self.app.app_context(), self.app.test_request_context("/1"):
            c = self.postgres.connection
            with self.app.app_context(), self.app.test_request_context("/2"):
                d = self.postgres.connection

        # c now fails reset(); d raises from close(). Fetching a connection
        # below must not propagate either error.
        c.close()
        d.close_error = True

        with self.app.test_request_context("/"):
            e = self.postgres.connection

    def test_teardown_closes_if_pool_full(self):
        # default pool size is 2
        with self.app.app_context(), self.app.test_request_context("/1"):
            c = self.postgres.connection
            with self.app.app_context(), self.app.test_request_context("/2"):
                d = self.postgres.connection
                with self.app.app_context(), \
                        self.app.test_request_context("/3"):
                    e = self.postgres.connection

        assert len({c, d, e}) == 3
        # c is torn down last, when the pool already holds e and d
        assert c.calls == {"commit": 0, "reset": 0, "close": 1}
        assert d.calls == {"commit": 0, "reset": 1, "close": 0}
        assert e.calls == {"commit": 0, "reset": 1, "close": 0}

        with self.app.app_context(), self.app.test_request_context("/1"):
            f = self.postgres.connection
            with self.app.app_context(), self.app.test_request_context("/2"):
                g = self.postgres.connection
                with self.app.app_context(), \
                        self.app.test_request_context("/3"):
                    h = self.postgres.connection

        assert f is d
        assert g is e
        assert len({d, e, h}) == 3

    def test_cursor(self):
        with self.app.test_request_context("/"):
            c = self.postgres.connection
            x = self.postgres.cursor()
            assert isinstance(x, c._cursor)
            assert x.connection is c
            assert len(c.cursors) == 1
            y = self.postgres.cursor()
            assert y.connection is c
            assert len(c.cursors) == 2

        assert c.calls == {"commit": 0, "reset": 1, "close": 0}

        with self.app.test_request_context("/"):
            # cursor first without asking for connection explicitly
            x = self.postgres.cursor()
            assert isinstance(x, utils.psycopg2.connection._cursor)
            c = x.connection
            assert isinstance(c, utils.psycopg2.connection)
            assert c.calls == {"commit": 0, "reset": 2, "close": 0}
            assert c.settings == \
                {"database": "mydb", "user": "steve",
                 "connection_factory": utils.PostgreSQLConnection}
            assert c is self.postgres.connection

    def test_dict_cursor(self):
        with self.app.test_request_context("/"):
            c = self.postgres.cursor(True)
            assert len(self.postgres.connection.cursors) == 1
            assert c.real_dict_cursor
            c = self.postgres.cursor()
            assert not c.real_dict_cursor

    def test_commit(self):
        with self.app.test_request_context("/"):
            c = self.postgres.connection
            self.postgres.commit()
            assert c.calls == {"commit": 1, "reset": 0, "close": 0}

    def test_commit_as_hook(self):
        # as an after request hook, commit must return the response object
        # it is passed
        response = object()

        with self.app.test_request_context("/"):
            c = self.postgres.connection
            assert self.postgres.commit(response) is response

        # now check it works as a hook
        with self.app.test_request_context("/"):
            d = self.postgres.connection
            assert d is c
            assert self.app.process_response(response) is response
        # third reset comes from the teardown of the second context
        assert c.calls == {"commit": 2, "reset": 3, "close": 0}

    def test_commit_nop_if_no_connection(self):
        with self.app.test_request_context("/"):
            self.postgres.commit()
        assert utils.psycopg2.connections == 0

        with self.app.test_request_context("/"):
            self.app.process_response(None)
        assert utils.psycopg2.connections == 0

        # should nop if teardown puts the connection in the pool
        with self.app.test_request_context("/"):
            c = self.postgres.connection
            self.postgres.teardown(None)
            self.postgres.commit()
            assert c.calls == {"commit": 0, "reset": 1, "close": 0}

    def test_teardown_nop_if_no_connection(self):
        with self.app.test_request_context("/"):
            self.postgres.teardown(None)
        assert utils.psycopg2.connections == 0

        with self.app.test_request_context("/"):
            c = self.postgres.connection
            self.postgres.teardown(None)
            assert c.calls == {"commit": 0, "reset": 1, "close": 0}
        # the automatic teardown on context exit must not reset again
        assert c.calls == {"commit": 0, "reset": 1, "close": 0}