Last active
October 4, 2025 03:12
Revisions
MarkPryceMaherMSFT revised this gist
Oct 3, 2025 · 1 changed file with 4 additions and 2 deletions.
@@ -145,8 +145,9 @@ def transform_sql_for_sqlserver(sql_text: str) -> str:
        if view_text:
            # Step 3: Generate CREATE VIEW statement
            print(view_text)
            view_text = transform_sql_for_sqlserver(view_text)
            create_stmt = f"CREATE OR ALTER VIEW {view_name} AS {view_text}"
            tsql_stmt = transform_sql_for_sqlserver(create_stmt)
            print(tsql_stmt)

            # Step 4: Execute on SQL Server
@@ -160,4 +161,5 @@ def transform_sql_for_sqlserver(sql_text: str) -> str:
            print(f"-- No view text found for {view_name}")
    connection.commit()
MarkPryceMaherMSFT created this gist
Oct 3, 2025
%%pyspark
import re
import struct

import pandas as pd
import pyodbc
import sqlalchemy
import sempy.fabric as fabric
from notebookutils import mssparkutils


# Return a SQLAlchemy engine over ODBC, given a connection string, using integrated Azure AD auth to Fabric
def create_engine(connection_string: str):
    token = mssparkutils.credentials.getToken('https://analysis.windows.net/powerbi/api').encode("UTF-16-LE")
    token_struct = struct.pack(f'<I{len(token)}s', len(token), token)
    SQL_COPT_SS_ACCESS_TOKEN = 1256
    return sqlalchemy.create_engine(
        "mssql+pyodbc://",
        creator=lambda: pyodbc.connect(connection_string, attrs_before={SQL_COPT_SS_ACCESS_TOKEN: token_struct})
    )


def transform_sql_for_sqlserver(sql_text: str) -> str:
    """
    Translate common Spark SQL syntax/features into T-SQL equivalents
    so Spark views can be recreated in SQL Server.
    """
    transformed = sql_text

    # --------------------------------------------
    # IDENTIFIERS
    # --------------------------------------------
    # Spark allows backticks for identifiers: `col` → [col]
    transformed = re.sub(r"`([^`]+)`", r"[\1]", transformed)

    # --------------------------------------------
    # LIMIT → TOP
    # --------------------------------------------
    limit_match = re.search(r"LIMIT\s+(\d+)", transformed, re.IGNORECASE)
    if limit_match:
        limit_val = limit_match.group(1)
        transformed = re.sub(r"(?i)^SELECT", f"SELECT TOP ({limit_val})", transformed, count=1)
        transformed = re.sub(r"(?i)\s+LIMIT\s+\d+", "", transformed)

    # --------------------------------------------
    # FUNCTIONS
    # --------------------------------------------
    replacements = {
        r"(?i)\bIFNULL\s*\(": "ISNULL(",
        r"(?i)\bNVL\s*\(": "ISNULL(",
        r"(?i)\bSIZE\s*\(": "LEN(",
        r"(?i)\bLCASE\s*\(": "LOWER(",
        r"(?i)\bUCASE\s*\(": "UPPER(",
        r"(?i)\bRAND\s*\(": "RAND(",                # same name, but may behave differently
        r"(?i)\bRLIKE\b": "LIKE",                   # Spark regex match → fallback to LIKE
        r"(?i)\bSTRING\s*TYPE\b": "NVARCHAR(MAX)",
    }
    for pattern, repl in replacements.items():
        transformed = re.sub(pattern, repl, transformed)

    # --------------------------------------------
    # CAST differences
    # --------------------------------------------
    # Spark: CAST(col AS STRING)  → SQL Server: CAST(col AS NVARCHAR(MAX))
    transformed = re.sub(r"(?i)CAST\s*\((.*?)\s+AS\s+STRING\)", r"CAST(\1 AS NVARCHAR(MAX))", transformed)
    # Spark: CAST(col AS DOUBLE)  → SQL Server: FLOAT
    transformed = re.sub(r"(?i)CAST\s*\((.*?)\s+AS\s+DOUBLE\)", r"CAST(\1 AS FLOAT)", transformed)
    # Spark: CAST(col AS FLOAT)   → SQL Server: FLOAT
    transformed = re.sub(r"(?i)CAST\s*\((.*?)\s+AS\s+FLOAT\)", r"CAST(\1 AS FLOAT)", transformed)
    # Spark: CAST(col AS BOOLEAN) → SQL Server: BIT
    transformed = re.sub(r"(?i)CAST\s*\((.*?)\s+AS\s+BOOLEAN\)", r"CAST(\1 AS BIT)", transformed)

    # --------------------------------------------
    # DATE/TIME FUNCTIONS
    # --------------------------------------------
    replacements_date = {
        r"(?i)\bCURRENT_DATE\b": "CAST(GETDATE() AS DATE)",
        r"(?i)\bCURRENT_TIMESTAMP\b": "GETDATE()",
        r"(?i)\bNOW\(\)": "GETDATE()",
        r"(?i)\bDATE_ADD\s*\(": "DATEADD(DAY,",     # DATE_ADD(startDate, days)
        r"(?i)\bDATE_SUB\s*\(": "DATEADD(DAY,-",    # DATE_SUB(startDate, days)
        r"(?i)\bUNIX_TIMESTAMP\s*\(\)": "DATEDIFF(SECOND, '1970-01-01', GETUTCDATE())",
        r"(?i)\bFROM_UNIXTIME\s*\(": "DATEADD(SECOND,",  # FROM_UNIXTIME(epoch)
    }
    for pattern, repl in replacements_date.items():
        transformed = re.sub(pattern, repl, transformed)

    # --------------------------------------------
    # STRING FUNCTIONS
    # --------------------------------------------
    string_funcs = {
        r"(?i)\bSUBSTR\s*\(": "SUBSTRING(",
        r"(?i)\bINSTR\s*\(": "CHARINDEX(",
        r"(?i)\bCONCAT_WS\s*\(": "CONCAT(",  # T-SQL doesn't have CONCAT_WS before 2017
    }
    for pattern, repl in string_funcs.items():
        transformed = re.sub(pattern, repl, transformed)

    # --------------------------------------------
    # REMOVE unsupported Spark clauses
    # --------------------------------------------
    # Spark uses DISTRIBUTE BY, CLUSTER BY, SORT BY, etc.
    transformed = re.sub(r"(?i)\bDISTRIBUTE\s+BY\b.*", "", transformed)
    transformed = re.sub(r"(?i)\bCLUSTER\s+BY\b.*", "", transformed)
    transformed = re.sub(r"(?i)\bSORT\s+BY\b.*", "", transformed)

    return transformed.strip()


