Last active
May 20, 2018 00:01
-
-
Save 8dspaces/b6bb9cea5a62c975e721ae596b7e51e3 to your computer and use it in GitHub Desktop.
xml/csv compare example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python2 | |
| import csv | |
| import sys | |
| from sys import stdout, stderr | |
| import os | |
| import argparse | |
class RetainValueIterator(object):
    """Iterator wrapper that remembers the most recent item it yielded.

    ``compare`` wraps each open file in this class before handing it to
    ``csv.reader`` so that the raw text of the line just consumed remains
    available through ``val``.

    f   - the wrapped iterator (any object accepted by the builtin next())
    val - the last item returned, or None before the first item is read
    """

    def __init__(self, f):
        self.f = f
        self.val = None

    def __iter__(self):
        return self

    def __next__(self):
        # The builtin next() works whether the wrapped object implements the
        # Python 2 (``next``) or the Python 3 (``__next__``) protocol.
        self.val = next(self.f)
        return self.val

    # Python 2 spelling of the iterator protocol.
    next = __next__
class _CsvSource(object):
    """State that ``compare`` tracks for one input file."""

    __slots__ = ('name', 'handle', 'raw', 'reader', 'row_number', 'done',
                 'pending')

    def __init__(self, name, handle):
        self.name = name                        # file name as given
        self.handle = handle                    # open file object
        self.raw = RetainValueIterator(handle)  # keeps the last raw line
        self.reader = csv.reader(self.raw)
        self.row_number = 1                     # the header is row 1
        self.done = False                       # True once exhausted
        self.pending = {}  # id -> (row hash, row number, raw csv text)


def _open_csv_file(path):
    """Open *path* in the mode the csv module expects on this Python."""
    if sys.version_info[0] < 3:
        return open(path, 'rb')
    # Python 3: text mode; newline='' leaves line endings untranslated.
    return open(path, 'r', newline='')


def compare(files, id_column, ignore_columns=None, verbose=False):
    """
    Compare multiple csv files using a dict

    files - list of file names
    id_column - name of the column containing the unique ID
    ignore_columns - columns to ignore
    verbose - verbose indicator

    Rows are matched across files by the value in id_column.  When an ID
    has been seen in every file and the non-ignored columns differ, the
    rows are written to stdout as "file,line,<raw csv text>" below a
    "file,line,<header>" line.  IDs that were never seen in every file are
    reported on stderr with one True/False column per file (plus a
    consistency column when more than two files are compared).

    Raises Exception when no files are given, a file is empty, the header
    rows differ, or id_column / an ignore column is not in the header.
    """
    if not files:
        raise Exception('No files specified')
    if not ignore_columns:
        ignore_columns = []

    sources = []
    try:
        header = None
        for name in files:
            source = _CsvSource(name, _open_csv_file(name))
            sources.append(source)
            try:
                current_header = next(source.reader)
            except StopIteration:
                raise Exception('The file %s is empty' % (name,))
            if header is not None and current_header != header:
                raise Exception('The header rows do not match')
            header = current_header

        # ensure the id_column exists and get the column index
        if id_column not in header:
            raise Exception('The id column specified does not exist')
        id_column_index = header.index(id_column)

        # get the indexes for the ignore_columns
        try:
            ignore_indexes = [header.index(a) for a in ignore_columns]
        except ValueError:
            raise Exception('One or more of the specified ignore columns '
                            'does not exist')

        # columns that take part in the row comparison
        hashable_indexes = [a for a in range(len(header))
                            if a not in ignore_indexes]

        if verbose:
            stderr.write('Loading data from %d files\n' % (len(files),))

        # write the mismatch header
        stdout.write('file,line,%s\n' % (','.join(header)),)

        count = len(sources)
        current_index = 0
        # While any file has yet to complete
        while not all(s.done for s in sources):
            # Advance to the next unfinished file, wrapping around.  The
            # outer condition guarantees one exists, so this terminates.
            current_index %= count
            while sources[current_index].done:
                current_index = (current_index + 1) % count
            source = sources[current_index]

            try:
                row = next(source.reader)
            except StopIteration:
                source.done = True
                current_index += 1
                continue

            # Raw text of the line just read.  NOTE: for a quoted field
            # spanning several lines this is only the last physical line.
            csv_text = source.raw.val
            source.row_number += 1

            if row:
                ident = row[id_column_index]
                # Hash the tuple rather than the concatenated text so that
                # ("ab", "c") and ("a", "bc") are not treated as equal.
                hashed = hash(tuple(row[x] for x in hashable_indexes))
                others = [s for s in sources if s is not source]

                if all(ident in o.pending for o in others):
                    # the id has now been seen in every file
                    if any(o.pending[ident][0] != hashed for o in others):
                        for o in others:
                            stdout.write('%s,%s,%s' % (
                                o.name, o.pending[ident][1],
                                o.pending[ident][2]))
                        stdout.write('%s,%s,%s' % (
                            source.name, source.row_number, csv_text))
                    # the id is resolved; drop it from the other files
                    for o in others:
                        del o.pending[ident]
                else:
                    # not yet seen everywhere; remember it for later
                    source.pending[ident] = (
                        hashed, source.row_number, csv_text)

            # file switching every 100,000 rows
            if source.row_number % 100000 == 0:
                current_index += 1
    finally:
        for source in sources:
            source.handle.close()

    if verbose:
        stderr.write('Scanning for unique IDs\n')

    # Find rows that are not in ALL files.
    report_consistency = len(files) > 2
    stderr.write('%s,%s' % (id_column, ','.join(s.name for s in sources)))
    if report_consistency:
        stderr.write(',consistency')
    stderr.write('\n')

    # every id still pending in at least one file
    remaining = set()
    for source in sources:
        remaining.update(source.pending)

    for ident in remaining:
        previous_hash = None
        matched = True
        stderr.write('%s' % (ident,))
        for source in sources:
            if ident in source.pending:
                stderr.write(',True')
                row_hash = source.pending[ident][0]
                # "is not None": a legitimate hash value of 0 still counts
                if previous_hash is not None and row_hash != previous_hash:
                    matched = False
                previous_hash = row_hash
            else:
                stderr.write(',False')
        if report_consistency:
            stderr.write(',consistent' if matched else ',mismatched')
        stderr.write('\n')
