-
-
Save sebroad/e6a9feb3fad24dccda3e30ffc2e8650c to your computer and use it in GitHub Desktop.
Convert XML files into SQLite
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| .vscode/launch.json | |
| xmlsqlite.db3 | |
| .vscode/settings.json |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # https://docs.python.org/3.6/library/xml.etree.elementtree.html#xpath-support | |
| from xml.etree import ElementTree, ElementPath | |
| import os | |
| from datetime import datetime | |
| from shutil import copyfile | |
| class PlexosElement: | |
| def __init__(self, element): | |
| self.source = element | |
| self.load_element() | |
| def load_element(self): | |
| pass | |
| def get_attribute(self, attr_name): | |
| try: | |
| return self.source.find('{http://tempuri.org/MasterDataSet.xsd}' + attr_name).text | |
| except: | |
| return '' | |
| def __str__(self): | |
| return '\n'.join(['{}-->{}'.format(k,self.__dict__[k]) for k in self.__dict__.keys()]) | |
| class PlexosClass(PlexosElement): | |
| def load_element(self): | |
| self.class_id = self.get_attribute('class_id') | |
| self.name = self.get_attribute('name') | |
| self.class_group_id = self.get_attribute('class_group_id') | |
| self.is_enabled = self.get_attribute('is_enabled') | |
| self.lang_id = self.get_attribute('lang_id') | |
| self.description = self.get_attribute('description') | |
| class PlexosObject(PlexosElement): | |
| def load_element(self): | |
| self.object_id = self.get_attribute('object_id') | |
| self.class_id = self.get_attribute('class_id') | |
| self.name = self.get_attribute('name') | |
| self.category_id = self.get_attribute('category_id') | |
| self.description = self.get_attribute('description') | |
| self.guid = self.get_attribute('GUID') | |
| class PlexosMembership(PlexosElement): | |
| def load_element(self): | |
| self.membership_id = self.get_attribute('membership_id') | |
| self.parent_class_id = self.get_attribute('parent_class_id') | |
| self.parent_object_id = self.get_attribute('parent_object_id') | |
| self.collection_id = self.get_attribute('collection_id') | |
| self.child_class_id = self.get_attribute('child_class_id') | |
| self.child_object_id = self.get_attribute('child_object_id') | |
| class PlexosCollection(PlexosElement): | |
| def load_element(self): | |
| self.collection_id = self.get_attribute('collection_id') | |
| self.parent_class_id = self.get_attribute('parent_class_id') | |
| self.child_class_id = self.get_attribute('child_class_id') | |
| self.name = self.get_attribute('name') | |
| self.min_count = self.get_attribute('min_count') | |
| self.max_count = self.get_attribute('max_count') | |
| self.is_enabled = self.get_attribute('is_enabled') | |
| self.is_one_to_many = self.get_attribute('is_one_to_many') | |
| self.lang_id = self.get_attribute('lang_id') | |
| self.description = self.get_attribute('description') | |
| tree = ElementTree.parse("C:\\Users\\steven.broad\\Documents\\GitHub\\Python-PLEXOS-API\\Input Files\\rts_PLEXOS.xml") | |
| plexos_classes = dict() | |
| plexos_collections = dict() | |
| plexos_lines = dict() | |
| plexos_nodes = dict() | |
| plexos_memberships = dict() | |
| for x in tree.findall('./{http://tempuri.org/MasterDataSet.xsd}t_class'): | |
| obj = PlexosClass(x) | |
| plexos_classes[obj.class_id] = obj | |
| plexos_classes[obj.name] = obj | |
| line_class_id = plexos_classes['Line'].class_id # get class_id for lines | |
| node_class_id = plexos_classes['Node'].class_id # get class_id for lines | |
| for x in tree.findall('./{http://tempuri.org/MasterDataSet.xsd}t_collection'): | |
| obj = PlexosCollection(x) | |
| plexos_collections[obj.collection_id] = obj | |
| plexos_collections[obj.name] = obj | |
| for x in tree.findall("./{http://tempuri.org/MasterDataSet.xsd}t_object/[{http://tempuri.org/MasterDataSet.xsd}class_id='" + line_class_id + "']"): | |
| line_obj = PlexosObject(x) | |
| plexos_lines[line_obj.object_id] = line_obj | |
| for x in tree.findall("./{http://tempuri.org/MasterDataSet.xsd}t_object/[{http://tempuri.org/MasterDataSet.xsd}class_id='" + node_class_id + "']"): | |
| node_obj = PlexosObject(x) | |
| plexos_nodes[node_obj.object_id] = node_obj | |
| plexos_nodes[node_obj.name] = node_obj | |
| query = "./{http://tempuri.org/MasterDataSet.xsd}t_membership/[{http://tempuri.org/MasterDataSet.xsd}parent_class_id='"+line_class_id+"'][{http://tempuri.org/MasterDataSet.xsd}child_class_id='"+node_class_id+"']" | |
| for x in tree.findall(query): | |
| obj = PlexosMembership(x) | |
| if plexos_collections[obj.collection_id].name == 'Node From': | |
| plexos_lines[obj.parent_object_id].from_node = plexos_nodes[obj.child_object_id].name | |
| elif plexos_collections[obj.collection_id].name == 'Node To': | |
| plexos_lines[obj.parent_object_id].to_node = plexos_nodes[obj.child_object_id].name | |
| for k in plexos_lines.keys(): | |
| try: | |
| print(plexos_lines[k].name, '::', plexos_lines[k].from_node, '--->', plexos_lines[k].to_node) | |
| except: | |
| continue | |
| # Python .NET interface | |
| from dotnet.seamless import add_assemblies, load_assembly#, build_assembly | |
| # load PLEXOS assemblies | |
| plexos_path = 'C:/Program Files (x86)/Energy Exemplar/PLEXOS 7.5/' | |
| add_assemblies(plexos_path) | |
| load_assembly('PLEXOS7_NET.Core') | |
| load_assembly('EEUTILITY') | |
| # .NET related imports | |
| from PLEXOS7_NET.Core import DatabaseCore | |
| from EEUTILITY.Enums import * | |
| from System import * | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from optparse import OptionParser | |
| import sqlite3, os | |
| import pandas as pd | |
| def run(inputDir, outputFile, overwrite = True): | |
| files = [] | |
| if os.path.exists(inputDir) and os.path.isfile(inputDir): | |
| files.append(inputDir) | |
| else: | |
| for filename in os.listdir(inputDir): | |
| if '.xlsx' in filename and 'PLEXOS Export' in filename: | |
| files.append(os.path.join(inputDir, filename)) | |
| if os.path.exists(outputFile): | |
| os.remove(outputFile) | |
| con=sqlite3.connect(outputFile) | |
| sheets = ['Objects', 'Categories', 'Memberships', 'CustomColumns', 'Attributes', 'Properties', 'Reports'] | |
| for filename in files: | |
| for sheet in sheets: | |
| try: | |
| wb=pd.read_excel(filename,sheet_name=sheet) | |
| except: | |
| continue | |
| wb.to_sql(sheet, con, index=False) | |
| con.commit() | |
| con.close() | |
| def main(): | |
| usage = "usage: %prog [options] /path/to/dir/with/xml" | |
| parser = OptionParser(usage) | |
| parser.add_option("-f", "--file", dest="outputFile", default = 'xmlsqlite.db3', help="Specify the filename for the sqlite database. It will be created if it does not exist [Default: xmlsqlite.db3]") | |
| (options, args) = parser.parse_args() | |
| if len(args) != 1: | |
| parser.error("incorrect number of arguments") | |
| inputDir = os.path.abspath(os.path.expanduser(args[0])) | |
| run(inputDir, options.outputFile) | |
| if __name__ == "__main__": main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| from optparse import OptionParser | |
| from xml.dom.minidom import parse | |
| import os | |
| import sqlite3 | |
| datatypeMap = { | |
| 'integer': 'INT', | |
| 'datetime': 'DATETIME', | |
| 'boolean': 'BOOLEAN' | |
| } | |
| defaultDataType = 'TEXT' | |
| def get_xml_doms(directory): | |
| result = [] | |
| for filename in directory: | |
| if filename.endswith('.xml'): | |
| dom = parse(filename) | |
| result.append(dom) | |
| return result | |
| def yield_db_schema(dbDef): | |
| result = '' | |
| for (table, tableDef) in dbDef.items(): | |
| result += create_table(table, tableDef) | |
| return result | |
| def exec_create_schema(dbDef, conn, db): | |
| for (table, tableDef) in dbDef.items(): | |
| create = create_table(table, tableDef) | |
| db.execute(create) | |
| def yield_inserts(recordSet): | |
| inserts = '' | |
| for (table, rows) in recordSet.items(): | |
| for row in rows: | |
| fields = "\'" + '\', \''.join(row.keys()) + "\'" | |
| data = "\'" + '\', \''.join(row.values()) + "\'" | |
| if fields != "''": | |
| inserts += "INSERT INTO \'%s\' (%s) VALUES (%s);\n" % (table, fields, data) | |
| return inserts | |
| def exec_insert(recordSet, conn, db): | |
| for (table, rows) in recordSet.items(): | |
| for row in rows: | |
| fields = "\'" + '\', \''.join(row.keys()) + "\'" | |
| data = "\'" + '\', \''.join(row.values()) + "\'" | |
| if len(row.keys()) >0: | |
| marklist = ["?"] * len(row.keys()) | |
| marks = ', '.join(marklist) | |
| insert = "INSERT INTO \'%s\' (%s) VALUES (%s)" % (table, fields, marks) | |
| values = tuple(row.values()) | |
| db.execute(insert, values) | |
| conn.commit() | |
| def create_table(table, tableDef): | |
| fields = [] | |
| begin = 'CREATE TABLE \'%s\' ( \n' % table | |
| for field, fieldDef in tableDef.items(): | |
| fields.append(create_field(field, fieldDef)) | |
| end = '\n);\n\n' | |
| result = begin + ',\n'.join(fields) + end | |
| return result | |
| def create_field(field, fieldDef): | |
| if fieldDef.has_key(u'type'): | |
| datatype = fieldDef.get(u'type') | |
| else: | |
| datatype = defaultDataType | |
| return " '%s' %s" % (field, datatype) | |
| def collect_structure(doms): | |
| db = {} | |
| records = {} | |
| for dom in doms: | |
| db = gen_db_struct(dom.childNodes, db) | |
| return db | |
| def collect_data(dbDef, doms): | |
| recordset = {} | |
| for dom in doms: | |
| for (table, fieldDef) in dbDef.items(): | |
| if not recordset.has_key(table): | |
| recordset[table] = [] | |
| for row in dom.getElementsByTagName(table): | |
| record = {} | |
| for (column, _) in fieldDef.items(): | |
| for node in row.getElementsByTagName(column): | |
| if node.hasChildNodes(): | |
| for item in node.childNodes: | |
| if hasattr(item, 'data'): | |
| if len(item.data.strip()) > 0: | |
| record[column] = item.data | |
| recordset[table].append(record) | |
| return recordset | |
| def gen_db_struct(nodeList, db = {}): | |
| for node in nodeList: | |
| if not node.hasChildNodes() and node.parentNode.parentNode != None and node.parentNode.parentNode.nodeName != '#document': | |
| # a new field of data | |
| field = node.parentNode | |
| fieldName = field.nodeName | |
| table = field.parentNode | |
| tableName = table.nodeName | |
| if not tableName in db: | |
| db[tableName] = {} | |
| db[tableName][fieldName] = {} | |
| if field.hasAttributes(): | |
| for (Key, Value) in field.attributes.items(): | |
| if Key != u'type' and Value != u'array': | |
| db[tableName][fieldName][Key] = datatypeMap[Value] | |
| else: | |
| gen_db_struct(node.childNodes, db) | |
| return db | |
| def run(inputDir, outputFile): | |
| files = [] | |
| if os.path.exists(inputDir): | |
| files.append(inputDir) | |
| else: | |
| for filename in os.listdir(inputDir): | |
| files.append(os.path.join(inputDir, filename)) | |
| domList = get_xml_doms(files) | |
| dbDef = collect_structure(domList) | |
| records = collect_data(dbDef, domList) | |
| conn = sqlite3.connect(outputFile) | |
| db = conn.cursor() | |
| exec_create_schema(dbDef, conn, db) | |
| exec_insert(records, conn, db) | |
| db.close() | |
| def main(): | |
| usage = "usage: %prog [options] /path/to/dir/with/xml" | |
| parser = OptionParser(usage) | |
| parser.add_option("-f", "--file", dest="outputFile", default = 'xmlsqlite.db3', | |
| help="Specify the filename for the sqlite database. It will be created if it does not exist [Default: xmlsqlite.db3]") | |
| (options, args) = parser.parse_args() | |
| if len(args) != 1: | |
| parser.error("incorrect number of arguments") | |
| inputDir = os.path.abspath(os.path.expanduser(args[0])) | |
| run(inputDir, options.outputFile) | |
| if __name__ == "__main__": main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment