```python
# Copyright 2024 Gordon D. Thompson, [email protected]
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# version 1.7 - 2024-06-21

import uuid

import pandas as pd
import sqlalchemy as sa


def df_upsert(data_frame, table_name, engine, schema=None, match_columns=None,
              chunksize=None, dtype=None, skip_inserts=False, skip_updates=False):
    """
    Perform an "upsert" on a SQL Server table from a DataFrame.

    Constructs a T-SQL MERGE statement, uploads the DataFrame to a
    temporary table, and then executes the MERGE.

    Parameters
    ----------
    data_frame : pandas.DataFrame
        The DataFrame to be upserted.
    table_name : str
        The name of the target table.
    engine : sqlalchemy.engine.Engine
        The SQLAlchemy Engine to use.
    schema : str, optional
        The name of the schema containing the target table.
    match_columns : list of str, optional
        A list of the column name(s) on which to match. If omitted, the
        primary key columns of the target table will be used.
    chunksize : int, optional
        Specify chunk size for .to_sql(). See the pandas docs for details.
    dtype : dict, optional
        Specify column types for .to_sql(). See the pandas docs for details.
    skip_inserts : bool, optional
        Skip inserting unmatched rows. (Default: False)
    skip_updates : bool, optional
        Skip updating matched rows. (Default: False)
    """
    if skip_inserts and skip_updates:
        raise ValueError("skip_inserts and skip_updates cannot both be True")
    temp_table_name = "##" + str(uuid.uuid4()).replace("-", "_")

    table_spec = ""
    if schema:
        table_spec += "[" + schema.replace("]", "]]") + "]."
    table_spec += "[" + table_name.replace("]", "]]") + "]"

    df_columns = list(data_frame.columns)
    if not match_columns:
        insp = sa.inspect(engine)
        match_columns = insp.get_pk_constraint(table_name, schema=schema)[
            "constrained_columns"
        ]
    columns_to_update = [col for col in df_columns if col not in match_columns]

    stmt = f"MERGE {table_spec} WITH (HOLDLOCK) AS main\n"
    stmt += f"USING (SELECT {', '.join([f'[{col}]' for col in df_columns])} FROM {temp_table_name}) AS temp\n"
    join_condition = " AND ".join(
        [f"main.[{col}] = temp.[{col}]" for col in match_columns]
    )
    stmt += f"ON ({join_condition})"
    if not skip_updates:
        stmt += "\nWHEN MATCHED THEN\n"
        update_list = ", ".join(
            [f"[{col}] = temp.[{col}]" for col in columns_to_update]
        )
        stmt += f"    UPDATE SET {update_list}"
    if not skip_inserts:
        stmt += "\nWHEN NOT MATCHED THEN\n"
        insert_cols_str = ", ".join([f"[{col}]" for col in df_columns])
        insert_vals_str = ", ".join([f"temp.[{col}]" for col in df_columns])
        stmt += f"    INSERT ({insert_cols_str}) VALUES ({insert_vals_str})"
    stmt += ";"

    with engine.begin() as conn:
        data_frame.to_sql(temp_table_name, conn, index=False,
                          chunksize=chunksize, dtype=dtype)
        conn.exec_driver_sql(stmt)
        conn.exec_driver_sql(f"DROP TABLE IF EXISTS {temp_table_name}")


if __name__ == "__main__":
    # Usage example adapted from
    # https://stackoverflow.com/a/62388768/2144390
    engine = sa.create_engine(
        "mssql+pyodbc://scott:tiger^[email protected]/test"
        "?driver=ODBC+Driver+17+for+SQL+Server",
        fast_executemany=True,
    )

    # create example environment
    with engine.begin() as conn:
        conn.exec_driver_sql("DROP TABLE IF EXISTS main_table")
        conn.exec_driver_sql(
            "CREATE TABLE main_table (id int primary key, txt nvarchar(50), status nvarchar(50))"
        )
        conn.exec_driver_sql(
            "INSERT INTO main_table (id, txt, status) VALUES (1, N'row 1 old text', N'original')"
        )
    # [(1, 'row 1 old text', 'original')]

    # DataFrame to upsert
    df = pd.DataFrame(
        [(2, "new row 2 text", "upserted"), (1, "row 1 new text", "upserted")],
        columns=["id", "txt", "status"],
    )

    df_upsert(df, "main_table", engine)

    """The MERGE statement generated for this example:

    MERGE [main_table] WITH (HOLDLOCK) AS main
    USING (SELECT [id], [txt], [status] FROM ##955db388_01c5_4e79_a5d1_3e8cfadf400b) AS temp
    ON (main.[id] = temp.[id])
    WHEN MATCHED THEN
        UPDATE SET [txt] = temp.[txt], [status] = temp.[status]
    WHEN NOT MATCHED THEN
        INSERT ([id], [txt], [status]) VALUES (temp.[id], temp.[txt], temp.[status]);
    """

    # check results
    with engine.begin() as conn:
        print(conn.exec_driver_sql("SELECT * FROM main_table").all())
    # [(1, 'row 1 new text', 'upserted'), (2, 'new row 2 text', 'upserted')]
```
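For reference, a call that passes `match_columns` explicitly, instead of relying on primary-key discovery, could look like the following. This is a minimal sketch reusing `df` and `engine` from the example above; it is not part of the gist itself.

```python
# Sketch (not from the gist): specify the match column(s) explicitly.
# Here "id" happens to be the primary key, but any column(s) that
# uniquely identify a row would work the same way.
df_upsert(
    df,
    "main_table",
    engine,
    match_columns=["id"],  # generates: ON (main.[id] = temp.[id])
)
```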
@pseudobacon - Revert the above change (the `with` block) to use `.to_sql()` again, then change the name of the temporary table from `#temp_table` to `##temp_table` (in 4 places).
… or use the updated code I just posted.
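For anyone following along, here is a rough sketch of that edit, assuming the older revision's fixed temp-table name (the current code above already does the equivalent with a random `##` name):

```python
# Sketch of the fix described above, against the older revision (names
# assumed). A global ## temp table remains visible if pandas opens a
# separate connection for .to_sql(); a session-local # temp table may not.
temp_table_name = "##temp_table"  # was "#temp_table"
with engine.begin() as conn:
    data_frame.to_sql(temp_table_name, conn, index=False)
    conn.exec_driver_sql(stmt)
    conn.exec_driver_sql(f"DROP TABLE IF EXISTS {temp_table_name}")
```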
Huzzah, it works! Thank you very much.
I've also got a scenario where I'm presumably running out of memory when trying to merge in 500k+ records.
For this particular data frame, some columns are completely NULL. From SQL's point of view it isn't necessary to supply NULL as a value to merge in, so would it be possible to add a parameter that excludes columns where the entire column is NULL? That would reduce the number of columns compared in the MERGE and improve performance.
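A possible interim workaround, sketched here with the example names from above (untested), is to drop the all-NULL columns before calling `df_upsert`:

```python
# Untested workaround sketch: drop columns that are entirely NULL/NaN so
# the generated MERGE compares and writes fewer columns.
slim_df = df.dropna(axis=1, how="all")
df_upsert(slim_df, "main_table", engine)
```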
@pseudobacon - I added `chunksize` to try to help with memory consumption.

Yeah, I made a change to the dataframe definition:

[screenshot]
And my `sourceconnection` is defined as `create_engine("mssql+pyodbc://sqldriver")`.
My upsert is a single line.
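Presumably something along these lines (a reconstruction with a hypothetical table name, not the poster's actual code):

```python
# Hypothetical reconstruction of the single-line call described above,
# using the new chunksize parameter (batch size assumed).
df_upsert(df, "target_table", sourceconnection, chunksize=10_000)
```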