@MarkPryceMaherMSFT
Created August 11, 2025 13:11
migrate_views.py
    """
    View Migration Script: SQL Server ➜ Fabric Warehouse/Lakehouse
    --------------------------------------------------------------
    This script copies one or more view definitions from a source SQL Server database
    into a Fabric Warehouse or Lakehouse SQL endpoint.
    Key Features:
    - Connects to SQL Server using SQLAlchemy/pyodbc.
    - Connects to Fabric using MSI authentication (access token).
    - Reads view definitions from sys.views/sys.sql_modules in the source.
    - Allows the user to specify which views to migrate.
    - Replaces 'CREATE VIEW' with 'CREATE OR ALTER VIEW' for idempotent creation.
    - Executes the migration with error handling, logging successes/failures.
    - Writes any failed migrations to a CSV for retry.
    """

import pandas as pd
import re
import struct
import sqlalchemy
from sqlalchemy import create_engine, text
import pyodbc

# Fabric-specific modules available in the Microsoft Fabric notebook runtime
import notebookutils
import sempy.fabric as fabric

# -------------------------------------------------
# Helper: Create a SQLAlchemy engine for the Fabric SQL endpoint with an MSI token
# -------------------------------------------------
def create_engine_alt(connection_string: str):
    """
    Creates a SQLAlchemy engine for a Fabric SQL endpoint, authenticating
    with a Managed Service Identity (MSI) access token.

    Args:
        connection_string (str): ODBC connection string for the Fabric SQL endpoint.

    Returns:
        sqlalchemy.Engine: Configured SQLAlchemy engine.
    """
    # Acquire a Power BI / Fabric access token and encode it as UTF-16-LE,
    # the encoding the ODBC driver expects for access tokens.
    token = notebookutils.credentials.getToken(
        'https://analysis.windows.net/powerbi/api'
    ).encode("UTF-16-LE")

    # Pack the token as a length-prefixed byte structure (4-byte little-endian
    # length followed by the token bytes), the format required by the
    # SQL_COPT_SS_ACCESS_TOKEN pre-connect attribute.
    token_struct = struct.pack(f'<I{len(token)}s', len(token), token)
    SQL_COPT_SS_ACCESS_TOKEN = 1256

    return sqlalchemy.create_engine(
        "mssql+pyodbc://",
        creator=lambda: pyodbc.connect(
            connection_string,
            attrs_before={SQL_COPT_SS_ACCESS_TOKEN: token_struct}
        )
    )
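
# Optional sanity check (a minimal sketch, not part of the original script): once the
# Fabric ODBC connection string is built further below, you can verify the token-based
# connection with a trivial query, e.g.:
#
#     engine_check = create_engine_alt(destination_connection_string)
#     with engine_check.connect() as c:
#         print(c.execute(text("SELECT 1 AS ok;")).scalar())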

# -------------------------------------------------
# Get Fabric environment metadata (workspace/lakehouse info)
# -------------------------------------------------
# `spark` and `display` are builtins provided by the Fabric notebook runtime.
tenant_id = spark.conf.get("trident.tenant.id")
workspace_id = spark.conf.get("trident.workspace.id")
lakehouse_id = spark.conf.get("trident.lakehouse.id")
lakehouse_name = spark.conf.get("trident.lakehouse.name")

# Fetch the SQL endpoint connection string for the Fabric Lakehouse via the REST API
sql_endpoint = fabric.FabricRestClient().get(
    f"/v1/workspaces/{workspace_id}/lakehouses/{lakehouse_id}"
).json()['properties']['sqlEndpointProperties']['connectionString']

# -------------------------------------------------
# Source SQL Server details (replace with your secure config)
# -------------------------------------------------
source_sql_endpoint = "<SOURCE_SERVER_NAME>"  # e.g., myserver.database.windows.net
source_database = "<SOURCE_DATABASE_NAME>"
username = "<SOURCE_USERNAME>"
password = "<SOURCE_PASSWORD>"  # Load securely (e.g., environment variable or key vault)

# -------------------------------------------------
# Build connection strings
# -------------------------------------------------
# SQL Server connection string (SQL Authentication).
# Note: if the username or password contains special characters, URL-encode them
# (e.g., with urllib.parse.quote_plus) before embedding them in the URL.
source_connection_string = (
    f"mssql+pyodbc://{username}:{password}@{source_sql_endpoint}/{source_database}"
    "?driver=ODBC+Driver+18+for+SQL+Server"
)

# Fabric SQL endpoint connection string (ODBC; the MSI token is attached at connect time)
destination_connection_string = (
    f"Driver={{ODBC Driver 18 for SQL Server}};Server={sql_endpoint},1433;"
    "Encrypt=Yes;TrustServerCertificate=No"
)

# -------------------------------------------------
# Step 1: Read all view definitions from the source SQL Server
# -------------------------------------------------
source_engine = create_engine(source_connection_string)
df_views = pd.read_sql_query("""
    SELECT
        v.name AS ViewName,
        m.definition AS ViewDefinition
    FROM sys.views v
    JOIN sys.sql_modules m ON v.object_id = m.object_id
    ORDER BY v.name;
""", source_engine)

display(df_views)  # Optional: inspect all available views

# -------------------------------------------------
# Step 2: User specifies which views to migrate
# -------------------------------------------------
views_to_copy = ["ViewA", "ViewB", "MyDemoView"]  # Example list
df_filtered = df_views[df_views['ViewName'].isin(views_to_copy)]

# -------------------------------------------------
# Step 3: Connect to the Fabric SQL endpoint and migrate each view
# -------------------------------------------------
failed_views = []  # To track failed migrations

engine_dest = create_engine_alt(destination_connection_string)
for _, row in df_filtered.iterrows():
    view_name = row['ViewName']
    definition = row['ViewDefinition'].strip()

    # Replace the leading CREATE VIEW with CREATE OR ALTER VIEW (case-insensitive)
    # for idempotent creation; count=1 avoids touching any other occurrence of the
    # phrase inside the view body.
    create_or_alter_sql = re.sub(
        r"CREATE\s+VIEW",
        "CREATE OR ALTER VIEW",
        definition,
        count=1,
        flags=re.IGNORECASE
    )

    try:
        # Run each view in its own transaction so one failed statement does not
        # poison the rest of the batch.
        with engine_dest.begin() as conn:
            conn.execute(text(create_or_alter_sql))
        print(f"✅ Migrated view: {view_name}")
    except Exception as e:
        print(f"❌ Failed to migrate view: {view_name}")
        failed_views.append({
            "ViewName": view_name,
            "Error": str(e),
            "Definition": definition
        })

# -------------------------------------------------
# Step 4: Save failed migrations to CSV
# -------------------------------------------------
if failed_views:
    failed_df = pd.DataFrame(failed_views)
    failed_df.to_csv("/lakehouse/default/Files/failed_views.csv", index=False)
    print(f"⚠️ {len(failed_views)} views failed. Details saved to failed_views.csv")
else:
    print("✅ All views migrated successfully!")

print("✅ View migration process complete!")