Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save sandscap-sc/3334cfbe6b287da65a5dfdac07efe6e4 to your computer and use it in GitHub Desktop.

Select an option

Save sandscap-sc/3334cfbe6b287da65a5dfdac07efe6e4 to your computer and use it in GitHub Desktop.

Revisions

  1. @kulmam92 kulmam92 created this gist Oct 5, 2019.
    298 changes: 298 additions & 0 deletions script_out_snowflake_database_objects.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,298 @@
    import os
    import shutil
    import subprocess
    import time
    from datetime import datetime
    import snowflake.connector
    import logging

    # Information for Snowflake
    SNOWSQL_ACCOUNT = "XXXXXXXXX.east-us-2.azure"
    SNOWSQL_USER = "XXXXXXXX"
    SNOWSQL_PASSWORD = "XXXXXXX"
    SNOWSQL_WAREHOUSE = "DEMO_WH"
    SNOWSQL_SCHEMA = "PUBLIC"
    SNOWSQL_ROLE = "SYSADMIN"

    # Parameter
    SNOWSQL_DATABASE = "DEMO_DB"
    BASE_PATH = "/dev/snowflake"

    OBJECT_LIST_QUERY ="""\
    SELECT seq, catalog_name, schema_name, object_type, object_name, ARGUMENT_SIGNATURE, script
    FROM (
    select '01' seq, DATABASE_NAME catalog_name, '*' schema_name, 'DATABASE' object_type, DATABASE_NAME object_name, '' ARGUMENT_SIGNATURE
    , 'CREATE OR REPLACE' || CASE WHEN D.IS_TRANSIENT = 'YES' THEN ' TRANSIENT' ELSE '' END || ' ' ||
    'DATABASE ' || D.DATABASE_NAME || ' ' ||
    '\nDATA_RETENTION_TIME_IN_DAYS = ' || TO_VARCHAR(RETENTION_TIME) ||
    CASE WHEN D.COMMENT IS NULL THEN '' ELSE '\nCOMMENT = ''' || D.COMMENT || '''' END || ';' script
    from INFORMATION_SCHEMA.DATABASES D
    WHERE DATABASE_NAME = '{}'
    UNION ALL
    select '02' seq, catalog_name, schema_name schema_name, 'SCHEMA' object_type, schema_name object_name, '' ARGUMENT_SIGNATURE
    , 'CREATE OR REPLACE' || CASE WHEN S.IS_TRANSIENT = 'YES' THEN ' TRANSIENT' ELSE '' END || ' ' ||
    'SCHEMA ' || S.SCHEMA_NAME || ' ' ||
    CASE WHEN S.IS_MANAGED_ACCESS = 'YES' THEN '\nWITH MANAGED ACCESS' ELSE '' END || ' ' ||
    '\nDATA_RETENTION_TIME_IN_DAYS = ' || TO_VARCHAR(RETENTION_TIME) ||
    CASE WHEN S.COMMENT IS NULL THEN '' ELSE '\nCOMMENT = ''' || S.COMMENT || '''' END || ';' script
    from INFORMATION_SCHEMA.SCHEMATA S
    UNION ALL
    select '03' seq, table_catalog catalog_name, TABLE_SCHEMA schema_name, 'TABLE' object_type, TABLE_NAME object_name, '' ARGUMENT_SIGNATURE, '' script
    from INFORMATION_SCHEMA.TABLES
    WHERE TABLE_TYPE != 'VIEW'
    UNION ALL
    select '04' seq, table_catalog catalog_name, TABLE_SCHEMA schema_name, 'VIEW' object_type, TABLE_NAME object_name, '' ARGUMENT_SIGNATURE, '' script
    from INFORMATION_SCHEMA.VIEWS
    WHERE TABLE_SCHEMA != 'INFORMATION_SCHEMA'
    UNION ALL
    select '06' seq, SEQUENCE_catalog catalog_name, SEQUENCE_SCHEMA schema_name, 'SEQUENCE' object_type, SEQUENCE_NAME object_name, '' ARGUMENT_SIGNATURE, '' script
    from INFORMATION_SCHEMA.SEQUENCES
    UNION ALL
    select '07' seq, FILE_FORMAT_catalog catalog_name, FILE_FORMAT_SCHEMA schema_name, 'FILE_FORMAT' object_type, FILE_FORMAT_NAME object_name, '' ARGUMENT_SIGNATURE, '' script
    from INFORMATION_SCHEMA.FILE_FORMATS
    UNION ALL
    select '08' seq, PIPE_catalog catalog_name, PIPE_SCHEMA schema_name, 'PIPE' object_type, PIPE_NAME object_name, '' ARGUMENT_SIGNATURE, '' script
    from INFORMATION_SCHEMA.PIPES
    UNION ALL
    select '09' seq, FUNCTION_catalog catalog_name, FUNCTION_SCHEMA schema_name, 'FUNCTION' object_type, FUNCTION_NAME object_name, ARGUMENT_SIGNATURE, '' script
    from INFORMATION_SCHEMA.FUNCTIONS
    UNION ALL
    select '10' seq, PROCEDURE_catalog catalog_name, PROCEDURE_SCHEMA schema_name, 'PROCEDURE' object_type, PROCEDURE_NAME object_name, ARGUMENT_SIGNATURE, '' script
    from "INFORMATION_SCHEMA"."PROCEDURES"
    ) T
    ORDER BY seq, catalog_name, schema_name, object_name"""

    DDL_DATABASE_QUERY = """\
    SELECT 'CREATE OR REPLACE' || CASE WHEN D.IS_TRANSIENT = 'YES' THEN ' TRANSIENT' ELSE '' END || ' ' ||
    'DATABASE ' || D.DATABASE_NAME || ' ' ||
    '\nDATA_RETENTION_TIME_IN_DAYS = ' || TO_VARCHAR(RETENTION_TIME) ||
    CASE WHEN D.COMMENT IS NULL THEN '' ELSE '\nCOMMENT = ''' || D.COMMENT || '''' END || ';'
    FROM INFORMATION_SCHEMA.DATABASES D
    WHERE D.DATABASE_NAME = '{}'"""

    DDL_SCHEMA_QUERY = """\
    SELECT 'CREATE OR REPLACE' || CASE WHEN S.IS_TRANSIENT = 'YES' THEN ' TRANSIENT' ELSE '' END || ' ' ||
    'SCHEMA ' || S.SCHEMA_NAME || ' ' ||
    CASE WHEN S.IS_MANAGED_ACCESS = 'YES' THEN '\nWITH MANAGED ACCESS' ELSE '' END || ' ' ||
    '\nDATA_RETENTION_TIME_IN_DAYS = ' || TO_VARCHAR(RETENTION_TIME) ||
    CASE WHEN S.COMMENT IS NULL THEN '' ELSE '\nCOMMENT = ''' || S.COMMENT || '''' END || ';'
    FROM INFORMATION_SCHEMA.SCHEMATA S"""

    GET_DDL_QUERY = "SELECT GET_DDL('{}','{}') script"

    # I wasn't able to find a good way to distinguish role and share from the GRANTEE column
    # Therefore, I assumed that share will have postfix "_SHARE"
    OBJECT_PERMISSION_QUERY = """\
    SELECT 'GRANT ' || A.PRIVILEGE_TYPE || ' ON ' || A.OBJECT_TYPE || ' ' ||
    CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END ||
    CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME ||
    ' TO ' || CASE WHEN UPPER(GRANTEE) LIKE '%_SHARE' THEN 'SHARE' ELSE 'ROLE' END ||
    ' ' || A.GRANTEE || ';' GRANTED
    FROM INFORMATION_SCHEMA.OBJECT_PRIVILEGES A
    WHERE A.OBJECT_SCHEMA = '{}'
    AND A.OBJECT_NAME = '{}'"""

    ALL_PERMISSION_QUERY = """\
    SELECT CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END ||
    CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME OBJECT_NAME,
    REPLACE(REPLACE(REPLACE(TO_VARCHAR(ARRAY_AGG('GRANT ' || A.PRIVILEGE_TYPE || ' ON ' || A.OBJECT_TYPE || ' ' ||
    CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END ||
    CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME ||
    ' TO ' || CASE WHEN UPPER(GRANTEE) LIKE '%_SHARE' THEN 'SHARE' ELSE 'ROLE' END ||
    ' ' || A.GRANTEE || ';')),'["',''),'"]',''),';","',';\n') GRANTED
    FROM INFORMATION_SCHEMA.OBJECT_PRIVILEGES A
    GROUP BY CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END ||
    CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME"""

    GET_DDL_WITH_PERMISSION_QUERY = """\
    SELECT GET_DDL('{}','{}') script
    UNION ALL
    SELECT '\nGRANT ' || A.PRIVILEGE_TYPE || ' ON ' || A.OBJECT_TYPE || ' ' ||
    CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END ||
    CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME ||
    ' TO ' || CASE WHEN UPPER(GRANTEE) LIKE '%_SHARE' THEN 'SHARE' ELSE 'ROLE' END ||
    ' ' || A.GRANTEE || ';' GRANTED
    FROM INFORMATION_SCHEMA.OBJECT_PRIVILEGES A
    WHERE 'True' = '{}'
    AND A.OBJECT_SCHEMA = '{}'
    AND A.OBJECT_NAME = '{}'"""

    # STREAM - SHOW - GET_DDL
    STREAM_LIST_QUERY = "SHOW STREAMS IN DATABASE {}"

    # TASK - SHOW
    TASK_LIST_QUERY = "SHOW TASKS IN DATABASE {}"
    """ 'CREATE OR REPLACE TASK ' || schema_name || '.' || name ||
    '/n WAREHOUSE = ' || wareshouse ||
    CASE WHEN schedule IS NULL THEN '' ELSE '/n SCHEDULE = ''' || schedule || '''' END ||
    CASE WHEN predecessor IS NULL THEN '' ELSE ' AFTER ' || predecessor END ||
    -- Can't find column for the session_parameters
    CASE WHEN comment IS NULL THEN '' ELSE '\nCOMMENT = ''' || comment || '''' END || ';'
    CASE WHEN condition IS NULL THEN '' ELSE '/n WHEN' || condition END ||
    '/nAS' ||
    '/n' || definition || ';'
    """

    logging.basicConfig(
    format='%(asctime)s %(levelname)-8s %(message)s',
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S')

    def ScriptOut(database, base_path, includePermission=True):
    now = datetime.now()

    ctx = snowflake.connector.connect(
    account=SNOWSQL_ACCOUNT,
    warehouse=SNOWSQL_WAREHOUSE,
    database=database,
    user=SNOWSQL_USER,
    password=SNOWSQL_PASSWORD,
    schema=SNOWSQL_SCHEMA,
    role=SNOWSQL_ROLE
    )
    cs = ctx.cursor()
    logging.info("Connected to Snowflake")

    try:
    # Change database
    cs.execute("USE database {}".format(database))

    # Get permission at once
    if includePermission:
    logging.info("Get permission at once")
    permResults = cs.execute(ALL_PERMISSION_QUERY).fetchall()
    perms = {}
    for rec in permResults:
    perms[rec[0]] = rec[1]

    # Get object list using information_schema
    results = cs.execute(OBJECT_LIST_QUERY.format(database)).fetchall()
    ddlStmt = ""
    for rec in results:
    #seq, catalog_name, schema_name, object_type, object_name, ARGUMENT_SIGNATURE, script
    logging.info('processing {}, {}'.format(rec[3], rec[4]))
    if rec[3] == 'DATABASE':
    outFile = os.path.join(BASE_PATH, rec[1], rec[4] + '.sql')
    ddlStmt = rec[6]
    if includePermission:
    logging.info("get permission")
    permStmt = perms.get(rec[4])
    if permStmt:
    logging.debug(permStmt)
    ddlStmt += '\n\n' + permStmt
    elif rec[3] == 'SCHEMA':
    outFile = os.path.join(BASE_PATH, rec[1], rec[3], rec[4] + '.sql')
    ddlStmt = rec[6]
    # if includePermission:
    # logging.info("get permission")
    # rslt = cs.execute(OBJECT_TYPE_PERMISSION_QUERY.format(rec[3], rec[4])).fetchone() #object_type, object_name
    # if rslt is not None:
    # ddlStmt += '\n\n' + rslt[0]
    if includePermission:
    logging.info("get permission")
    permStmt = perms.get(rec[1] + '.' + rec[4])
    if permStmt:
    logging.debug(permStmt)
    ddlStmt += '\n\n' + permStmt
    else:
    if rec[3] in ('FUNCTION', 'PROCEDURE'):
    #need to provide ARGUMENT_SIGNATURE only with data_type
    argumentSignature = str(rec[5])
    if argumentSignature != "()":
    args = argumentSignature.replace("(","").replace(")","").split(",")
    #print(args)
    newargs = []
    for arg in args:
    newargs.append(arg.strip().split()[1])
    argumentSignature = '(' + ', '.join(newargs) + ')'
    # object_type, schema_name.object_name(ARGUMENT_SIGNATURE)
    objectType = rec[3]
    objectName = rec[2] + '.' + rec[4] + argumentSignature
    outFile = os.path.join(BASE_PATH, rec[1], rec[3], rec[2] + '.' + rec[4] + '.sql')
    else:
    # object_type, schema_name.object_name
    objectType = rec[3]
    objectName = rec[2] + '.' + rec[4]
    outFile = os.path.join(BASE_PATH, rec[1], rec[3], objectName + '.sql')

    rslt = cs.execute(GET_DDL_QUERY.format(objectType, objectName)).fetchone()
    ddlStmt = rslt[0]
    if includePermission:
    logging.info("get permission")
    permStmt = perms.get(rec[1] + '.' + objectName)
    if permStmt:
    logging.debug(permStmt)
    ddlStmt += '\n\n' + permStmt
    # Write it to a file
    logging.info("write to a file: {}".format(outFile))
    os.makedirs(os.path.dirname(outFile), exist_ok=True)
    logging.info("directory: {}".format(os.path.dirname(outFile)))
    with open(outFile, "w") as f:
    f.write(ddlStmt)

    # STREAM - SHOW - GET_DDL
    # created_on, name, database_name, schema_name
    logging.info("stream")
    stremResults = cs.execute(STREAM_LIST_QUERY.format(database)).fetchall()
    for rec in stremResults:
    objectType = 'STREAM'
    objectName = rec[3] + '.' + rec[1]
    outFile = os.path.join(BASE_PATH, rec[2], 'STREAM', objectName + '.sql')
    rslt = cs.execute(GET_DDL_QUERY.format(objectType, objectName)).fetchone()
    ddlStmt = rslt[0]
    if includePermission:
    logging.info("get permission")
    permStmt = perms.get(rec[2] + '.' + objectName)
    if permStmt:
    logging.debug(permStmt)
    ddlStmt += '\n\n' + permStmt
    logging.info("write to a file")
    os.makedirs(os.path.dirname(outFile), exist_ok=True)
    with open(outFile, "w") as f:
    f.write(ddlStmt)

    # TASK - SHOW
    # created_on,name,database_name,schema_name,owner,comment,warehouse,schedule,predecessor,state,definition,condition
    logging.info("taskResults")
    taskResults = cs.execute(TASK_LIST_QUERY.format(database)).fetchall()
    for rec in taskResults:
    logging.info('processing {}, {}'.format(rec[3], rec[1]))#schema_name, name
    objectType = 'TASK'
    objectName = rec[3] + '.' + rec[1]
    ddlStmt = 'CREATE OR REPLACE TASK ' + rec[3] + '.' + rec[1] + '\n' + ' WAREHOUSE = ' + rec[6]
    if rec[7] is not None: #schedule
    ddlStmt += "\n SCHEDULE = '" + rec[7] + "'"
    if rec[8] is not None: #predecessor
    ddlStmt += "\n AFTER " + rec[8]
    if rec[5] is not None: #comment
    ddlStmt += "\n COMMENT = '" + rec[5] + "'"
    if rec[11] is not None: #condition
    ddlStmt += "\n WHEN " + rec[11]
    ddlStmt += '\nAS\n' + rec[10] + ';' #definition
    outFile = os.path.join(BASE_PATH, rec[2], objectType, objectName + '.sql')
    if includePermission:
    logging.info("get permission")
    permStmt = perms.get(rec[2] + '.' + objectName)
    if permStmt:
    logging.debug(permStmt)
    ddlStmt += '\n\n' + permStmt
    logging.info("write to a file")
    os.makedirs(os.path.dirname(outFile), exist_ok=True)
    with open(outFile, "w") as f:
    f.write(ddlStmt)

    except snowflake.connector.errors.ProgrammingError as e:
    # default error message
    logging.error(e)
    finally:
    cs.close()
    ctx.close()

    def main():
    start_time = time.time()
    ScriptOut(SNOWSQL_DATABASE, BASE_PATH, True)
    elapsed_time = time.time() - start_time
    logging.info("Completed after: {}".format(str(elapsed_time)))

    if __name__ == '__main__':
    main()