Skip to content

Instantly share code, notes, and snippets.

@sebroad
Forked from roder/xmlsqlite.py
Last active May 27, 2022 11:41
Show Gist options
  • Save sebroad/e6a9feb3fad24dccda3e30ffc2e8650c to your computer and use it in GitHub Desktop.
Save sebroad/e6a9feb3fad24dccda3e30ffc2e8650c to your computer and use it in GitHub Desktop.
Convert XML files into SQLite
.vscode/launch.json
xmlsqlite.db3
.vscode/settings.json
# https://docs.python.org/3.6/library/xml.etree.elementtree.html#xpath-support
from xml.etree import ElementTree, ElementPath
import os
from datetime import datetime
from shutil import copyfile
class PlexosElement:
def __init__(self, element):
self.source = element
self.load_element()
def load_element(self):
pass
def get_attribute(self, attr_name):
try:
return self.source.find('{http://tempuri.org/MasterDataSet.xsd}' + attr_name).text
except:
return ''
def __str__(self):
return '\n'.join(['{}-->{}'.format(k,self.__dict__[k]) for k in self.__dict__.keys()])
class PlexosClass(PlexosElement):
def load_element(self):
self.class_id = self.get_attribute('class_id')
self.name = self.get_attribute('name')
self.class_group_id = self.get_attribute('class_group_id')
self.is_enabled = self.get_attribute('is_enabled')
self.lang_id = self.get_attribute('lang_id')
self.description = self.get_attribute('description')
class PlexosObject(PlexosElement):
def load_element(self):
self.object_id = self.get_attribute('object_id')
self.class_id = self.get_attribute('class_id')
self.name = self.get_attribute('name')
self.category_id = self.get_attribute('category_id')
self.description = self.get_attribute('description')
self.guid = self.get_attribute('GUID')
class PlexosMembership(PlexosElement):
def load_element(self):
self.membership_id = self.get_attribute('membership_id')
self.parent_class_id = self.get_attribute('parent_class_id')
self.parent_object_id = self.get_attribute('parent_object_id')
self.collection_id = self.get_attribute('collection_id')
self.child_class_id = self.get_attribute('child_class_id')
self.child_object_id = self.get_attribute('child_object_id')
class PlexosCollection(PlexosElement):
def load_element(self):
self.collection_id = self.get_attribute('collection_id')
self.parent_class_id = self.get_attribute('parent_class_id')
self.child_class_id = self.get_attribute('child_class_id')
self.name = self.get_attribute('name')
self.min_count = self.get_attribute('min_count')
self.max_count = self.get_attribute('max_count')
self.is_enabled = self.get_attribute('is_enabled')
self.is_one_to_many = self.get_attribute('is_one_to_many')
self.lang_id = self.get_attribute('lang_id')
self.description = self.get_attribute('description')
tree = ElementTree.parse("C:\\Users\\steven.broad\\Documents\\GitHub\\Python-PLEXOS-API\\Input Files\\rts_PLEXOS.xml")
plexos_classes = dict()
plexos_collections = dict()
plexos_lines = dict()
plexos_nodes = dict()
plexos_memberships = dict()
for x in tree.findall('./{http://tempuri.org/MasterDataSet.xsd}t_class'):
obj = PlexosClass(x)
plexos_classes[obj.class_id] = obj
plexos_classes[obj.name] = obj
line_class_id = plexos_classes['Line'].class_id # get class_id for lines
node_class_id = plexos_classes['Node'].class_id # get class_id for lines
for x in tree.findall('./{http://tempuri.org/MasterDataSet.xsd}t_collection'):
obj = PlexosCollection(x)
plexos_collections[obj.collection_id] = obj
plexos_collections[obj.name] = obj
for x in tree.findall("./{http://tempuri.org/MasterDataSet.xsd}t_object/[{http://tempuri.org/MasterDataSet.xsd}class_id='" + line_class_id + "']"):
line_obj = PlexosObject(x)
plexos_lines[line_obj.object_id] = line_obj
for x in tree.findall("./{http://tempuri.org/MasterDataSet.xsd}t_object/[{http://tempuri.org/MasterDataSet.xsd}class_id='" + node_class_id + "']"):
node_obj = PlexosObject(x)
plexos_nodes[node_obj.object_id] = node_obj
plexos_nodes[node_obj.name] = node_obj
query = "./{http://tempuri.org/MasterDataSet.xsd}t_membership/[{http://tempuri.org/MasterDataSet.xsd}parent_class_id='"+line_class_id+"'][{http://tempuri.org/MasterDataSet.xsd}child_class_id='"+node_class_id+"']"
for x in tree.findall(query):
obj = PlexosMembership(x)
if plexos_collections[obj.collection_id].name == 'Node From':
plexos_lines[obj.parent_object_id].from_node = plexos_nodes[obj.child_object_id].name
elif plexos_collections[obj.collection_id].name == 'Node To':
plexos_lines[obj.parent_object_id].to_node = plexos_nodes[obj.child_object_id].name
for k in plexos_lines.keys():
try:
print(plexos_lines[k].name, '::', plexos_lines[k].from_node, '--->', plexos_lines[k].to_node)
except:
continue
# Python .NET interface
from dotnet.seamless import add_assemblies, load_assembly#, build_assembly
# load PLEXOS assemblies
plexos_path = 'C:/Program Files (x86)/Energy Exemplar/PLEXOS 7.5/'
add_assemblies(plexos_path)
load_assembly('PLEXOS7_NET.Core')
load_assembly('EEUTILITY')
# .NET related imports
from PLEXOS7_NET.Core import DatabaseCore
from EEUTILITY.Enums import *
from System import *
from optparse import OptionParser
import sqlite3, os
import pandas as pd
def run(inputDir, outputFile, overwrite = True):
files = []
if os.path.exists(inputDir) and os.path.isfile(inputDir):
files.append(inputDir)
else:
for filename in os.listdir(inputDir):
if '.xlsx' in filename and 'PLEXOS Export' in filename:
files.append(os.path.join(inputDir, filename))
if os.path.exists(outputFile):
os.remove(outputFile)
con=sqlite3.connect(outputFile)
sheets = ['Objects', 'Categories', 'Memberships', 'CustomColumns', 'Attributes', 'Properties', 'Reports']
for filename in files:
for sheet in sheets:
try:
wb=pd.read_excel(filename,sheet_name=sheet)
except:
continue
wb.to_sql(sheet, con, index=False)
con.commit()
con.close()
def main():
usage = "usage: %prog [options] /path/to/dir/with/xml"
parser = OptionParser(usage)
parser.add_option("-f", "--file", dest="outputFile", default = 'xmlsqlite.db3', help="Specify the filename for the sqlite database. It will be created if it does not exist [Default: xmlsqlite.db3]")
(options, args) = parser.parse_args()
if len(args) != 1:
parser.error("incorrect number of arguments")
inputDir = os.path.abspath(os.path.expanduser(args[0]))
run(inputDir, options.outputFile)
if __name__ == "__main__": main()
#!/usr/bin/env python
from optparse import OptionParser
from xml.dom.minidom import parse
import os
import sqlite3
datatypeMap = {
'integer': 'INT',
'datetime': 'DATETIME',
'boolean': 'BOOLEAN'
}
defaultDataType = 'TEXT'
def get_xml_doms(directory):
result = []
for filename in directory:
if filename.endswith('.xml'):
dom = parse(filename)
result.append(dom)
return result
def yield_db_schema(dbDef):
result = ''
for (table, tableDef) in dbDef.items():
result += create_table(table, tableDef)
return result
def exec_create_schema(dbDef, conn, db):
for (table, tableDef) in dbDef.items():
create = create_table(table, tableDef)
db.execute(create)
def yield_inserts(recordSet):
inserts = ''
for (table, rows) in recordSet.items():
for row in rows:
fields = "\'" + '\', \''.join(row.keys()) + "\'"
data = "\'" + '\', \''.join(row.values()) + "\'"
if fields != "''":
inserts += "INSERT INTO \'%s\' (%s) VALUES (%s);\n" % (table, fields, data)
return inserts
def exec_insert(recordSet, conn, db):
for (table, rows) in recordSet.items():
for row in rows:
fields = "\'" + '\', \''.join(row.keys()) + "\'"
data = "\'" + '\', \''.join(row.values()) + "\'"
if len(row.keys()) >0:
marklist = ["?"] * len(row.keys())
marks = ', '.join(marklist)
insert = "INSERT INTO \'%s\' (%s) VALUES (%s)" % (table, fields, marks)
values = tuple(row.values())
db.execute(insert, values)
conn.commit()
def create_table(table, tableDef):
fields = []
begin = 'CREATE TABLE \'%s\' ( \n' % table
for field, fieldDef in tableDef.items():
fields.append(create_field(field, fieldDef))
end = '\n);\n\n'
result = begin + ',\n'.join(fields) + end
return result
def create_field(field, fieldDef):
if fieldDef.has_key(u'type'):
datatype = fieldDef.get(u'type')
else:
datatype = defaultDataType
return " '%s' %s" % (field, datatype)
def collect_structure(doms):
db = {}
records = {}
for dom in doms:
db = gen_db_struct(dom.childNodes, db)
return db
def collect_data(dbDef, doms):
recordset = {}
for dom in doms:
for (table, fieldDef) in dbDef.items():
if not recordset.has_key(table):
recordset[table] = []
for row in dom.getElementsByTagName(table):
record = {}
for (column, _) in fieldDef.items():
for node in row.getElementsByTagName(column):
if node.hasChildNodes():
for item in node.childNodes:
if hasattr(item, 'data'):
if len(item.data.strip()) > 0:
record[column] = item.data
recordset[table].append(record)
return recordset
def gen_db_struct(nodeList, db = {}):
for node in nodeList:
if not node.hasChildNodes() and node.parentNode.parentNode != None and node.parentNode.parentNode.nodeName != '#document':
# a new field of data
field = node.parentNode
fieldName = field.nodeName
table = field.parentNode
tableName = table.nodeName
if not tableName in db:
db[tableName] = {}
db[tableName][fieldName] = {}
if field.hasAttributes():
for (Key, Value) in field.attributes.items():
if Key != u'type' and Value != u'array':
db[tableName][fieldName][Key] = datatypeMap[Value]
else:
gen_db_struct(node.childNodes, db)
return db
def run(inputDir, outputFile):
files = []
if os.path.exists(inputDir):
files.append(inputDir)
else:
for filename in os.listdir(inputDir):
files.append(os.path.join(inputDir, filename))
domList = get_xml_doms(files)
dbDef = collect_structure(domList)
records = collect_data(dbDef, domList)
conn = sqlite3.connect(outputFile)
db = conn.cursor()
exec_create_schema(dbDef, conn, db)
exec_insert(records, conn, db)
db.close()
def main():
usage = "usage: %prog [options] /path/to/dir/with/xml"
parser = OptionParser(usage)
parser.add_option("-f", "--file", dest="outputFile", default = 'xmlsqlite.db3',
help="Specify the filename for the sqlite database. It will be created if it does not exist [Default: xmlsqlite.db3]")
(options, args) = parser.parse_args()
if len(args) != 1:
parser.error("incorrect number of arguments")
inputDir = os.path.abspath(os.path.expanduser(args[0]))
run(inputDir, options.outputFile)
if __name__ == "__main__": main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment