@MarkPryceMaherMSFT
Last active October 4, 2025 03:12
Revisions

  1. MarkPryceMaherMSFT revised this gist Oct 3, 2025. 1 changed file with 4 additions and 2 deletions.
    ConvertViews.py: 6 changed lines (4 additions, 2 deletions)
    @@ -145,8 +145,9 @@ def transform_sql_for_sqlserver(sql_text: str) -> str:

            if view_text:
                # Step 3: Generate CREATE VIEW statement
    +           print(view_text)
    +           view_text = transform_sql_for_sqlserver(view_text)
                create_stmt = f"CREATE OR ALTER VIEW {view_name} AS {view_text}"
                print(create_stmt)
                tsql_stmt = transform_sql_for_sqlserver(create_stmt)
                print(tsql_stmt)
                # Step 4: Execute on SQL Server
    @@ -160,4 +161,5 @@ def transform_sql_for_sqlserver(sql_text: str) -> str:
                print(f"-- No view text found for {view_name}")


    -   connection. Commit()
    +   connection.commit()

  2. MarkPryceMaherMSFT created this gist Oct 3, 2025.
    ConvertViews.py: 163 additions, 0 deletions
    @@ -0,0 +1,163 @@
    %%pyspark

    import sempy.fabric as fabric
    import struct
    import sqlalchemy
    import pyodbc
    import pandas as pd
    from notebookutils import mssparkutils

    # Function to return a SQLAlchemy ODBC engine for a given connection string,
    # using integrated AAD authentication to Fabric
    def create_engine(connection_string: str):
        # Fetch an AAD access token for the Fabric scope and pack it in the format the
        # ODBC driver expects for the SQL_COPT_SS_ACCESS_TOKEN connection attribute
        token = mssparkutils.credentials.getToken('https://analysis.windows.net/powerbi/api').encode("UTF-16-LE")
        token_struct = struct.pack(f'<I{len(token)}s', len(token), token)
        SQL_COPT_SS_ACCESS_TOKEN = 1256
        return sqlalchemy.create_engine("mssql+pyodbc://", creator=lambda: pyodbc.connect(connection_string, attrs_before={SQL_COPT_SS_ACCESS_TOKEN: token_struct}))
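
    # Usage sketch (illustrative only): once connection_string is built further down in this
    # notebook, the engine plugs straight into pandas, e.g.
    #   engine = create_engine(connection_string)
    #   df = pd.read_sql("SELECT TOP (5) name FROM sys.views", engine)
    # The AAD token is fetched per connection by the creator lambda, so no password is stored.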

    import re

    def transform_sql_for_sqlserver(sql_text: str) -> str:
        """
        Translate common Spark SQL syntax/features into T-SQL equivalents
        so Spark views can be recreated in SQL Server.
        """

        transformed = sql_text

        # --------------------------------------------
        # IDENTIFIERS
        # --------------------------------------------
        # Spark allows backticks for identifiers: `col` → [col]
        transformed = re.sub(r"`([^`]+)`", r"[\1]", transformed)

        # --------------------------------------------
        # LIMIT → TOP
        # --------------------------------------------
        limit_match = re.search(r"LIMIT\s+(\d+)", transformed, re.IGNORECASE)
        if limit_match:
            limit_val = limit_match.group(1)
            transformed = re.sub(r"(?i)^SELECT", f"SELECT TOP ({limit_val})", transformed, count=1)
            transformed = re.sub(r"(?i)\s+LIMIT\s+\d+", "", transformed)

        # --------------------------------------------
        # FUNCTIONS
        # --------------------------------------------
        replacements = {
            r"(?i)\bIFNULL\s*\(": "ISNULL(",
            r"(?i)\bNVL\s*\(": "ISNULL(",
            r"(?i)\bSIZE\s*\(": "LEN(",
            r"(?i)\bLCASE\s*\(": "LOWER(",
            r"(?i)\bUCASE\s*\(": "UPPER(",
            r"(?i)\bRAND\s*\(": "RAND(",  # same name, but may behave differently
            r"(?i)\bRLIKE\b": "LIKE",  # Spark regex match → fallback to LIKE
            r"(?i)\bSTRING\s*TYPE\b": "NVARCHAR(MAX)",
        }
        for pattern, repl in replacements.items():
            transformed = re.sub(pattern, repl, transformed)

        # --------------------------------------------
        # CAST differences
        # --------------------------------------------
        # Spark: CAST(col AS STRING) → SQL Server: CAST(col AS NVARCHAR(MAX))
        transformed = re.sub(r"(?i)CAST\s*\((.*?)\s+AS\s+STRING\)", r"CAST(\1 AS NVARCHAR(MAX))", transformed)

        # Spark: CAST(col AS DOUBLE) → SQL Server: FLOAT
        transformed = re.sub(r"(?i)CAST\s*\((.*?)\s+AS\s+DOUBLE\)", r"CAST(\1 AS FLOAT)", transformed)

        # Spark: CAST(col AS FLOAT) → SQL Server: FLOAT
        transformed = re.sub(r"(?i)CAST\s*\((.*?)\s+AS\s+FLOAT\)", r"CAST(\1 AS FLOAT)", transformed)

        # Spark: CAST(col AS BOOLEAN) → SQL Server: BIT
        transformed = re.sub(r"(?i)CAST\s*\((.*?)\s+AS\s+BOOLEAN\)", r"CAST(\1 AS BIT)", transformed)

        # --------------------------------------------
        # DATE/TIME FUNCTIONS
        # --------------------------------------------
        # Note: these are prefix swaps only. T-SQL DATEADD takes (datepart, number, date),
        # so DATE_ADD/DATE_SUB/FROM_UNIXTIME results may still need their arguments
        # reordered (or a base date supplied) by hand.
        replacements_date = {
            r"(?i)\bCURRENT_DATE\b": "CAST(GETDATE() AS DATE)",
            r"(?i)\bCURRENT_TIMESTAMP\b": "GETDATE()",
            r"(?i)\bNOW\(\)": "GETDATE()",
            r"(?i)\bDATE_ADD\s*\(": "DATEADD(DAY,",  # DATE_ADD(startDate, days)
            r"(?i)\bDATE_SUB\s*\(": "DATEADD(DAY,-",  # DATE_SUB(startDate, days)
            r"(?i)\bUNIX_TIMESTAMP\s*\(\)": "DATEDIFF(SECOND, '1970-01-01', GETUTCDATE())",
            r"(?i)\bFROM_UNIXTIME\s*\(": "DATEADD(SECOND,",  # FROM_UNIXTIME(epoch)
        }
        for pattern, repl in replacements_date.items():
            transformed = re.sub(pattern, repl, transformed)

        # --------------------------------------------
        # STRING FUNCTIONS
        # --------------------------------------------
        string_funcs = {
            r"(?i)\bSUBSTR\s*\(": "SUBSTRING(",
            r"(?i)\bINSTR\s*\(": "CHARINDEX(",  # note: CHARINDEX expects (substring, string), the reverse of INSTR
            r"(?i)\bCONCAT_WS\s*\(": "CONCAT(",  # T-SQL doesn't have CONCAT_WS before 2017
        }
        for pattern, repl in string_funcs.items():
            transformed = re.sub(pattern, repl, transformed)

        # --------------------------------------------
        # REMOVE unsupported Spark clauses
        # --------------------------------------------
        # Spark uses DISTRIBUTE BY, CLUSTER BY, etc.
        transformed = re.sub(r"(?i)\bDISTRIBUTE\s+BY\b.*", "", transformed)
        transformed = re.sub(r"(?i)\bCLUSTER\s+BY\b.*", "", transformed)
        transformed = re.sub(r"(?i)\bSORT\s+BY\b.*", "", transformed)

        return transformed.strip()
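
    # Illustrative sanity check: a minimal sketch of what the transform produces for a
    # made-up Spark view definition; the expected result in the trailing comment assumes
    # the regex rules defined in transform_sql_for_sqlserver() above.
    _sample_spark_sql = "SELECT `id`, IFNULL(name, 'n/a') AS name FROM customers LIMIT 10"
    print(transform_sql_for_sqlserver(_sample_spark_sql))
    # → SELECT TOP (10) [id], ISNULL(name, 'n/a') AS name FROM customers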

    # Get the ODBC connection string for the default lakehouse (LH) in this notebook
    tenant_id = spark.conf.get("trident.tenant.id")
    workspace_id = spark.conf.get("trident.workspace.id")
    lakehouse_id = spark.conf.get("trident.lakehouse.id")
    lakehouse_name = spark.conf.get("trident.lakehouse.name")

    dw_name = lakehouse_name

    sql_end_point = fabric.FabricRestClient().get(f"/v1/workspaces/{workspace_id}/lakehouses/{lakehouse_id}").json()['properties']['sqlEndpointProperties']['connectionString']
    connection_string = f"Driver={{ODBC Driver 18 for SQL Server}};Server={sql_end_point}"
    print(f"connection_string={connection_string}")

    engine = create_engine(connection_string)
    with engine.connect() as alchemy_connection:
        # Execute a T-SQL stored procedure or DDL/DML on a Fabric DW
        connection = engine.raw_connection()
        cursor = connection.cursor()

        # Step 1: List views
        views_df = spark.sql("SHOW VIEWS")
        views = views_df.collect()

        for row in views:
            view_name = row['viewName']
            print(f"\n-- Processing view: {view_name}")

            # Step 2: Extract metadata with DESC EXTENDED
            desc_df = spark.sql(f"DESC EXTENDED {view_name}")
            desc = desc_df.collect()

            # Look for 'View Text' in the extended description
            view_text = None
            for d in desc:
                if d.col_name.strip().lower() == "view text":
                    view_text = d.data_type  # contains the SQL query text
                    break

            if view_text:
                # Step 3: Generate CREATE VIEW statement
                create_stmt = f"CREATE OR ALTER VIEW {view_name} AS {view_text}"
                print(create_stmt)
                tsql_stmt = transform_sql_for_sqlserver(create_stmt)
                print(tsql_stmt)
                # Step 4: Execute on SQL Server
                try:
                    cursor.execute(tsql_stmt)
                    connection.commit()
                    print(f"-- Successfully created {view_name} on SQL Server")
                except Exception as e:
                    print(f"-- Failed to create {view_name}: {e}")
            else:
                print(f"-- No view text found for {view_name}")


    connection. Commit()