import os import shutil import subprocess import time from datetime import datetime import snowflake.connector import logging # Information for Snowflake SNOWSQL_ACCOUNT = "XXXXXXXXX.east-us-2.azure" SNOWSQL_USER = "XXXXXXXX" SNOWSQL_PASSWORD = "XXXXXXX" SNOWSQL_WAREHOUSE = "DEMO_WH" SNOWSQL_SCHEMA = "PUBLIC" SNOWSQL_ROLE = "SYSADMIN" # Parameter SNOWSQL_DATABASE = "DEMO_DB" BASE_PATH = "/dev/snowflake" OBJECT_LIST_QUERY ="""\ SELECT seq, catalog_name, schema_name, object_type, object_name, ARGUMENT_SIGNATURE, script FROM ( select '01' seq, DATABASE_NAME catalog_name, '*' schema_name, 'DATABASE' object_type, DATABASE_NAME object_name, '' ARGUMENT_SIGNATURE , 'CREATE OR REPLACE' || CASE WHEN D.IS_TRANSIENT = 'YES' THEN ' TRANSIENT' ELSE '' END || ' ' || 'DATABASE ' || D.DATABASE_NAME || ' ' || '\nDATA_RETENTION_TIME_IN_DAYS = ' || TO_VARCHAR(RETENTION_TIME) || CASE WHEN D.COMMENT IS NULL THEN '' ELSE '\nCOMMENT = ''' || D.COMMENT || '''' END || ';' script from INFORMATION_SCHEMA.DATABASES D WHERE DATABASE_NAME = '{}' UNION ALL select '02' seq, catalog_name, schema_name schema_name, 'SCHEMA' object_type, schema_name object_name, '' ARGUMENT_SIGNATURE , 'CREATE OR REPLACE' || CASE WHEN S.IS_TRANSIENT = 'YES' THEN ' TRANSIENT' ELSE '' END || ' ' || 'SCHEMA ' || S.SCHEMA_NAME || ' ' || CASE WHEN S.IS_MANAGED_ACCESS = 'YES' THEN '\nWITH MANAGED ACCESS' ELSE '' END || ' ' || '\nDATA_RETENTION_TIME_IN_DAYS = ' || TO_VARCHAR(RETENTION_TIME) || CASE WHEN S.COMMENT IS NULL THEN '' ELSE '\nCOMMENT = ''' || S.COMMENT || '''' END || ';' script from INFORMATION_SCHEMA.SCHEMATA S UNION ALL select '03' seq, table_catalog catalog_name, TABLE_SCHEMA schema_name, 'TABLE' object_type, TABLE_NAME object_name, '' ARGUMENT_SIGNATURE, '' script from INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE != 'VIEW' UNION ALL select '04' seq, table_catalog catalog_name, TABLE_SCHEMA schema_name, 'VIEW' object_type, TABLE_NAME object_name, '' ARGUMENT_SIGNATURE, '' script from INFORMATION_SCHEMA.VIEWS WHERE TABLE_SCHEMA != 'INFORMATION_SCHEMA' UNION ALL select '06' seq, SEQUENCE_catalog catalog_name, SEQUENCE_SCHEMA schema_name, 'SEQUENCE' object_type, SEQUENCE_NAME object_name, '' ARGUMENT_SIGNATURE, '' script from INFORMATION_SCHEMA.SEQUENCES UNION ALL select '07' seq, FILE_FORMAT_catalog catalog_name, FILE_FORMAT_SCHEMA schema_name, 'FILE_FORMAT' object_type, FILE_FORMAT_NAME object_name, '' ARGUMENT_SIGNATURE, '' script from INFORMATION_SCHEMA.FILE_FORMATS UNION ALL select '08' seq, PIPE_catalog catalog_name, PIPE_SCHEMA schema_name, 'PIPE' object_type, PIPE_NAME object_name, '' ARGUMENT_SIGNATURE, '' script from INFORMATION_SCHEMA.PIPES UNION ALL select '09' seq, FUNCTION_catalog catalog_name, FUNCTION_SCHEMA schema_name, 'FUNCTION' object_type, FUNCTION_NAME object_name, ARGUMENT_SIGNATURE, '' script from INFORMATION_SCHEMA.FUNCTIONS UNION ALL select '10' seq, PROCEDURE_catalog catalog_name, PROCEDURE_SCHEMA schema_name, 'PROCEDURE' object_type, PROCEDURE_NAME object_name, ARGUMENT_SIGNATURE, '' script from "INFORMATION_SCHEMA"."PROCEDURES" ) T ORDER BY seq, catalog_name, schema_name, object_name""" DDL_DATABASE_QUERY = """\ SELECT 'CREATE OR REPLACE' || CASE WHEN D.IS_TRANSIENT = 'YES' THEN ' TRANSIENT' ELSE '' END || ' ' || 'DATABASE ' || D.DATABASE_NAME || ' ' || '\nDATA_RETENTION_TIME_IN_DAYS = ' || TO_VARCHAR(RETENTION_TIME) || CASE WHEN D.COMMENT IS NULL THEN '' ELSE '\nCOMMENT = ''' || D.COMMENT || '''' END || ';' FROM INFORMATION_SCHEMA.DATABASES D WHERE D.DATABASE_NAME = '{}'""" DDL_SCHEMA_QUERY = """\ SELECT 'CREATE OR REPLACE' || CASE WHEN S.IS_TRANSIENT = 'YES' THEN ' TRANSIENT' ELSE '' END || ' ' || 'SCHEMA ' || S.SCHEMA_NAME || ' ' || CASE WHEN S.IS_MANAGED_ACCESS = 'YES' THEN '\nWITH MANAGED ACCESS' ELSE '' END || ' ' || '\nDATA_RETENTION_TIME_IN_DAYS = ' || TO_VARCHAR(RETENTION_TIME) || CASE WHEN S.COMMENT IS NULL THEN '' ELSE '\nCOMMENT = ''' || S.COMMENT || '''' END || ';' FROM INFORMATION_SCHEMA.SCHEMATA S""" GET_DDL_QUERY = "SELECT GET_DDL('{}','{}') script" # I wasn't able to find a good way to distinguish role and share from the GRANTEE column # Therefore, I assumed that share will have postfix "_SHARE" OBJECT_PERMISSION_QUERY = """\ SELECT 'GRANT ' || A.PRIVILEGE_TYPE || ' ON ' || A.OBJECT_TYPE || ' ' || CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END || CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME || ' TO ' || CASE WHEN UPPER(GRANTEE) LIKE '%_SHARE' THEN 'SHARE' ELSE 'ROLE' END || ' ' || A.GRANTEE || ';' GRANTED FROM INFORMATION_SCHEMA.OBJECT_PRIVILEGES A WHERE A.OBJECT_SCHEMA = '{}' AND A.OBJECT_NAME = '{}'""" ALL_PERMISSION_QUERY = """\ SELECT CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END || CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME OBJECT_NAME, REPLACE(REPLACE(REPLACE(TO_VARCHAR(ARRAY_AGG('GRANT ' || A.PRIVILEGE_TYPE || ' ON ' || A.OBJECT_TYPE || ' ' || CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END || CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME || ' TO ' || CASE WHEN UPPER(GRANTEE) LIKE '%_SHARE' THEN 'SHARE' ELSE 'ROLE' END || ' ' || A.GRANTEE || ';')),'["',''),'"]',''),';","',';\n') GRANTED FROM INFORMATION_SCHEMA.OBJECT_PRIVILEGES A GROUP BY CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END || CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME""" GET_DDL_WITH_PERMISSION_QUERY = """\ SELECT GET_DDL('{}','{}') script UNION ALL SELECT '\nGRANT ' || A.PRIVILEGE_TYPE || ' ON ' || A.OBJECT_TYPE || ' ' || CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END || CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME || ' TO ' || CASE WHEN UPPER(GRANTEE) LIKE '%_SHARE' THEN 'SHARE' ELSE 'ROLE' END || ' ' || A.GRANTEE || ';' GRANTED FROM INFORMATION_SCHEMA.OBJECT_PRIVILEGES A WHERE 'True' = '{}' AND A.OBJECT_SCHEMA = '{}' AND A.OBJECT_NAME = '{}'""" # STREAM - SHOW - GET_DDL STREAM_LIST_QUERY = "SHOW STREAMS IN DATABASE {}" # TASK - SHOW TASK_LIST_QUERY = "SHOW TASKS IN DATABASE {}" """ 'CREATE OR REPLACE TASK ' || schema_name || '.' || name || '/n WAREHOUSE = ' || wareshouse || CASE WHEN schedule IS NULL THEN '' ELSE '/n SCHEDULE = ''' || schedule || '''' END || CASE WHEN predecessor IS NULL THEN '' ELSE ' AFTER ' || predecessor END || -- Can't find column for the session_parameters CASE WHEN comment IS NULL THEN '' ELSE '\nCOMMENT = ''' || comment || '''' END || ';' CASE WHEN condition IS NULL THEN '' ELSE '/n WHEN' || condition END || '/nAS' || '/n' || definition || ';' """ logging.basicConfig( format='%(asctime)s %(levelname)-8s %(message)s', level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S') def ScriptOut(database, base_path, includePermission=True): now = datetime.now() ctx = snowflake.connector.connect( account=SNOWSQL_ACCOUNT, warehouse=SNOWSQL_WAREHOUSE, database=database, user=SNOWSQL_USER, password=SNOWSQL_PASSWORD, schema=SNOWSQL_SCHEMA, role=SNOWSQL_ROLE ) cs = ctx.cursor() logging.info("Connected to Snowflake") try: # Change database cs.execute("USE database {}".format(database)) # Get permission at once if includePermission: logging.info("Get permission at once") permResults = cs.execute(ALL_PERMISSION_QUERY).fetchall() perms = {} for rec in permResults: perms[rec[0]] = rec[1] # Get object list using information_schema results = cs.execute(OBJECT_LIST_QUERY.format(database)).fetchall() ddlStmt = "" for rec in results: #seq, catalog_name, schema_name, object_type, object_name, ARGUMENT_SIGNATURE, script logging.info('processing {}, {}'.format(rec[3], rec[4])) if rec[3] == 'DATABASE': outFile = os.path.join(BASE_PATH, rec[1], rec[4] + '.sql') ddlStmt = rec[6] if includePermission: logging.info("get permission") permStmt = perms.get(rec[4]) if permStmt: logging.debug(permStmt) ddlStmt += '\n\n' + permStmt elif rec[3] == 'SCHEMA': outFile = os.path.join(BASE_PATH, rec[1], rec[3], rec[4] + '.sql') ddlStmt = rec[6] # if includePermission: # logging.info("get permission") # rslt = cs.execute(OBJECT_TYPE_PERMISSION_QUERY.format(rec[3], rec[4])).fetchone() #object_type, object_name # if rslt is not None: # ddlStmt += '\n\n' + rslt[0] if includePermission: logging.info("get permission") permStmt = perms.get(rec[1] + '.' + rec[4]) if permStmt: logging.debug(permStmt) ddlStmt += '\n\n' + permStmt else: if rec[3] in ('FUNCTION', 'PROCEDURE'): #need to provide ARGUMENT_SIGNATURE only with data_type argumentSignature = str(rec[5]) if argumentSignature != "()": args = argumentSignature.replace("(","").replace(")","").split(",") #print(args) newargs = [] for arg in args: newargs.append(arg.strip().split()[1]) argumentSignature = '(' + ', '.join(newargs) + ')' # object_type, schema_name.object_name(ARGUMENT_SIGNATURE) objectType = rec[3] objectName = rec[2] + '.' + rec[4] + argumentSignature outFile = os.path.join(BASE_PATH, rec[1], rec[3], rec[2] + '.' + rec[4] + '.sql') else: # object_type, schema_name.object_name objectType = rec[3] objectName = rec[2] + '.' + rec[4] outFile = os.path.join(BASE_PATH, rec[1], rec[3], objectName + '.sql') rslt = cs.execute(GET_DDL_QUERY.format(objectType, objectName)).fetchone() ddlStmt = rslt[0] if includePermission: logging.info("get permission") permStmt = perms.get(rec[1] + '.' + objectName) if permStmt: logging.debug(permStmt) ddlStmt += '\n\n' + permStmt # Write it to a file logging.info("write to a file: {}".format(outFile)) os.makedirs(os.path.dirname(outFile), exist_ok=True) logging.info("directory: {}".format(os.path.dirname(outFile))) with open(outFile, "w") as f: f.write(ddlStmt) # STREAM - SHOW - GET_DDL # created_on, name, database_name, schema_name logging.info("stream") stremResults = cs.execute(STREAM_LIST_QUERY.format(database)).fetchall() for rec in stremResults: objectType = 'STREAM' objectName = rec[3] + '.' + rec[1] outFile = os.path.join(BASE_PATH, rec[2], 'STREAM', objectName + '.sql') rslt = cs.execute(GET_DDL_QUERY.format(objectType, objectName)).fetchone() ddlStmt = rslt[0] if includePermission: logging.info("get permission") permStmt = perms.get(rec[2] + '.' + objectName) if permStmt: logging.debug(permStmt) ddlStmt += '\n\n' + permStmt logging.info("write to a file") os.makedirs(os.path.dirname(outFile), exist_ok=True) with open(outFile, "w") as f: f.write(ddlStmt) # TASK - SHOW # created_on,name,database_name,schema_name,owner,comment,warehouse,schedule,predecessor,state,definition,condition logging.info("taskResults") taskResults = cs.execute(TASK_LIST_QUERY.format(database)).fetchall() for rec in taskResults: logging.info('processing {}, {}'.format(rec[3], rec[1]))#schema_name, name objectType = 'TASK' objectName = rec[3] + '.' + rec[1] ddlStmt = 'CREATE OR REPLACE TASK ' + rec[3] + '.' + rec[1] + '\n' + ' WAREHOUSE = ' + rec[6] if rec[7] is not None: #schedule ddlStmt += "\n SCHEDULE = '" + rec[7] + "'" if rec[8] is not None: #predecessor ddlStmt += "\n AFTER " + rec[8] if rec[5] is not None: #comment ddlStmt += "\n COMMENT = '" + rec[5] + "'" if rec[11] is not None: #condition ddlStmt += "\n WHEN " + rec[11] ddlStmt += '\nAS\n' + rec[10] + ';' #definition outFile = os.path.join(BASE_PATH, rec[2], objectType, objectName + '.sql') if includePermission: logging.info("get permission") permStmt = perms.get(rec[2] + '.' + objectName) if permStmt: logging.debug(permStmt) ddlStmt += '\n\n' + permStmt logging.info("write to a file") os.makedirs(os.path.dirname(outFile), exist_ok=True) with open(outFile, "w") as f: f.write(ddlStmt) except snowflake.connector.errors.ProgrammingError as e: # default error message logging.error(e) finally: cs.close() ctx.close() def main(): start_time = time.time() ScriptOut(SNOWSQL_DATABASE, BASE_PATH, True) elapsed_time = time.time() - start_time logging.info("Completed after: {}".format(str(elapsed_time))) if __name__ == '__main__': main()