from sqlalchemy import event from sqlalchemy import DDL def mysql_cidr_overlap(engine, metadata): @event.listens_for(metadata, "after_create") def _create_mysql_proc(target, connection, **kw): if connection.engine.name != 'mysql': return if connection.scalar( "SELECT ROUTINE_NAME FROM INFORMATION_SCHEMA.ROUTINES " "WHERE ROUTINE_TYPE='FUNCTION' AND ROUTINE_SCHEMA=DATABASE() AND " "ROUTINE_NAME=%s", ("cidr_overlap", ) ): connection.execute("DROP FUNCTION cidr_overlap") connection.execute( DDL(""" CREATE FUNCTION cidr_overlap (cidr1 VARCHAR(30), cidr2 VARCHAR(30)) RETURNS TINYINT BEGIN DECLARE bitmask INT; -- note - Mike is semi-guessing on the math here, needs tests! don't stick -- into production pls :) SET bitmask = pow( 2, (32 - least( cast(substring_index(cidr1, '/', -1) as integer), cast(substring_index(cidr2, '/', -1) as integer) )) ) - 1; RETURN inet_aton(substring_index(cidr1, '/', 1)) & ~bitmask = inet_aton(substring_index(cidr2, '/', 1)) & ~bitmask; END """) ) if __name__ == '__main__': from sqlalchemy import Column, Integer, String, create_engine, func from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import Session, aliased from sqlalchemy import event Base = declarative_base() class A(Base): __tablename__ = 'a' id = Column(Integer, primary_key=True) subnet = Column(String(30)) event.listen( A.__table__, "after_create", DDL(""" CREATE TRIGGER no_overlap_cidr_a BEFORE INSERT ON a FOR EACH ROW BEGIN DECLARE msg VARCHAR(200); IF (EXISTS(SELECT * FROM a WHERE cidr_overlap(subnet, NEW.subnet))) THEN SET msg = CONCAT( 'inserted subnet ', NEW.subnet, ' conflicts with existing subnets'); SIGNAL sqlstate '45000' SET MESSAGE_TEXT = msg; END IF; END """) ) e = create_engine("mysql://scott:tiger@localhost/test", echo=True) mysql_cidr_overlap(e, Base.metadata) Base.metadata.drop_all(e) Base.metadata.create_all(e) s = Session(e) with s.begin_nested(): s.add(A(subnet='192.168.1.0/24')) with s.begin_nested(): s.add(A(subnet='192.168.2.0/24')) try: with s.begin_nested(): s.add(A(subnet='192.168.2.0/25')) except Exception as e: print "Error! %s" % e s.commit() a1, a2 = aliased(A), aliased(A) # return all non-overlapping CIDR pairs for a, b in s.query(a1.subnet, a2.subnet).\ filter(~func.cidr_overlap(a1.subnet, a2.subnet)).\ filter(a1.id > a2.id): print a, b