def main(argv=None):
    """Command-line entry point: parse the arguments and run ``compare``.

    argv - full argument vector including the program name in position 0;
           ``sys.argv`` is used when it is omitted
    """
    cli_args = sys.argv if argv is None else argv

    arg_parser = argparse.ArgumentParser(description='compare two csv files')
    arg_parser.add_argument('files', nargs='+', metavar='file',
                            help='file to compare')
    arg_parser.add_argument('identity_col', metavar='identity_column',
                            help='identity column name')
    arg_parser.add_argument('--ignore', nargs='+', default=[],
                            help='columns to ignore')
    arg_parser.add_argument('--verbose', '-v', action='store_true',
                            default=False, help='verbose indicator')

    # Position 0 is the program name, which argparse must not see.
    options = arg_parser.parse_args(cli_args[1:])
    compare(options.files, options.identity_col,
            ignore_columns=options.ignore, verbose=options.verbose)
# Run the command-line interface only when executed as a script; whatever
# main() returns becomes the argument to sys.exit().
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import json | |
| import unittest | |
| import xmltodict | |
class XmlDiff(object):
    """Compare two XML documents, honouring ignore markers.

    Both documents are parsed with xmltodict.  The second document is the
    expected one: any element in it that has an attribute or text value of
    "%%IGNORE%%" is removed from both documents before they are compared,
    so only the presence of that element matters, not its content.
    """

    def __init__(self, xml1, xml2):
        # The JSON round trip leaves plain dict / list / string values.
        self.dict1 = json.loads(json.dumps(xmltodict.parse(xml1)))
        self.dict2 = json.loads(json.dumps(xmltodict.parse(xml2)))

    @staticmethod
    def ignore_in_dict(dict1):
        """Return True when any value of dict1 equals "%%IGNORE%%".

        Anything that is not a dict (text, None, a list) yields False.
        """
        if not isinstance(dict1, dict):
            return False
        return any(value == "%%IGNORE%%" for value in dict1.values())

    def remove_ignore_field(self, dict_returned, dict_expected):
        """Strip elements flagged as ignored in dict_expected from both dicts.

        Both dicts are modified in place.  An element flagged in
        dict_expected is only removed when dict_returned has a counterpart;
        otherwise it is left alone so the later comparison reports the
        missing element.

        Returns the tuple (dict_expected, dict_returned).
        """
        # Snapshot the items: entries are deleted while walking the dict.
        for key, expected_value in list(dict_expected.items()):
            if isinstance(expected_value, dict):
                if key not in dict_returned:
                    continue
                if self.ignore_in_dict(expected_value):
                    del dict_expected[key]
                    del dict_returned[key]
                elif isinstance(dict_returned[key], dict):
                    # Keep the (returned, expected) argument order so the
                    # roles do not swap on every nesting level.
                    self.remove_ignore_field(dict_returned[key],
                                             expected_value)
            elif isinstance(expected_value, list):
                returned_value = dict_returned.get(key)
                if not isinstance(returned_value, list):
                    # Different shape; let the comparison report it.
                    continue
                kept_expected = []
                kept_returned = []
                for index, expected_item in enumerate(expected_value):
                    has_counterpart = index < len(returned_value)
                    if has_counterpart and self.ignore_in_dict(expected_item):
                        continue
                    kept_expected.append(expected_item)
                    if has_counterpart:
                        returned_item = returned_value[index]
                        kept_returned.append(returned_item)
                        if (isinstance(expected_item, dict)
                                and isinstance(returned_item, dict)):
                            self.remove_ignore_field(returned_item,
                                                     expected_item)
                # Surplus returned elements must survive so that the
                # documents still compare as different.
                kept_returned.extend(returned_value[len(expected_value):])
                dict_expected[key] = kept_expected
                dict_returned[key] = kept_returned
        return dict_expected, dict_returned

    def equal(self):
        """Return True when the documents match outside ignored elements."""
        self.remove_ignore_field(self.dict1, self.dict2)
        return self.dict1 == self.dict2