# Get the ODBC connection string for the default lakehouse in this notebook
tenant_id = spark.conf.get("trident.tenant.id")
workspace_id = spark.conf.get("trident.workspace.id")
lakehouse_id = spark.conf.get("trident.lakehouse.id")
lakehouse_name = spark.conf.get("trident.lakehouse.name")
dw_name = lakehouse_name

sql_end_point = fabric.FabricRestClient().get(
    f"/v1/workspaces/{workspace_id}/lakehouses/{lakehouse_id}"
).json()['properties']['sqlEndpointProperties']['connectionString']
connection_string = f"Driver={{ODBC Driver 18 for SQL Server}};Server={sql_end_point}"
print(f"connection_string={connection_string}")

engine = create_engine(connection_string)

with engine.connect() as alchemy_connection:
    # Execute T-SQL (stored procedures or DDL/DML) against the Fabric SQL endpoint
    connection = engine.raw_connection()
    cursor = connection.cursor()

    # Step 1: List views
    views_df = spark.sql("SHOW VIEWS")
    views = views_df.collect()

    for row in views:
        view_name = row['viewName']
        print(f"\n-- Processing view: {view_name}")

        # Step 2: Extract metadata with DESC EXTENDED
        desc_df = spark.sql(f"DESC EXTENDED {view_name}")
        desc = desc_df.collect()

        # Look for 'View Text' in the extended description
        view_text = None
        for d in desc:
            if d.col_name.strip().lower() == "view text":
                view_text = d.data_type  # contains the SQL query text
                break

        if view_text:
            # Step 3: Generate CREATE VIEW statement
            create_stmt = f"CREATE OR ALTER VIEW {view_name} AS {view_text}"
            print(create_stmt)
            tsql_stmt = transform_sql_for_sqlserver(create_stmt)
            print(tsql_stmt)

            # Step 4: Execute on SQL Server
            try:
                cursor.execute(tsql_stmt)
                connection.commit()
                print(f"-- Successfully created {view_name} on SQL Server")
            except Exception as e:
                print(f"-- Failed to create {view_name}: {e}")
        else:
            print(f"-- No view text found for {view_name}")

    connection.commit()