Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save sandscap-sc/3334cfbe6b287da65a5dfdac07efe6e4 to your computer and use it in GitHub Desktop.

Select an option

Save sandscap-sc/3334cfbe6b287da65a5dfdac07efe6e4 to your computer and use it in GitHub Desktop.
Generate the DDL, including permissions, for the objects in a given Snowflake database, writing each object to a separate file
import os
import shutil
import subprocess
import time
from datetime import datetime
import snowflake.connector
import logging
# Connection settings for Snowflake.
# Each value may be overridden through an environment variable of the same
# name, so real credentials do not have to be stored in this script; the
# literals below are the fallbacks used when the variable is not set.
SNOWSQL_ACCOUNT = os.environ.get("SNOWSQL_ACCOUNT", "XXXXXXXXX.east-us-2.azure")
SNOWSQL_USER = os.environ.get("SNOWSQL_USER", "XXXXXXXX")
SNOWSQL_PASSWORD = os.environ.get("SNOWSQL_PASSWORD", "XXXXXXX")
SNOWSQL_WAREHOUSE = os.environ.get("SNOWSQL_WAREHOUSE", "DEMO_WH")
SNOWSQL_SCHEMA = os.environ.get("SNOWSQL_SCHEMA", "PUBLIC")
SNOWSQL_ROLE = os.environ.get("SNOWSQL_ROLE", "SYSADMIN")
# Parameters: the database to script out and the root directory that
# receives the generated .sql files.
SNOWSQL_DATABASE = "DEMO_DB"
BASE_PATH = "/dev/snowflake"
# Inventory query: one row per object with the columns
#   seq, catalog_name, schema_name, object_type, object_name,
#   ARGUMENT_SIGNATURE, script
# The single `{}` placeholder is the database name used to filter
# INFORMATION_SCHEMA.DATABASES.
# - DATABASE and SCHEMA rows carry a ready-made CREATE OR REPLACE statement
#   in `script`; every other object type returns an empty `script`.
# - Only FUNCTION and PROCEDURE rows carry a non-empty ARGUMENT_SIGNATURE.
# - `seq` is a two-character sort key ('01'..'10'; '05' is not used) and the
#   result is ordered by seq, catalog_name, schema_name, object_name.
# Because this is not a raw string, each `\n` below becomes a real newline
# inside the SQL string literal that is sent to Snowflake.
OBJECT_LIST_QUERY ="""\
SELECT seq, catalog_name, schema_name, object_type, object_name, ARGUMENT_SIGNATURE, script
FROM (
select '01' seq, DATABASE_NAME catalog_name, '*' schema_name, 'DATABASE' object_type, DATABASE_NAME object_name, '' ARGUMENT_SIGNATURE
, 'CREATE OR REPLACE' || CASE WHEN D.IS_TRANSIENT = 'YES' THEN ' TRANSIENT' ELSE '' END || ' ' ||
'DATABASE ' || D.DATABASE_NAME || ' ' ||
'\nDATA_RETENTION_TIME_IN_DAYS = ' || TO_VARCHAR(RETENTION_TIME) ||
CASE WHEN D.COMMENT IS NULL THEN '' ELSE '\nCOMMENT = ''' || D.COMMENT || '''' END || ';' script
from INFORMATION_SCHEMA.DATABASES D
WHERE DATABASE_NAME = '{}'
UNION ALL
select '02' seq, catalog_name, schema_name schema_name, 'SCHEMA' object_type, schema_name object_name, '' ARGUMENT_SIGNATURE
, 'CREATE OR REPLACE' || CASE WHEN S.IS_TRANSIENT = 'YES' THEN ' TRANSIENT' ELSE '' END || ' ' ||
'SCHEMA ' || S.SCHEMA_NAME || ' ' ||
CASE WHEN S.IS_MANAGED_ACCESS = 'YES' THEN '\nWITH MANAGED ACCESS' ELSE '' END || ' ' ||
'\nDATA_RETENTION_TIME_IN_DAYS = ' || TO_VARCHAR(RETENTION_TIME) ||
CASE WHEN S.COMMENT IS NULL THEN '' ELSE '\nCOMMENT = ''' || S.COMMENT || '''' END || ';' script
from INFORMATION_SCHEMA.SCHEMATA S
UNION ALL
select '03' seq, table_catalog catalog_name, TABLE_SCHEMA schema_name, 'TABLE' object_type, TABLE_NAME object_name, '' ARGUMENT_SIGNATURE, '' script
from INFORMATION_SCHEMA.TABLES
WHERE TABLE_TYPE != 'VIEW'
UNION ALL
select '04' seq, table_catalog catalog_name, TABLE_SCHEMA schema_name, 'VIEW' object_type, TABLE_NAME object_name, '' ARGUMENT_SIGNATURE, '' script
from INFORMATION_SCHEMA.VIEWS
WHERE TABLE_SCHEMA != 'INFORMATION_SCHEMA'
UNION ALL
select '06' seq, SEQUENCE_catalog catalog_name, SEQUENCE_SCHEMA schema_name, 'SEQUENCE' object_type, SEQUENCE_NAME object_name, '' ARGUMENT_SIGNATURE, '' script
from INFORMATION_SCHEMA.SEQUENCES
UNION ALL
select '07' seq, FILE_FORMAT_catalog catalog_name, FILE_FORMAT_SCHEMA schema_name, 'FILE_FORMAT' object_type, FILE_FORMAT_NAME object_name, '' ARGUMENT_SIGNATURE, '' script
from INFORMATION_SCHEMA.FILE_FORMATS
UNION ALL
select '08' seq, PIPE_catalog catalog_name, PIPE_SCHEMA schema_name, 'PIPE' object_type, PIPE_NAME object_name, '' ARGUMENT_SIGNATURE, '' script
from INFORMATION_SCHEMA.PIPES
UNION ALL
select '09' seq, FUNCTION_catalog catalog_name, FUNCTION_SCHEMA schema_name, 'FUNCTION' object_type, FUNCTION_NAME object_name, ARGUMENT_SIGNATURE, '' script
from INFORMATION_SCHEMA.FUNCTIONS
UNION ALL
select '10' seq, PROCEDURE_catalog catalog_name, PROCEDURE_SCHEMA schema_name, 'PROCEDURE' object_type, PROCEDURE_NAME object_name, ARGUMENT_SIGNATURE, '' script
from "INFORMATION_SCHEMA"."PROCEDURES"
) T
ORDER BY seq, catalog_name, schema_name, object_name"""
# Stand-alone version of the DATABASE script expression: returns a single
# CREATE OR REPLACE [TRANSIENT] DATABASE statement with its retention time
# and optional comment. `{}` is the database name.
# NOTE(review): not referenced by the code visible in this file.
DDL_DATABASE_QUERY = """\
SELECT 'CREATE OR REPLACE' || CASE WHEN D.IS_TRANSIENT = 'YES' THEN ' TRANSIENT' ELSE '' END || ' ' ||
'DATABASE ' || D.DATABASE_NAME || ' ' ||
'\nDATA_RETENTION_TIME_IN_DAYS = ' || TO_VARCHAR(RETENTION_TIME) ||
CASE WHEN D.COMMENT IS NULL THEN '' ELSE '\nCOMMENT = ''' || D.COMMENT || '''' END || ';'
FROM INFORMATION_SCHEMA.DATABASES D
WHERE D.DATABASE_NAME = '{}'"""
# Stand-alone version of the SCHEMA script expression: one CREATE OR REPLACE
# [TRANSIENT] SCHEMA statement per row of INFORMATION_SCHEMA.SCHEMATA, with
# WITH MANAGED ACCESS, retention time and comment where applicable.
# Takes no placeholders.
# NOTE(review): not referenced by the code visible in this file.
DDL_SCHEMA_QUERY = """\
SELECT 'CREATE OR REPLACE' || CASE WHEN S.IS_TRANSIENT = 'YES' THEN ' TRANSIENT' ELSE '' END || ' ' ||
'SCHEMA ' || S.SCHEMA_NAME || ' ' ||
CASE WHEN S.IS_MANAGED_ACCESS = 'YES' THEN '\nWITH MANAGED ACCESS' ELSE '' END || ' ' ||
'\nDATA_RETENTION_TIME_IN_DAYS = ' || TO_VARCHAR(RETENTION_TIME) ||
CASE WHEN S.COMMENT IS NULL THEN '' ELSE '\nCOMMENT = ''' || S.COMMENT || '''' END || ';'
FROM INFORMATION_SCHEMA.SCHEMATA S"""
# Asks Snowflake for the DDL of one object. Placeholders, in order: the
# object type and the object name; both are interpolated as quoted SQL
# string literals.
GET_DDL_QUERY = "SELECT GET_DDL('{}','{}') script"
# The GRANTEE column gives no reliable way to tell a role from a share, so
# the permission queries below assume that any grantee whose upper-cased name
# ends in "_SHARE" is a share and that every other grantee is a role.

