Skip to content

Instantly share code, notes, and snippets.

@aculich
Created December 27, 2024 21:26
Show Gist options
  • Select an option

  • Save aculich/2f18504b2678f3dcca0d5b6bb60ad9e4 to your computer and use it in GitHub Desktop.

Select an option

Save aculich/2f18504b2678f3dcca0d5b6bb60ad9e4 to your computer and use it in GitHub Desktop.

Revisions

  1. aculich created this gist Dec 27, 2024.
    83 changes: 83 additions & 0 deletions dchb_excel_to_sqlite.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,83 @@
    #!/usr/bin/env python

    import os
    import sqlite3
    from tqdm import tqdm

    def get_table_schema(cursor, table_name):
    cursor.execute(f"PRAGMA table_info({table_name})")
    return {col[1]: col[2] for col in cursor.fetchall()}

    def align_schemas(source_conn, dest_conn, table_name):
    source_cursor = source_conn.cursor()
    dest_cursor = dest_conn.cursor()

    # Retrieve source and destination schemas
    source_schema = get_table_schema(source_cursor, table_name)
    dest_schema = get_table_schema(dest_cursor, table_name)

    # Add missing columns to the destination table
    for column, col_type in source_schema.items():
    if column not in dest_schema:
    alter_table_sql = f'ALTER TABLE "{table_name}" ADD COLUMN "{column}" {col_type}'
    dest_cursor.execute(alter_table_sql)
    dest_conn.commit()

    def merge_tables(source_conn, dest_conn, table_name):
    source_cursor = source_conn.cursor()
    dest_cursor = dest_conn.cursor()

    # Align schemas before merging
    align_schemas(source_conn, dest_conn, table_name)

    # Retrieve column names after alignment
    dest_schema = get_table_schema(dest_cursor, table_name)
    columns = ', '.join([f'"{col}"' for col in dest_schema.keys()]) # Escape column names
    placeholders = ', '.join('?' * len(dest_schema))

    # Insert data from source to destination
    source_cursor.execute(f'SELECT {columns} FROM "{table_name}"') # Escape table name
    rows = source_cursor.fetchall()
    if rows:
    insert_sql = f'INSERT INTO "{table_name}" ({columns}) VALUES ({placeholders})' # Escape table name
    dest_cursor.executemany(insert_sql, rows)

    dest_conn.commit()

    def merge_databases(source_db, dest_conn):
    source_conn = sqlite3.connect(source_db)
    source_cursor = source_conn.cursor()

    # Retrieve all table names from the source database
    source_cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
    tables = source_cursor.fetchall()

    for (table_name,) in tables:
    merge_tables(source_conn, dest_conn, table_name)

    source_conn.close()

    def main(rawdata_dir, output_db):
    # Connect to the destination database (it will be created if it doesn't exist)
    dest_conn = sqlite3.connect(output_db)

    # Traverse the rawdata directory and its subdirectories to find .db files
    db_files = []
    for root, _, files in os.walk(rawdata_dir):
    for file in files:
    if file.endswith('.db'):
    db_files.append(os.path.join(root, file))

    # Merge each database into the destination database with a progress bar
    for db_file in tqdm(db_files, desc="Merging databases"):
    merge_databases(db_file, dest_conn)

    dest_conn.close()
    print(f"All databases have been merged into {output_db}")

    if __name__ == "__main__":
    import sys
    if len(sys.argv) != 3:
    print("Usage: merge_databases.py <rawdata_dir> <output_db>")
    else:
    main(sys.argv[1], sys.argv[2])