"""
View Migration Script: SQL Server ➜ Fabric Warehouse/Lakehouse
--------------------------------------------------------------
This script copies one or more view definitions from a source SQL Server database
into a Fabric Warehouse or Lakehouse SQL endpoint.

Key Features:
- Connects to SQL Server using SQLAlchemy/pyodbc.
- Connects to Fabric using MSI authentication (access token).
- Reads view definitions from sys.views/sys.sql_modules in the source.
- Allows the user to specify which views to migrate.
- Replaces 'CREATE VIEW' with 'CREATE OR ALTER VIEW' for idempotent creation.
- Executes the migration with error handling, logging successes/failures.
- Writes any failed migrations to a CSV for retry.
"""

import pandas as pd
import re
import struct
import sqlalchemy
from sqlalchemy import create_engine, text
import pyodbc
import notebookutils
import sempy.fabric as fabric


# -------------------------------------------------
# Helper: Create a SQLAlchemy engine for Fabric SQL endpoint with MSI token
# -------------------------------------------------
def create_engine_alt(connection_string: str):
    """
    Creates a SQLAlchemy engine for the Fabric SQL endpoint, authenticating
    with a Managed Service Identity (MSI) access token.

    Args:
        connection_string (str): ODBC connection string for the Fabric SQL endpoint.

    Returns:
        sqlalchemy.Engine: Configured SQLAlchemy engine.
    """
    token = notebookutils.credentials.getToken(
        'https://analysis.windows.net/powerbi/api'
    ).encode("UTF-16-LE")
    token_struct = struct.pack(f'<I{len(token)}s', len(token), token)
    SQL_COPT_SS_ACCESS_TOKEN = 1256

    return sqlalchemy.create_engine(
        "mssql+pyodbc://",
        creator=lambda: pyodbc.connect(
            connection_string,
            attrs_before={SQL_COPT_SS_ACCESS_TOKEN: token_struct}
        )
    )


# -------------------------------------------------
# Get Fabric environment metadata (workspace/lakehouse info)
# -------------------------------------------------
tenant_id = spark.conf.get("trident.tenant.id")
workspace_id = spark.conf.get("trident.workspace.id")
lakehouse_id = spark.conf.get("trident.lakehouse.id")
lakehouse_name = spark.conf.get("trident.lakehouse.name")

# Fetch the SQL endpoint connection string for the Fabric Lakehouse
sql_endpoint = fabric.FabricRestClient().get(
    f"/v1/workspaces/{workspace_id}/lakehouses/{lakehouse_id}"
).json()['properties']['sqlEndpointProperties']['connectionString']

# -------------------------------------------------
# Source SQL Server details (replace with your secure config)
# -------------------------------------------------
source_sql_endpoint = "<SOURCE_SERVER_NAME>"    # e.g., myserver.database.windows.net
source_database = "<SOURCE_DATABASE_NAME>"
username = "<SOURCE_USERNAME>"
password = "<SOURCE_PASSWORD>"                  # Load securely (e.g., environment var or key vault)

# -------------------------------------------------
# Build connection strings
# -------------------------------------------------
# SQL Server connection string (SQL Authentication)
source_connection_string = (
    f"mssql+pyodbc://{username}:{password}@{source_sql_endpoint}/{source_database}"
    "?driver=ODBC+Driver+18+for+SQL+Server"
)

# Fabric SQL endpoint connection string (ODBC; MSI token is attached later)
destination_connection_string = (
    f"Driver={{ODBC Driver 18 for SQL Server}};Server={sql_endpoint},1433;"
    "Encrypt=Yes;TrustServerCertificate=No"
)
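# Note (assumption, not in the original script): the destination connection
# string above does not specify a database, so the session will use the
# endpoint's default database. If the views should be created in the Lakehouse
# database itself, one untested option is to reuse the lakehouse_name fetched
# earlier, e.g.:
#
#   destination_connection_string = (
#       f"Driver={{ODBC Driver 18 for SQL Server}};Server={sql_endpoint},1433;"
#       f"Database={lakehouse_name};Encrypt=Yes;TrustServerCertificate=No"
#   )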
# -------------------------------------------------
# Step 1: Read all view definitions from source SQL Server
# -------------------------------------------------
source_engine = create_engine(source_connection_string)

df_views = pd.read_sql_query("""
    SELECT v.name AS ViewName, m.definition AS ViewDefinition
    FROM sys.views v
    JOIN sys.sql_modules m ON v.object_id = m.object_id
    ORDER BY v.name;
""", source_engine)

display(df_views)  # Optional: inspect all available views

# -------------------------------------------------
# Step 2: User specifies which views to migrate
# -------------------------------------------------
views_to_copy = ["ViewA", "ViewB", "MyDemoView"]  # Example list
df_filtered = df_views[df_views['ViewName'].isin(views_to_copy)]

# -------------------------------------------------
# Step 3: Connect to Fabric SQL endpoint and migrate each view
# -------------------------------------------------
failed_views = []  # To track failed migrations

engine_dest = create_engine_alt(destination_connection_string)

with engine_dest.begin() as conn:
    for _, row in df_filtered.iterrows():
        view_name = row['ViewName']
        definition = row['ViewDefinition'].strip()

        # Replace CREATE VIEW with CREATE OR ALTER VIEW (case-insensitive)
        create_or_alter_sql = re.sub(
            r"CREATE\s+VIEW", "CREATE OR ALTER VIEW",
            definition, flags=re.IGNORECASE
        )

        try:
            conn.execute(text(create_or_alter_sql))
            print(f"✅ Migrated view: {view_name}")
        except Exception as e:
            print(f"❌ Failed to migrate view: {view_name}")
            failed_views.append({
                "ViewName": view_name,
                "Error": str(e),
                "Definition": definition
            })

# -------------------------------------------------
# Step 4: Save failed migrations to CSV
# -------------------------------------------------
if failed_views:
    failed_df = pd.DataFrame(failed_views)
    failed_df.to_csv("/lakehouse/default/Files/failed_views.csv", index=False)
    print(f"⚠️ {len(failed_views)} views failed. Details saved to failed_views.csv")
else:
    print("✅ All views migrated successfully!")

print("✅ View migration process complete!")
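
# -------------------------------------------------
# Optional: retry helper for the failed_views.csv written in Step 4
# -------------------------------------------------
# A minimal sketch, not part of the original flow: it re-reads the failure CSV
# and re-attempts each view against the same Fabric endpoint, reusing
# create_engine_alt and destination_connection_string from above. The function
# name and default path are illustrative; call it manually once the underlying
# failures have been addressed.
def retry_failed_views(csv_path: str = "/lakehouse/default/Files/failed_views.csv"):
    """Re-attempt migration of the views recorded in the failure CSV."""
    retry_df = pd.read_csv(csv_path)
    still_failing = []

    with create_engine_alt(destination_connection_string).begin() as conn:
        for _, row in retry_df.iterrows():
            # Re-apply the same CREATE OR ALTER rewrite used in Step 3
            sql = re.sub(
                r"CREATE\s+VIEW", "CREATE OR ALTER VIEW",
                row["Definition"], flags=re.IGNORECASE
            )
            try:
                conn.execute(text(sql))
                print(f"✅ Retried view: {row['ViewName']}")
            except Exception as e:
                print(f"❌ Still failing: {row['ViewName']}")
                still_failing.append({"ViewName": row["ViewName"], "Error": str(e)})

    return still_failing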