class XMLDiffTestCase(unittest.TestCase):
    """Tests for ``XmlDiff.equal`` and ``XmlDiff.ignore_in_dict``."""

    def _same(self, first, second):
        """Return the verdict of ``XmlDiff(first, second).equal()``."""
        return XmlDiff(first, second).equal()

    def test_xml_equal(self):
        left = """<a></a>"""
        right = """<a></a>"""
        self.assertTrue(self._same(left, right))

    def test_xml_are_not_equal(self):
        left = "<a></a>"
        right = "<b></b>"
        self.assertFalse(self._same(left, right))

    def test_parameter_order_doesnt_matter(self):
        left = """<a p1="1" p2="2"></a>"""
        right = """<a p2="2" p1="1"></a>"""
        self.assertTrue(self._same(left, right))

    def test_newline_doesnt_matter(self):
        left = """<a></a>"""
        right = """<a>
</a>"""
        self.assertTrue(self._same(left, right))

    def test_nested_tag(self):
        left = """<a><b></b></a>"""
        right = """
<a>
<b></b>
</a>"""
        self.assertTrue(self._same(left, right))

    def test_nested_tag_not_equal(self):
        left = """<a><b></b><c><d></d></c><e></e></a>"""
        right = """
<a>
<b></b>
</a>
"""
        self.assertFalse(self._same(left, right))

    def test_nested_tag_parameter_not_equal(self):
        left = """<t t="r" v="1"><f><p k="a" v="2"/></f></t>"""
        right = """
<t t="r" v="1">
<f>
<p k="a" v="1"/>
</f>
</t>"""
        self.assertFalse(self._same(left, right))

    def test_double_tag_equal(self):
        left = """<t t="r" v="1"><f><p k="a" v="1"/><p k="b" v="2"/></f></t>"""
        right = """
<t t="r" v="1">
<f>
<p k="a" v="1"/>
<p k="b" v="2"/>
</f>
</t>"""
        self.assertTrue(self._same(left, right))

    def test_xml_equal_with_encoding(self):
        left = """<?xml version='1.0' encoding='utf-8' standalone='yes'?>
<Stats start="1275955200" end="1276041599">
</Stats>"""
        right = """<?xml version='1.0' encoding='utf-8' standalone='yes'?>
<Stats end="1276041599" start="1275955200" >
</Stats>"""
        self.assertTrue(self._same(left, right))

    def test_ignore_field(self):
        # The second document carries the marker.
        returned = """<t t="r" v="1"><f><p k="a" v="1"/><p k="b" v="2"/></f></t>"""
        expected = """
<t t="r" v="1">
<f>
<p k="a" v="1"/>
<p k="b" v="%%IGNORE%%"/>
</f>
</t>"""
        self.assertTrue(self._same(returned, expected))

    def test_ignore_one_field(self):
        returned = """<t t="r" v="1"><f><p k="b" v="2"/></f></t>"""
        expected = """
<t t="r" v="1">
<f>
<p k="b" v="%%IGNORE%%"/>
</f>
</t>"""
        self.assertTrue(self._same(returned, expected))

    def test_ignore_three_fields(self):
        expected = """<?xml version="1.0" ?><trophy type="result" version="1.0">
<status code="0" message="ok" />
<version>
<parameter key="version" value="%%IGNORE%%" />
<parameter key="build_date" value="%%IGNORE%%" />
<parameter key="type" value="%%IGNORE%%" />
</version>
</trophy>"""
        returned = """<?xml version="1.0" ?>
<trophy type="result" version="1.0">
<status code="0" message="ok"/>
<version>
<parameter key="version" value="4.2.1.0"/>
<parameter key="build_date" value="10/09/2015"/>
<parameter key="type" value="Lite"/>
</version>
</trophy>
"""
        # The document holding the markers goes second.
        self.assertTrue(self._same(returned, expected))

    def test_ingore_in_dict(self):
        flagged = {u'@k': u'b', u'@v': u'%%IGNORE%%'}
        self.assertTrue(XmlDiff.ignore_in_dict(flagged))
        plain = {u'@k': u'b', u'@v': u'value'}
        self.assertFalse(XmlDiff.ignore_in_dict(plain))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| try: | |