# GRANT statements for a single object, one per row. Placeholders, in order:
# the object's schema and the object's name.
# NOTE(review): not referenced by the code visible in this file.
OBJECT_PERMISSION_QUERY = """\
SELECT 'GRANT ' || A.PRIVILEGE_TYPE || ' ON ' || A.OBJECT_TYPE || ' ' ||
CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END ||
CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME ||
' TO ' || CASE WHEN UPPER(GRANTEE) LIKE '%_SHARE' THEN 'SHARE' ELSE 'ROLE' END ||
' ' || A.GRANTEE || ';' GRANTED
FROM INFORMATION_SCHEMA.OBJECT_PRIVILEGES A
WHERE A.OBJECT_SCHEMA = '{}'
AND A.OBJECT_NAME = '{}'"""
# Every grant in INFORMATION_SCHEMA.OBJECT_PRIVILEGES, grouped per object.
# Column 1 (OBJECT_NAME) is the object name prefixed with its catalog and
# schema when those are not NULL; column 2 (GRANTED) is that object's GRANT
# statements as one newline-separated string. The statements are collected
# with ARRAY_AGG, rendered with TO_VARCHAR, and the REPLACE chain then strips
# the leading `["` and trailing `"]` and turns each `;","` separator into
# `;` followed by a newline. Takes no placeholders.
ALL_PERMISSION_QUERY = """\
SELECT CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END ||
CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME OBJECT_NAME,
REPLACE(REPLACE(REPLACE(TO_VARCHAR(ARRAY_AGG('GRANT ' || A.PRIVILEGE_TYPE || ' ON ' || A.OBJECT_TYPE || ' ' ||
CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END ||
CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME ||
' TO ' || CASE WHEN UPPER(GRANTEE) LIKE '%_SHARE' THEN 'SHARE' ELSE 'ROLE' END ||
' ' || A.GRANTEE || ';')),'["',''),'"]',''),';","',';\n') GRANTED
FROM INFORMATION_SCHEMA.OBJECT_PRIVILEGES A
GROUP BY CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END ||
CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME"""
# GET_DDL output followed by the object's GRANT statements in one result set.
# Placeholders, in order: object type, object name (both for GET_DDL), a flag
# that is compared with the literal 'True' to switch the GRANT rows on or
# off, the object's schema, and the object's name.
# NOTE(review): not referenced by the code visible in this file.
GET_DDL_WITH_PERMISSION_QUERY = """\
SELECT GET_DDL('{}','{}') script
UNION ALL
SELECT '\nGRANT ' || A.PRIVILEGE_TYPE || ' ON ' || A.OBJECT_TYPE || ' ' ||
CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END ||
CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME ||
' TO ' || CASE WHEN UPPER(GRANTEE) LIKE '%_SHARE' THEN 'SHARE' ELSE 'ROLE' END ||
' ' || A.GRANTEE || ';' GRANTED
FROM INFORMATION_SCHEMA.OBJECT_PRIVILEGES A
WHERE 'True' = '{}'
AND A.OBJECT_SCHEMA = '{}'
AND A.OBJECT_NAME = '{}'"""
# Streams: listed with SHOW, then scripted with GET_DDL.
# `{}` is the database name.
STREAM_LIST_QUERY = "SHOW STREAMS IN DATABASE {}"
# Tasks: listed with SHOW. `{}` is the database name.
TASK_LIST_QUERY = "SHOW TASKS IN DATABASE {}"
# Reference sketch (not executed) of how a task's DDL can be assembled from
# the SHOW TASKS columns:
#
#   'CREATE OR REPLACE TASK ' || schema_name || '.' || name ||
#   '\n WAREHOUSE = ' || warehouse ||
#   CASE WHEN schedule IS NULL THEN '' ELSE '\n SCHEDULE = ''' || schedule || '''' END ||
#   CASE WHEN predecessor IS NULL THEN '' ELSE ' AFTER ' || predecessor END ||
#   -- Can't find a column for the session_parameters
#   CASE WHEN comment IS NULL THEN '' ELSE '\nCOMMENT = ''' || comment || '''' END ||
#   CASE WHEN condition IS NULL THEN '' ELSE '\n WHEN ' || condition END ||
#   '\nAS' ||
#   '\n' || definition || ';'
# Root logger: timestamped, level-padded messages at INFO and above.
logging.basicConfig(
    level=logging.INFO,
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s %(levelname)-8s %(message)s",
)
def _argument_types(argument_signature):
    """Reduce a ``(NAME TYPE, ...)`` signature to ``(TYPE, ...)``.

    The qualified name passed to GET_DDL for a function or procedure is
    built with the argument data types only, so the argument names are
    dropped. ``()`` is returned unchanged.
    """
    signature = str(argument_signature)
    if signature == "()":
        return signature
    data_types = []
    for argument in signature.replace("(", "").replace(")", "").split(","):
        tokens = argument.split()
        if not tokens:
            continue
        # "<name> <type>": keep the type. A lone token is kept as-is instead
        # of raising IndexError.
        data_types.append(tokens[1] if len(tokens) > 1 else tokens[0])
    return "(" + ", ".join(data_types) + ")"


