import tempfile
import warnings

import pandas.io.sql


class PgSQLDatabase(pandas.io.sql.SQLDatabase):
    """SQLDatabase variant that bulk-loads DataFrames through ``COPY``.

    The table is created via ``pandas.io.sql.SQLTable`` and the rows are then
    streamed to the server as CSV with ``cursor.copy_expert`` instead of
    being sent as INSERT statements.
    """

    # FIXME Schema is pulled from Meta object, shouldn't actually be part of
    # signature!
    def to_sql(self, frame, name, if_exists='fail', index=True,
               index_label=None, schema=None, chunksize=None, dtype=None,
               pk=None):
        """
        Write records stored in a DataFrame to a SQL database.

        Parameters
        ----------
        frame : DataFrame
        name : string
            Name of SQL table
        if_exists : {'fail', 'replace', 'append'}, default 'fail'
            Passed through unchanged to ``pandas.io.sql.SQLTable``, whose
            ``create()`` decides what happens when the table already exists.
        index : boolean, default True
            Write DataFrame index as a column
        index_label : string or sequence, default None
            Column label for index column(s), passed through to
            ``pandas.io.sql.SQLTable``. A sequence should be given if the
            DataFrame uses MultiIndex.
        schema : string, default None
            Accepted for signature compatibility only. The table is always
            created in, loaded into and checked against ``self.meta.schema``
            (see the FIXME on this method).
        chunksize : int, default None
            Accepted for signature compatibility only; it is not used. All
            rows are sent in a single ``COPY``.
        dtype : dict of column name to SQL type, default None
            Optional specifying the datatype for columns. Each value must be
            a SQLAlchemy type, given either as a class or as an instance.
        pk : string or sequence of strings, default None
            Name of column(s) to set as primary keys. A string is inserted
            into the ``ALTER TABLE`` statement verbatim; the items of a
            sequence are quoted as identifiers and joined with commas. An
            empty value adds no primary key.

        Raises
        ------
        ValueError
            If a value in `dtype` is not a SQLAlchemy type.
        """
        if dtype is not None:
            import sqlalchemy.sql.type_api as type_api
            for col, my_type in dtype.items():
                # issubclass() raises TypeError for non-classes, so check
                # for a class first and accept type instances separately.
                is_type_class = (isinstance(my_type, type) and
                                 issubclass(my_type, type_api.TypeEngine))
                is_type_instance = isinstance(my_type, type_api.TypeEngine)
                if not (is_type_class or is_type_instance):
                    raise ValueError('The type of %s is not a SQLAlchemy '
                                     'type ' % col)

        # Every statement below targets the schema the table is created in.
        target_schema = self.meta.schema

        table = pandas.io.sql.SQLTable(name, self, frame=frame, index=index,
                                       if_exists=if_exists,
                                       index_label=index_label,
                                       schema=target_schema, dtype=dtype)
        table.create()

        # Quote identifiers the way the dialect does, so the hand-written SQL
        # names the same table that was just created.
        quote = self.engine.dialect.identifier_preparer.quote
        if target_schema is None:
            qualified_name = quote(name)
        else:
            qualified_name = "{0}.{1}".format(quote(target_schema),
                                              quote(name))

        if pk is not None:
            if isinstance(pk, str):
                # Kept verbatim so existing callers see no change.
                pks = pk
            else:
                pks = ", ".join(quote(column) for column in pk)
            if pks:
                sql = "ALTER TABLE {0} ADD PRIMARY KEY ({1})".format(
                    qualified_name, pks)
                self.execute(sql)

        copy_sql = ("COPY {0} FROM STDIN "
                    "WITH (FORMAT CSV, HEADER TRUE)".format(qualified_name))

        # Keep an explicit reference to the connection for as long as the
        # cursor is in use, and always release it afterwards.
        conn = self.engine.raw_connection()
        try:
            with conn.cursor() as cur, \
                    tempfile.TemporaryFile(mode='w+') as csv_buffer:
                frame.to_csv(csv_buffer, index=index)
                # Rewind so COPY reads from the start of what was written.
                csv_buffer.seek(0)
                cur.copy_expert(copy_sql, csv_buffer)
            conn.commit()
        finally:
            conn.close()

        # check for potentially case sensitivity issues (GH7815)
        self.meta.reflect()
        if name not in self.engine.table_names(schema=target_schema):
            warnings.warn("The provided table name '{0}' is not found exactly "
                          "as such in the database after writing the table, "
                          "possibly due to case sensitivity issues. Consider "
                          "using lower case table names.".format(name),
                          UserWarning)