| import doctest | |
| doctest.OutputChecker | |
| except AttributeError: # Python < 2.4 | |
| import util.doctest24 as doctest | |
| try: | |
| import xml.etree.ElementTree as ET | |
| except ImportError: | |
| import elementtree.ElementTree as ET | |
| from xml.parsers.expat import ExpatError as XMLParseError | |
| RealOutputChecker = doctest.OutputChecker | |
def debug(*msg):
    """Write the str() of each argument, space separated, to stderr.

    The output is a single line terminated by a newline.
    """
    import sys
    # sys.stderr.write works on Python 2 and 3; the former
    # "print >> sys.stderr" statement is Python 2 only.
    sys.stderr.write(' '.join(map(str, msg)) + '\n')
class HTMLOutputChecker(RealOutputChecker):
    """doctest output checker that falls back to a structural XML compare.

    When the stock checker rejects the output, both the wanted and the
    actual text are parsed as XML fragments and compared with
    ``xml_compare``.
    """

    # ElementTree 1.3+ (Python 2.7 / 3) raises its own ParseError, which is
    # not an ExpatError; catch both so that unparsable text is handled on
    # every version.  Older ElementTree has no ParseError attribute.
    _parse_errors = (XMLParseError, getattr(ET, 'ParseError', XMLParseError))

    def check_output(self, want, got, optionflags):
        """Return True when got matches want textually or as XML."""
        normal = RealOutputChecker.check_output(self, want, got, optionflags)
        if normal or not got:
            return normal
        try:
            want_xml = make_xml(want)
            got_xml = make_xml(got)
        except self._parse_errors:
            # Either side is not XML, so the textual verdict (False) stands.
            return False
        return bool(xml_compare(want_xml, got_xml))

    def output_difference(self, example, got, optionflags):
        """Return the stock difference text, extended with an XML report.

        When the wanted text cannot be parsed and does not start with '<',
        or the actual output cannot be parsed under the same condition on
        the wanted text, the stock report is returned unchanged.
        """
        actual = RealOutputChecker.output_difference(
            self, example, got, optionflags)
        want_xml = got_xml = None
        try:
            want_xml = make_xml(example.want)
            want_norm = make_string(want_xml)
        except self._parse_errors as e:
            if not example.want.startswith('<'):
                return actual
            want_norm = '(bad XML: %s)' % e
        try:
            got_xml = make_xml(got)
            got_norm = make_string(got_xml)
        except self._parse_errors as e:
            if not example.want.startswith('<'):
                return actual
            got_norm = '(bad XML: %s)' % e
        s = '%s\nXML Wanted: %s\nXML Got : %s\n' % (
            actual, want_norm, got_norm)
        # Compare against None explicitly: an Element without children is
        # falsy, which would suppress the report for text-only fragments.
        if got_xml is not None and want_xml is not None:
            result = []
            xml_compare(want_xml, got_xml, result.append)
            s += 'Difference report:\n%s\n' % '\n'.join(result)
        return s