def _append_permissions(ddl, perms, key):
    """Return *ddl* followed by the GRANT statements stored under *key*.

    *perms* is the mapping built from ALL_PERMISSION_QUERY, or None when
    permissions were not requested, in which case *ddl* is returned as-is.
    """
    if perms is None:
        return ddl
    logging.info("get permission")
    grants = perms.get(key)
    if grants:
        logging.debug(grants)
        ddl += '\n\n' + grants
    return ddl


def _write_script(out_file, ddl):
    """Write *ddl* to *out_file*, creating the parent directory if needed."""
    logging.info("write to a file: {}".format(out_file))
    os.makedirs(os.path.dirname(out_file), exist_ok=True)
    # Explicit UTF-8 so the output does not depend on the platform's default
    # encoding.
    with open(out_file, "w", encoding="utf-8") as f:
        f.write(ddl)


def ScriptOut(database, base_path, includePermission=True):
    """Write the DDL of every object in *database* to separate .sql files.

    Objects listed by OBJECT_LIST_QUERY are processed first, then streams
    (SHOW STREAMS + GET_DDL), then tasks (DDL assembled from SHOW TASKS).
    Files are laid out as ``<base_path>/<database>/<OBJECT_TYPE>/<schema>.<name>.sql``;
    the database's own script is ``<base_path>/<database>/<database>.sql``
    and schema scripts are ``<base_path>/<database>/SCHEMA/<schema>.sql``.

    Args:
        database: Name of the database to connect to and script out.
        base_path: Root directory that receives the generated files.
        includePermission: When true, the GRANT statements found by
            ALL_PERMISSION_QUERY are appended to each object's script.

    A ``snowflake.connector.errors.ProgrammingError`` raised by any query is
    logged and ends the run; it is not re-raised. The cursor and connection
    are always closed.
    """
    ctx = snowflake.connector.connect(
        account=SNOWSQL_ACCOUNT,
        warehouse=SNOWSQL_WAREHOUSE,
        database=database,
        user=SNOWSQL_USER,
        password=SNOWSQL_PASSWORD,
        schema=SNOWSQL_SCHEMA,
        role=SNOWSQL_ROLE
    )
    try:
        cs = ctx.cursor()
        logging.info("Connected to Snowflake")
        try:
            # Change database
            cs.execute("USE database {}".format(database))

            # Fetch every grant once instead of querying per object.
            perms = None
            if includePermission:
                logging.info("Get permission at once")
                perms = {
                    row[0]: row[1]
                    for row in cs.execute(ALL_PERMISSION_QUERY).fetchall()
                }

            # Objects listed through INFORMATION_SCHEMA. fetchall() is
            # required because the same cursor is reused inside the loop.
            objects = cs.execute(OBJECT_LIST_QUERY.format(database)).fetchall()
            for _seq, catalog, schema, object_type, object_name, signature, script in objects:
                logging.info('processing {}, {}'.format(object_type, object_name))
                if object_type == 'DATABASE':
                    out_file = os.path.join(base_path, catalog, object_name + '.sql')
                    ddl = script
                    perm_key = object_name
                elif object_type == 'SCHEMA':
                    out_file = os.path.join(base_path, catalog, object_type, object_name + '.sql')
                    ddl = script
                    perm_key = catalog + '.' + object_name
                else:
                    qualified = schema + '.' + object_name
                    # NOTE: the file name omits the argument signature, so
                    # overloads of one function/procedure share a path.
                    out_file = os.path.join(base_path, catalog, object_type, qualified + '.sql')
                    if object_type in ('FUNCTION', 'PROCEDURE'):
                        qualified += _argument_types(signature)
                    ddl = cs.execute(GET_DDL_QUERY.format(object_type, qualified)).fetchone()[0]
                    perm_key = catalog + '.' + qualified
                _write_script(out_file, _append_permissions(ddl, perms, perm_key))

            # STREAM - SHOW - GET_DDL
            # Columns are read by position, assumed to be:
            # created_on, name, database_name, schema_name
            # NOTE(review): confirm against the current SHOW STREAMS output.
            logging.info("stream")
            for rec in cs.execute(STREAM_LIST_QUERY.format(database)).fetchall():
                qualified = rec[3] + '.' + rec[1]
                out_file = os.path.join(base_path, rec[2], 'STREAM', qualified + '.sql')
                ddl = cs.execute(GET_DDL_QUERY.format('STREAM', qualified)).fetchone()[0]
                _write_script(out_file, _append_permissions(ddl, perms, rec[2] + '.' + qualified))

            # TASK - SHOW
            # Columns are read by position, assumed to be:
            # created_on,name,database_name,schema_name,owner,comment,
            # warehouse,schedule,predecessor,state,definition,condition
            # NOTE(review): confirm against the current SHOW TASKS output.
            logging.info("taskResults")
            for rec in cs.execute(TASK_LIST_QUERY.format(database)).fetchall():
                logging.info('processing {}, {}'.format(rec[3], rec[1]))  # schema_name, name
                qualified = rec[3] + '.' + rec[1]
                clauses = ['CREATE OR REPLACE TASK ' + qualified]
                # A row without a warehouse would otherwise raise TypeError
                # on string concatenation.
                if rec[6] is not None:  # warehouse
                    clauses.append(' WAREHOUSE = ' + rec[6])
                if rec[7] is not None:  # schedule
                    clauses.append(" SCHEDULE = '" + rec[7] + "'")
                # COMMENT is emitted before AFTER, following the parameter
                # order of Snowflake's CREATE TASK syntax; embedded single
                # quotes are doubled so the literal stays valid.
                if rec[5] is not None:  # comment
                    clauses.append(" COMMENT = '" + rec[5].replace("'", "''") + "'")
                if rec[8] is not None:  # predecessor
                    clauses.append(" AFTER " + rec[8])
                if rec[11] is not None:  # condition
                    clauses.append(" WHEN " + rec[11])
                clauses.append('AS')
                clauses.append(rec[10] + ';')  # definition
                ddl = '\n'.join(clauses)
                out_file = os.path.join(base_path, rec[2], 'TASK', qualified + '.sql')
                _write_script(out_file, _append_permissions(ddl, perms, rec[2] + '.' + qualified))
        except snowflake.connector.errors.ProgrammingError as e:
            # default error message
            logging.error(e)
        finally:
            cs.close()
    finally:
        ctx.close()
def main():
    """Script out SNOWSQL_DATABASE under BASE_PATH and log the elapsed seconds."""
    started_at = time.time()
    ScriptOut(SNOWSQL_DATABASE, BASE_PATH, True)
    seconds_taken = time.time() - started_at
    logging.info("Completed after: {}".format(str(seconds_taken)))
# Run the export only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment