Forked from kulmam92/script_out_snowflake_database_objects.py
Created
November 11, 2022 15:22
-
-
Save sandscap-sc/3334cfbe6b287da65a5dfdac07efe6e4 to your computer and use it in GitHub Desktop.
Revisions
-
kulmam92 created this gist
Oct 5, 2019 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,298 @@ import os import shutil import subprocess import time from datetime import datetime import snowflake.connector import logging # Information for Snowflake SNOWSQL_ACCOUNT = "XXXXXXXXX.east-us-2.azure" SNOWSQL_USER = "XXXXXXXX" SNOWSQL_PASSWORD = "XXXXXXX" SNOWSQL_WAREHOUSE = "DEMO_WH" SNOWSQL_SCHEMA = "PUBLIC" SNOWSQL_ROLE = "SYSADMIN" # Parameter SNOWSQL_DATABASE = "DEMO_DB" BASE_PATH = "/dev/snowflake" OBJECT_LIST_QUERY ="""\ SELECT seq, catalog_name, schema_name, object_type, object_name, ARGUMENT_SIGNATURE, script FROM ( select '01' seq, DATABASE_NAME catalog_name, '*' schema_name, 'DATABASE' object_type, DATABASE_NAME object_name, '' ARGUMENT_SIGNATURE , 'CREATE OR REPLACE' || CASE WHEN D.IS_TRANSIENT = 'YES' THEN ' TRANSIENT' ELSE '' END || ' ' || 'DATABASE ' || D.DATABASE_NAME || ' ' || '\nDATA_RETENTION_TIME_IN_DAYS = ' || TO_VARCHAR(RETENTION_TIME) || CASE WHEN D.COMMENT IS NULL THEN '' ELSE '\nCOMMENT = ''' || D.COMMENT || '''' END || ';' script from INFORMATION_SCHEMA.DATABASES D WHERE DATABASE_NAME = '{}' UNION ALL select '02' seq, catalog_name, schema_name schema_name, 'SCHEMA' object_type, schema_name object_name, '' ARGUMENT_SIGNATURE , 'CREATE OR REPLACE' || CASE WHEN S.IS_TRANSIENT = 'YES' THEN ' TRANSIENT' ELSE '' END || ' ' || 'SCHEMA ' || S.SCHEMA_NAME || ' ' || CASE WHEN S.IS_MANAGED_ACCESS = 'YES' THEN '\nWITH MANAGED ACCESS' ELSE '' END || ' ' || '\nDATA_RETENTION_TIME_IN_DAYS = ' || TO_VARCHAR(RETENTION_TIME) || CASE WHEN S.COMMENT IS NULL THEN '' ELSE '\nCOMMENT = ''' || S.COMMENT || '''' END || ';' script from INFORMATION_SCHEMA.SCHEMATA S UNION ALL select '03' seq, table_catalog catalog_name, TABLE_SCHEMA schema_name, 'TABLE' object_type, TABLE_NAME object_name, '' ARGUMENT_SIGNATURE, '' script from INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE != 'VIEW' UNION ALL select '04' seq, table_catalog catalog_name, TABLE_SCHEMA schema_name, 'VIEW' object_type, TABLE_NAME object_name, '' ARGUMENT_SIGNATURE, '' script from INFORMATION_SCHEMA.VIEWS WHERE TABLE_SCHEMA != 'INFORMATION_SCHEMA' UNION ALL select '06' seq, SEQUENCE_catalog catalog_name, SEQUENCE_SCHEMA schema_name, 'SEQUENCE' object_type, SEQUENCE_NAME object_name, '' ARGUMENT_SIGNATURE, '' script from INFORMATION_SCHEMA.SEQUENCES UNION ALL select '07' seq, FILE_FORMAT_catalog catalog_name, FILE_FORMAT_SCHEMA schema_name, 'FILE_FORMAT' object_type, FILE_FORMAT_NAME object_name, '' ARGUMENT_SIGNATURE, '' script from INFORMATION_SCHEMA.FILE_FORMATS UNION ALL select '08' seq, PIPE_catalog catalog_name, PIPE_SCHEMA schema_name, 'PIPE' object_type, PIPE_NAME object_name, '' ARGUMENT_SIGNATURE, '' script from INFORMATION_SCHEMA.PIPES UNION ALL select '09' seq, FUNCTION_catalog catalog_name, FUNCTION_SCHEMA schema_name, 'FUNCTION' object_type, FUNCTION_NAME object_name, ARGUMENT_SIGNATURE, '' script from INFORMATION_SCHEMA.FUNCTIONS UNION ALL select '10' seq, PROCEDURE_catalog catalog_name, PROCEDURE_SCHEMA schema_name, 'PROCEDURE' object_type, PROCEDURE_NAME object_name, ARGUMENT_SIGNATURE, '' script from "INFORMATION_SCHEMA"."PROCEDURES" ) T ORDER BY seq, catalog_name, schema_name, object_name""" DDL_DATABASE_QUERY = """\ SELECT 'CREATE OR REPLACE' || CASE WHEN D.IS_TRANSIENT = 'YES' THEN ' TRANSIENT' ELSE '' END || ' ' || 'DATABASE ' || D.DATABASE_NAME || ' ' || '\nDATA_RETENTION_TIME_IN_DAYS = ' || TO_VARCHAR(RETENTION_TIME) || CASE WHEN D.COMMENT IS NULL THEN '' ELSE '\nCOMMENT = ''' || D.COMMENT || '''' END || ';' FROM INFORMATION_SCHEMA.DATABASES D WHERE D.DATABASE_NAME = '{}'""" DDL_SCHEMA_QUERY = """\ SELECT 'CREATE OR REPLACE' || CASE WHEN S.IS_TRANSIENT = 'YES' THEN ' TRANSIENT' ELSE '' END || ' ' || 'SCHEMA ' || S.SCHEMA_NAME || ' ' || CASE WHEN S.IS_MANAGED_ACCESS = 'YES' THEN '\nWITH MANAGED ACCESS' ELSE '' END || ' ' || '\nDATA_RETENTION_TIME_IN_DAYS = ' || TO_VARCHAR(RETENTION_TIME) || CASE WHEN S.COMMENT IS NULL THEN '' ELSE '\nCOMMENT = ''' || S.COMMENT || '''' END || ';' FROM INFORMATION_SCHEMA.SCHEMATA S""" GET_DDL_QUERY = "SELECT GET_DDL('{}','{}') script" # I wasn't able to find a good way to distinguish role and share from the GRANTEE column # Therefore, I assumed that share will have postfix "_SHARE" OBJECT_PERMISSION_QUERY = """\ SELECT 'GRANT ' || A.PRIVILEGE_TYPE || ' ON ' || A.OBJECT_TYPE || ' ' || CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END || CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME || ' TO ' || CASE WHEN UPPER(GRANTEE) LIKE '%_SHARE' THEN 'SHARE' ELSE 'ROLE' END || ' ' || A.GRANTEE || ';' GRANTED FROM INFORMATION_SCHEMA.OBJECT_PRIVILEGES A WHERE A.OBJECT_SCHEMA = '{}' AND A.OBJECT_NAME = '{}'""" ALL_PERMISSION_QUERY = """\ SELECT CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END || CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME OBJECT_NAME, REPLACE(REPLACE(REPLACE(TO_VARCHAR(ARRAY_AGG('GRANT ' || A.PRIVILEGE_TYPE || ' ON ' || A.OBJECT_TYPE || ' ' || CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END || CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME || ' TO ' || CASE WHEN UPPER(GRANTEE) LIKE '%_SHARE' THEN 'SHARE' ELSE 'ROLE' END || ' ' || A.GRANTEE || ';')),'["',''),'"]',''),';","',';\n') GRANTED FROM INFORMATION_SCHEMA.OBJECT_PRIVILEGES A GROUP BY CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END || CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME""" GET_DDL_WITH_PERMISSION_QUERY = """\ SELECT GET_DDL('{}','{}') script UNION ALL SELECT '\nGRANT ' || A.PRIVILEGE_TYPE || ' ON ' || A.OBJECT_TYPE || ' ' || CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END || CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME || ' TO ' || CASE WHEN UPPER(GRANTEE) LIKE '%_SHARE' THEN 'SHARE' ELSE 'ROLE' END || ' ' || A.GRANTEE || ';' GRANTED FROM INFORMATION_SCHEMA.OBJECT_PRIVILEGES A WHERE 'True' = '{}' AND A.OBJECT_SCHEMA = '{}' AND A.OBJECT_NAME = '{}'""" # STREAM - SHOW - GET_DDL STREAM_LIST_QUERY = "SHOW STREAMS IN DATABASE {}" # TASK - SHOW TASK_LIST_QUERY = "SHOW TASKS IN DATABASE {}" """ 'CREATE OR REPLACE TASK ' || schema_name || '.' || name || '/n WAREHOUSE = ' || wareshouse || CASE WHEN schedule IS NULL THEN '' ELSE '/n SCHEDULE = ''' || schedule || '''' END || CASE WHEN predecessor IS NULL THEN '' ELSE ' AFTER ' || predecessor END || -- Can't find column for the session_parameters CASE WHEN comment IS NULL THEN '' ELSE '\nCOMMENT = ''' || comment || '''' END || ';' CASE WHEN condition IS NULL THEN '' ELSE '/n WHEN' || condition END || '/nAS' || '/n' || definition || ';' """ logging.basicConfig( format='%(asctime)s %(levelname)-8s %(message)s', level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S') def ScriptOut(database, base_path, includePermission=True): now = datetime.now() ctx = snowflake.connector.connect( account=SNOWSQL_ACCOUNT, warehouse=SNOWSQL_WAREHOUSE, database=database, user=SNOWSQL_USER, password=SNOWSQL_PASSWORD, schema=SNOWSQL_SCHEMA, role=SNOWSQL_ROLE ) cs = ctx.cursor() logging.info("Connected to Snowflake") try: # Change database cs.execute("USE database {}".format(database)) # Get permission at once if includePermission: logging.info("Get permission at once") permResults = cs.execute(ALL_PERMISSION_QUERY).fetchall() perms = {} for rec in permResults: perms[rec[0]] = rec[1] # Get object list using information_schema results = cs.execute(OBJECT_LIST_QUERY.format(database)).fetchall() ddlStmt = "" for rec in results: #seq, catalog_name, schema_name, object_type, object_name, ARGUMENT_SIGNATURE, script logging.info('processing {}, {}'.format(rec[3], rec[4])) if rec[3] == 'DATABASE': outFile = os.path.join(BASE_PATH, rec[1], rec[4] + '.sql') ddlStmt = rec[6] if includePermission: logging.info("get permission") permStmt = perms.get(rec[4]) if permStmt: logging.debug(permStmt) ddlStmt += '\n\n' + permStmt elif rec[3] == 'SCHEMA': outFile = os.path.join(BASE_PATH, rec[1], rec[3], rec[4] + '.sql') ddlStmt = rec[6] # if includePermission: # logging.info("get permission") # rslt = cs.execute(OBJECT_TYPE_PERMISSION_QUERY.format(rec[3], rec[4])).fetchone() #object_type, object_name # if rslt is not None: # ddlStmt += '\n\n' + rslt[0] if includePermission: logging.info("get permission") permStmt = perms.get(rec[1] + '.' + rec[4]) if permStmt: logging.debug(permStmt) ddlStmt += '\n\n' + permStmt else: if rec[3] in ('FUNCTION', 'PROCEDURE'): #need to provide ARGUMENT_SIGNATURE only with data_type argumentSignature = str(rec[5]) if argumentSignature != "()": args = argumentSignature.replace("(","").replace(")","").split(",") #print(args) newargs = [] for arg in args: newargs.append(arg.strip().split()[1]) argumentSignature = '(' + ', '.join(newargs) + ')' # object_type, schema_name.object_name(ARGUMENT_SIGNATURE) objectType = rec[3] objectName = rec[2] + '.' + rec[4] + argumentSignature outFile = os.path.join(BASE_PATH, rec[1], rec[3], rec[2] + '.' + rec[4] + '.sql') else: # object_type, schema_name.object_name objectType = rec[3] objectName = rec[2] + '.' + rec[4] outFile = os.path.join(BASE_PATH, rec[1], rec[3], objectName + '.sql') rslt = cs.execute(GET_DDL_QUERY.format(objectType, objectName)).fetchone() ddlStmt = rslt[0] if includePermission: logging.info("get permission") permStmt = perms.get(rec[1] + '.' + objectName) if permStmt: logging.debug(permStmt) ddlStmt += '\n\n' + permStmt # Write it to a file logging.info("write to a file: {}".format(outFile)) os.makedirs(os.path.dirname(outFile), exist_ok=True) logging.info("directory: {}".format(os.path.dirname(outFile))) with open(outFile, "w") as f: f.write(ddlStmt) # STREAM - SHOW - GET_DDL # created_on, name, database_name, schema_name logging.info("stream") stremResults = cs.execute(STREAM_LIST_QUERY.format(database)).fetchall() for rec in stremResults: objectType = 'STREAM' objectName = rec[3] + '.' + rec[1] outFile = os.path.join(BASE_PATH, rec[2], 'STREAM', objectName + '.sql') rslt = cs.execute(GET_DDL_QUERY.format(objectType, objectName)).fetchone() ddlStmt = rslt[0] if includePermission: logging.info("get permission") permStmt = perms.get(rec[2] + '.' + objectName) if permStmt: logging.debug(permStmt) ddlStmt += '\n\n' + permStmt logging.info("write to a file") os.makedirs(os.path.dirname(outFile), exist_ok=True) with open(outFile, "w") as f: f.write(ddlStmt) # TASK - SHOW # created_on,name,database_name,schema_name,owner,comment,warehouse,schedule,predecessor,state,definition,condition logging.info("taskResults") taskResults = cs.execute(TASK_LIST_QUERY.format(database)).fetchall() for rec in taskResults: logging.info('processing {}, {}'.format(rec[3], rec[1]))#schema_name, name objectType = 'TASK' objectName = rec[3] + '.' + rec[1] ddlStmt = 'CREATE OR REPLACE TASK ' + rec[3] + '.' + rec[1] + '\n' + ' WAREHOUSE = ' + rec[6] if rec[7] is not None: #schedule ddlStmt += "\n SCHEDULE = '" + rec[7] + "'" if rec[8] is not None: #predecessor ddlStmt += "\n AFTER " + rec[8] if rec[5] is not None: #comment ddlStmt += "\n COMMENT = '" + rec[5] + "'" if rec[11] is not None: #condition ddlStmt += "\n WHEN " + rec[11] ddlStmt += '\nAS\n' + rec[10] + ';' #definition outFile = os.path.join(BASE_PATH, rec[2], objectType, objectName + '.sql') if includePermission: logging.info("get permission") permStmt = perms.get(rec[2] + '.' + objectName) if permStmt: logging.debug(permStmt) ddlStmt += '\n\n' + permStmt logging.info("write to a file") os.makedirs(os.path.dirname(outFile), exist_ok=True) with open(outFile, "w") as f: f.write(ddlStmt) except snowflake.connector.errors.ProgrammingError as e: # default error message logging.error(e) finally: cs.close() ctx.close() def main(): start_time = time.time() ScriptOut(SNOWSQL_DATABASE, BASE_PATH, True) elapsed_time = time.time() - start_time logging.info("Completed after: {}".format(str(elapsed_time))) if __name__ == '__main__': main()