def xml_compare(x1, x2, reporter=None):
    """Return True when two ElementTree elements are structurally equal.

    Tag, attributes, text, tail and all children (recursively, in order)
    must match; text and tail are compared with ``text_compare``.

    x1, x2   - the elements to compare
    reporter - optional callable that receives a message describing the
               first difference found (and one message per enclosing
               level as the recursion unwinds)
    """
    if x1.tag != x2.tag:
        if reporter:
            reporter('Tags do not match: %s and %s' % (x1.tag, x2.tag))
        return False
    for name, value in x1.attrib.items():
        if x2.attrib.get(name) != value:
            if reporter:
                reporter('Attributes do not match: %s=%r, %s=%r'
                         % (name, value, name, x2.attrib.get(name)))
            return False
    for name in x2.attrib:
        if name not in x1.attrib:
            if reporter:
                reporter('x2 has an attribute x1 is missing: %s'
                         % name)
            return False
    if not text_compare(x1.text, x2.text):
        if reporter:
            reporter('text: %r != %r' % (x1.text, x2.text))
        return False
    if not text_compare(x1.tail, x2.tail):
        if reporter:
            reporter('tail: %r != %r' % (x1.tail, x2.tail))
        return False
    # list(element) yields the direct children; Element.getchildren() was
    # deprecated and is gone from Python 3.9 onwards.
    cl1 = list(x1)
    cl2 = list(x2)
    if len(cl1) != len(cl2):
        if reporter:
            reporter('children length differs, %i != %i'
                     % (len(cl1), len(cl2)))
        return False
    # positions in the report are 1-based
    for i, (c1, c2) in enumerate(zip(cl1, cl2), 1):
        if not xml_compare(c1, c2, reporter=reporter):
            if reporter:
                reporter('children %i do not match: %s'
                         % (i, c1.tag))
            return False
    return True
def text_compare(t1, t2):
    """Return True when two text values count as equal.

    Two empty values (None or '') are equal, a value of '*' on either
    side matches anything, and otherwise the values are compared with
    surrounding whitespace stripped.
    """
    first = t1 or ''
    second = t2 or ''
    if not (first or second):
        return True
    if '*' in (t1, t2):
        return True
    return first.strip() == second.strip()
def make_xml(s):
    """Parse s as an XML fragment and return the wrapping element.

    The text is enclosed in a synthetic <xml> root first, so it may hold
    several top-level elements or plain text.
    """
    wrapped = '<xml>%s</xml>' % s
    return ET.XML(wrapped)
def make_string(xml):
    """Serialise an <xml>-rooted element and return its inner markup.

    xml - an element whose root tag is 'xml' (as built by ``make_xml``),
          or a string, which is parsed with ``make_xml`` first

    Returns '' for an empty root.  Raises AssertionError when the
    serialised form is not enclosed in <xml>...</xml>.
    """
    try:
        text_types = (str, unicode)
    except NameError:  # Python 3 has no separate unicode type
        text_types = (str,)
    if isinstance(xml, text_types):
        xml = make_xml(xml)
    s = ET.tostring(xml)
    if not isinstance(s, str):
        # Python 3 returns bytes; the default us-ascii encoding writes
        # non-ASCII characters as character references.
        s = s.decode('ascii')
    if s == '<xml />':
        return ''
    # Raised explicitly so the check is not stripped under "python -O".
    if not (s.startswith('<xml>') and s.endswith('</xml>')):
        raise AssertionError(repr(s))
    return s[5:-6]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment