import pandas as pd
import numpy as np
import fastparquet
from typing import Optional
from sqlalchemy import create_engine, schema, Table


# Copied from pandas with modifications
def _get_dtype(column, sqltype):
    from sqlalchemy.dialects.mssql import BIT
    from sqlalchemy.types import (Integer, Float, Boolean, DateTime,
                                  Date, TIMESTAMP)

    if isinstance(sqltype, Float):
        return float
    elif isinstance(sqltype, Integer):
        # Since DataFrame cannot handle nullable int, convert nullable
        # ints to floats.
        if column.nullable:
            return float
        # TODO: Refine integer size.
        return np.dtype('int64')
    elif isinstance(sqltype, TIMESTAMP):
        # We have a timezone-capable type.
        if not sqltype.timezone:
            return np.dtype('datetime64[ns]')
        # Assumes UTC; adjust the tz argument for other zones.
        return pd.DatetimeTZDtype(tz='UTC')
    elif isinstance(sqltype, DateTime):
        # Caution: np.datetime64 is also a subclass of np.number.
        return np.dtype('datetime64[ns]')
    elif isinstance(sqltype, Date):
        # pandas has no date-only dtype, so store dates as datetime64[ns].
        return np.dtype('datetime64[ns]')
    elif isinstance(sqltype, Boolean):
        return bool
    elif isinstance(sqltype, BIT):
        # Handle database-provider-specific types.
        return np.dtype('u1')
    # Catch-all type - handle provider-specific types in another elif block.
    return object


def _write_parquet(output_path: str, batch_array, column_dict,
                   write_index: bool, compression: Optional[str],
                   append: bool):
    # Create the DataFrame to hold the batch array contents.
    b_df = pd.DataFrame(batch_array, columns=list(column_dict))
    # Cast the DataFrame columns to the sqlalchemy column analogues.
    b_df = b_df.astype(dtype=column_dict)
    # Write to the parquet file (the first write needs append=False).
    fastparquet.write(output_path, b_df, write_index=write_index,
                      compression=compression, append=append)


def table_to_parquet(output_path: str, table_name: str, con,
                     batch_size: int = 10000, write_index: bool = True,
                     compression: Optional[str] = None):
    # Get the database schema using sqlalchemy reflection.
    db_engine = create_engine(con)
    db_metadata = schema.MetaData()
    db_table = Table(table_name, db_metadata, autoload_with=db_engine)

    # Map each column name to the dtype its values should be cast to.
    column_dict = {}
    for column in db_table.columns:
        column_dict[column.name] = _get_dtype(column, column.type)

    # Stream the table in batches so large tables never have to fit
    # in memory all at once.
    with db_engine.connect() as connection:
        result = connection.execute(db_table.select())
        row_batch = result.fetchmany(size=batch_size)
        append = False
        while row_batch:
            _write_parquet(output_path, row_batch, column_dict,
                           write_index, compression, append)
            append = True
            row_batch = result.fetchmany(size=batch_size)
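

# Usage sketch (hypothetical values): the connection string, table name, and
# output path below are assumptions for illustration - substitute your own.
# SNAPPY compression requires the python-snappy package; pass None to write
# uncompressed row groups.
if __name__ == '__main__':
    table_to_parquet(
        output_path='orders.parquet',
        table_name='orders',
        con='postgresql://user:password@localhost:5432/salesdb',
        batch_size=10000,
        write_index=False,
        compression='SNAPPY',
    )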