Skip to content

Instantly share code, notes, and snippets.

@jwestarb
Created August 23, 2019 08:01
Show Gist options
  • Save jwestarb/39eeacb6dff08494cf4ba64bc182d9a4 to your computer and use it in GitHub Desktop.
Save jwestarb/39eeacb6dff08494cf4ba64bc182d9a4 to your computer and use it in GitHub Desktop.
Script python load DBF file do Oracle Table
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
dbf2oracle - convert dbf files into Oracle database
Todo:
- -v --verbose option
- handle existing table (-f option?)
- primary key option? (make first column primary key)
- create only option?
- insert only option?
- options to select columns to insert?
"""
import os
import sys
import argparse
import traceback
from dbfread import DBF
try:
import cx_Oracle
except (ImportError, e):
raise ImportError(
str(e) + """ Was not found. Make sure you have it installed""")
username = 'DbUser'
password = 'secret'
sid = 'ORCL'
typemap = {
'F': 'NUMBER',
'L': 'VARCHAR2',
'I': 'INTEGER',
'C': 'VARCHAR2',
'N': 'NUMBER', # because it can be integer or float
'M': 'VARCHAR2',
'D': 'DATE',
'T': 'DATE',
'0': 'INTEGER',
}
def add_table(cursor, table):
"""Add a dbase table to an open Oracle database."""
# cursor.execute('drop table if exists %s' % table.name)
field_types = {}
for f in table.fields:
field_types[f.name] = typemap.get(f.type, 'VARCHAR2')
#
# Create the table
#
defs = ', '.join(['%s %s' % (f, field_types[f])
for f in table.field_names])
sql = 'create table "%s" (%s)' % (table.name, defs)
# print(sql)
cursor.execute(sql)
# Create data rows
refs = ', '.join([':' + f for f in table.field_names])
sql = 'insert into %s values (%s)' % (table.name, refs)
print(sql)
i = 0
for rec in table:
i = i+1
# print(list(rec.values()))
cursor.execute(sql, list(rec.values()))
if i == 5000:
print("5000 registros. Commit....")
i = 0
cursor.connection.commit()
def parse_args():
parser = argparse.ArgumentParser(
description='usage: %prog [OPTIONS] table1.dbf ... tableN.dbf')
arg = parser.add_argument
arg('-e', '--encoding',
action='store',
dest='encoding',
default=None,
help='character encoding in DBF file')
arg('--char-decode-errors',
action='store',
dest='char_decode_errors',
default='strict',
help='how to handle decode errors (see pydoc bytes.decode)')
arg('tables',
metavar='TABLE',
nargs='+',
help='tables to add to Oracle database')
return parser.parse_args()
def main():
args = parse_args()
conn = cx_Oracle.connect(username, password, sid, encoding="cp850")
cursor = conn.cursor()
for table_file in args.tables:
try:
add_table(cursor, DBF(table_file,
lowernames=True,
encoding=args.encoding,
char_decode_errors=args.char_decode_errors))
except UnicodeDecodeError as err:
traceback.print_exc()
sys.exit('Please use --encoding or --char-decode-errors.')
conn.commit()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment