Skip to content

Instantly share code, notes, and snippets.

@zzzeek
Last active August 12, 2020 19:24
Show Gist options
  • Select an option

  • Save zzzeek/a3bccad40610b9b69803531cc71a79b1 to your computer and use it in GitHub Desktop.

Select an option

Save zzzeek/a3bccad40610b9b69803531cc71a79b1 to your computer and use it in GitHub Desktop.

Revisions

  1. zzzeek revised this gist May 5, 2016. 1 changed file with 4 additions and 4 deletions.
    8 changes: 4 additions & 4 deletions gistfile1.py
    Original file line number Diff line number Diff line change
    @@ -27,14 +27,14 @@ def _create_mysql_proc(target, connection, **kw):
    SET bitmask = pow(
    2,
    (32 - least(
    cast(substring_index(cidr1, "/", -1) as integer),
    cast(substring_index(cidr2, "/", -1) as integer)
    cast(substring_index(cidr1, '/', -1) as integer),
    cast(substring_index(cidr2, '/', -1) as integer)
    ))
    ) - 1;
    RETURN
    inet_aton(substring_index(cidr1, "/", 1)) & ~bitmask =
    inet_aton(substring_index(cidr2, "/", 1)) & ~bitmask;
    inet_aton(substring_index(cidr1, '/', 1)) & ~bitmask =
    inet_aton(substring_index(cidr2, '/', 1)) & ~bitmask;
    END
    """)
    )
  2. zzzeek revised this gist Apr 18, 2016. 1 changed file with 2 additions and 0 deletions.
    2 changes: 2 additions & 0 deletions gistfile1.py
    Original file line number Diff line number Diff line change
    @@ -22,6 +22,8 @@ def _create_mysql_proc(target, connection, **kw):
    RETURNS TINYINT
    BEGIN
    DECLARE bitmask INT;
    -- note - Mike is semi-guessing on the math here, needs tests! don't stick
    -- into production pls :)
    SET bitmask = pow(
    2,
    (32 - least(
  3. zzzeek revised this gist Apr 17, 2016. 1 changed file with 3 additions and 3 deletions.
    6 changes: 3 additions & 3 deletions gistfile1.py
    Original file line number Diff line number Diff line change
    @@ -24,11 +24,11 @@ def _create_mysql_proc(target, connection, **kw):
    DECLARE bitmask INT;
    SET bitmask = pow(
    2,
    least(
    (32 - least(
    cast(substring_index(cidr1, "/", -1) as integer),
    cast(substring_index(cidr2, "/", -1) as integer)
    )
    );
    ))
    ) - 1;
    RETURN
    inet_aton(substring_index(cidr1, "/", 1)) & ~bitmask =
  4. zzzeek revised this gist Apr 15, 2016. 1 changed file with 12 additions and 25 deletions.
    37 changes: 12 additions & 25 deletions gistfile1.py
    Original file line number Diff line number Diff line change
    @@ -20,33 +20,20 @@ def _create_mysql_proc(target, connection, **kw):
    DDL("""
    CREATE FUNCTION cidr_overlap (cidr1 VARCHAR(30), cidr2 VARCHAR(30))
    RETURNS TINYINT
    RETURN
    inet_aton(substring_index(cidr1, "/", 1))
    &
    ~(
    pow(
    2,
    least(
    cast(substring_index(cidr1, "/", -1) as integer),
    cast(substring_index(cidr2, "/", -1) as integer)
    )
    )
    )
    =
    inet_aton(substring_index(cidr2, "/", 1))
    &
    ~(
    pow(
    2,
    least(
    cast(substring_index(cidr1, "/", -1) as integer),
    cast(substring_index(cidr2, "/", -1) as integer)
    )
    BEGIN
    DECLARE bitmask INT;
    SET bitmask = pow(
    2,
    least(
    cast(substring_index(cidr1, "/", -1) as integer),
    cast(substring_index(cidr2, "/", -1) as integer)
    )
    )
    );
    RETURN
    inet_aton(substring_index(cidr1, "/", 1)) & ~bitmask =
    inet_aton(substring_index(cidr2, "/", 1)) & ~bitmask;
    END
    """)
    )

  5. zzzeek revised this gist Apr 15, 2016. 1 changed file with 97 additions and 31 deletions.
    128 changes: 97 additions & 31 deletions gistfile1.py
    Original file line number Diff line number Diff line change
    @@ -1,47 +1,113 @@
    from sqlalchemy import *
    from sqlalchemy.orm import *
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import event
    from sqlalchemy import DDL

    import ipaddr # install from pip

    Base = declarative_base()
    def _mysql_cidr_overlap(metadata):
    @event.listens_for(metadata, "after_create")
    def _create_mysql_proc(target, connection, **kw):
    if connection.engine.name != 'mysql':
    return

    if connection.scalar(
    "SELECT ROUTINE_NAME FROM INFORMATION_SCHEMA.ROUTINES "
    "WHERE ROUTINE_TYPE='FUNCTION' AND ROUTINE_SCHEMA=DATABASE() AND "
    "ROUTINE_NAME=%s",
    ("cidr_overlap", )
    ):
    connection.execute("DROP FUNCTION cidr_overlap")

    class A(Base):
    __tablename__ = 'a'
    id = Column(Integer, primary_key=True)
    subnet = Column(String)
    connection.execute(
    DDL("""
    CREATE FUNCTION cidr_overlap (cidr1 VARCHAR(30), cidr2 VARCHAR(30))
    RETURNS TINYINT
    RETURN
    inet_aton(substring_index(cidr1, "/", 1))
    &
    ~(
    pow(
    2,
    least(
    cast(substring_index(cidr1, "/", -1) as integer),
    cast(substring_index(cidr2, "/", -1) as integer)
    )
    )
    )
    =
    e = create_engine("sqlite://", echo=True)
    inet_aton(substring_index(cidr2, "/", 1))
    &
    ~(
    pow(
    2,
    least(
    cast(substring_index(cidr1, "/", -1) as integer),
    cast(substring_index(cidr2, "/", -1) as integer)
    )
    )
    )
    def cidr_overlap(n1, n2):
    n1 = ipaddr.IPNetwork(n1)
    n2 = ipaddr.IPNetwork(n2)
    return n1.overlaps(n2)
    """)
    )


    @event.listens_for(e, "connect")
    def connect(dbapi_connection, connection_record):
    dbapi_connection.create_function("cidr_overlap", 2, cidr_overlap)
    def _sqlite_cidr_overlap(engine):
    import ipaddr

    def python_cidr_overlap(n1, n2):
    n1 = ipaddr.IPNetwork(n1)
    n2 = ipaddr.IPNetwork(n2)
    return n1.overlaps(n2)

    Base.metadata.create_all(e)
    @event.listens_for(engine, "connect")
    def connect(dbapi_connection, connection_record):
    if e.name == 'sqlite':
    dbapi_connection.create_function(
    "cidr_overlap", 2, python_cidr_overlap)

    s = Session(e)

    s.add_all([
    A(subnet='192.168.1.0/24'),
    A(subnet='192.168.2.0/24'),
    A(subnet='192.168.2.0/25')
    ])
    s.commit()
    def cidr_overlap(engine, metadata):
    if engine.name == 'mysql':
    _mysql_cidr_overlap(metadata)
    elif engine.name == 'sqlite':
    _sqlite_cidr_overlap(engine)

    a1, a2 = aliased(A), aliased(A)
    if __name__ == '__main__':
    from sqlalchemy import Column, Integer, String, create_engine, func
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import Session, aliased

    # return all non-overlapping CIDR pairs
    for a, b in s.query(a1.subnet, a2.subnet).\
    filter(~func.cidr_overlap(a1.subnet, a2.subnet)).\
    filter(a1.id > a2.id):
    print a, b
    Base = declarative_base()

    class A(Base):
    __tablename__ = 'a'
    id = Column(Integer, primary_key=True)
    subnet = Column(String(30))

    for url in [
    "mysql://scott:tiger@localhost/test",
    "sqlite://"
    ]:
    e = create_engine(url, echo=True)

    cidr_overlap(e, Base.metadata)

    Base.metadata.drop_all(e)
    Base.metadata.create_all(e)

    s = Session(e)

    s.add_all([
    A(subnet='192.168.1.0/24'),
    A(subnet='192.168.2.0/24'),
    A(subnet='192.168.2.0/25')
    ])
    s.commit()

    a1, a2 = aliased(A), aliased(A)

    # return all non-overlapping CIDR pairs
    for a, b in s.query(a1.subnet, a2.subnet).\
    filter(~func.cidr_overlap(a1.subnet, a2.subnet)).\
    filter(a1.id > a2.id):
    print a, b
  6. zzzeek revised this gist Apr 15, 2016. 1 changed file with 2 additions and 1 deletion.
    3 changes: 2 additions & 1 deletion gistfile1.py
    Original file line number Diff line number Diff line change
    @@ -1,9 +1,10 @@
    from sqlalchemy import *
    from sqlalchemy.orm import *
    from sqlalchemy.ext.declarative import declarative_base
    import ipaddr
    from sqlalchemy import event

    import ipaddr # install from pip

    Base = declarative_base()


  7. zzzeek renamed this gist Apr 15, 2016. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  8. zzzeek created this gist Apr 15, 2016.
    46 changes: 46 additions & 0 deletions gistfile1.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,46 @@
    from sqlalchemy import *
    from sqlalchemy.orm import *
    from sqlalchemy.ext.declarative import declarative_base
    import ipaddr
    from sqlalchemy import event

    Base = declarative_base()


    class A(Base):
    __tablename__ = 'a'
    id = Column(Integer, primary_key=True)
    subnet = Column(String)

    e = create_engine("sqlite://", echo=True)


    def cidr_overlap(n1, n2):
    n1 = ipaddr.IPNetwork(n1)
    n2 = ipaddr.IPNetwork(n2)
    return n1.overlaps(n2)


    @event.listens_for(e, "connect")
    def connect(dbapi_connection, connection_record):
    dbapi_connection.create_function("cidr_overlap", 2, cidr_overlap)


    Base.metadata.create_all(e)

    s = Session(e)

    s.add_all([
    A(subnet='192.168.1.0/24'),
    A(subnet='192.168.2.0/24'),
    A(subnet='192.168.2.0/25')
    ])
    s.commit()

    a1, a2 = aliased(A), aliased(A)

    # return all non-overlapping CIDR pairs
    for a, b in s.query(a1.subnet, a2.subnet).\
    filter(~func.cidr_overlap(a1.subnet, a2.subnet)).\
    filter(a1.id > a2.id):
    print a, b