Skip to content

Instantly share code, notes, and snippets.

@8dspaces
Last active May 20, 2018 00:01
Show Gist options
  • Save 8dspaces/b6bb9cea5a62c975e721ae596b7e51e3 to your computer and use it in GitHub Desktop.
Save 8dspaces/b6bb9cea5a62c975e721ae596b7e51e3 to your computer and use it in GitHub Desktop.
xml/csv compare example
#!/usr/bin/env python2
import csv
import sys
from sys import stdout, stderr
import os
import argparse
class RetainValueIterator(object):
    """Iterator wrapper that remembers the last item it handed out.

    Wrapping a file object this way lets a consumer such as ``csv.reader``
    pull lines from it while the caller can still read the raw text of the
    most recently consumed line from ``val``.
    """

    def __init__(self, f):
        # f: the wrapped iterator (for example an open file object)
        self.f = f
        # val: the item returned by the latest read; None before any read
        self.val = None

    def __iter__(self):
        return self

    def next(self):
        """Return the next item of the wrapped iterator and store it in val.

        StopIteration raised by the wrapped iterator propagates unchanged
        and leaves ``val`` at the last item that was read successfully.
        """
        # The next() builtin works with both the Python 2 (.next) and the
        # Python 3 (.__next__) protocol of the wrapped object.
        self.val = next(self.f)
        return self.val

    # Python 3 looks up __next__; keep next() for Python 2 callers.
    __next__ = next
class _CsvSource(object):
    """Reading state for one input file of compare()."""

    def __init__(self, name, handle):
        self.name = name
        self.handle = handle
        # Lines are pulled through the retaining iterator so the raw text of
        # the most recently read line stays available for reporting.
        self.raw = RetainValueIterator(handle)
        self.reader = csv.reader(self.raw)
        # number of lines read so far (the header counts as line 1)
        self.row_number = 0
        # True once the reader is exhausted
        self.done = False
        # id -> (row hash, line number, raw csv text) for rows that have not
        # yet been seen in every other file
        self.pending = {}
        # every other source taking part in the comparison
        self.others = []


def _open_csv(path):
    """Open path in the mode the csv module expects on this interpreter."""
    if sys.version_info[0] >= 3:
        # The Python 3 csv reader consumes text; newline='' leaves line
        # ending handling to the reader.
        return open(path, 'r', newline='')
    return open(path, 'rb')


def _match_rows(sources, id_column_index, hashable_indexes, switch_interval):
    """Read all sources round-robin and print rows whose content differs."""
    count = len(sources)
    remaining = count
    current = 0
    while remaining:
        # Move on (wrapping around) to the next file that still has rows.
        # The modulo keeps the index in range even when every file after
        # the current one is already finished.
        while sources[current].done:
            current = (current + 1) % count
        src = sources[current]
        try:
            row = next(src.reader)
        except StopIteration:
            src.done = True
            remaining -= 1
            current = (current + 1) % count
            continue
        # raw csv text of the line the reader just consumed
        csv_text = src.raw.val
        src.row_number += 1
        if row:
            ident = row[id_column_index]
            # Hash the tuple rather than the concatenated text so that
            # ("ab", "c") and ("a", "bc") are not mistaken for equal rows.
            hashed = hash(tuple(row[i] for i in hashable_indexes))
            others = src.others
            if all(ident in other.pending for other in others):
                # The id has now been seen in every file: report it when
                # the contents differ, then forget it everywhere.
                if any(other.pending[ident][0] != hashed
                       for other in others):
                    for other in others:
                        _, line_no, text = other.pending[ident]
                        stdout.write('%s,%s,%s' % (other.name, line_no, text))
                    stdout.write('%s,%s,%s' %
                                 (src.name, src.row_number, csv_text))
                for other in others:
                    del other.pending[ident]
            else:
                # Not yet seen in all other files: remember it for later.
                src.pending[ident] = (hashed, src.row_number, csv_text)
        # Switch files regularly so that no single file's pending dict
        # has to hold the whole file before matching starts.
        if src.row_number % switch_interval == 0:
            current = (current + 1) % count


def _report_unmatched(sources, id_column):
    """Write to stderr, per leftover ID, which files still contain it."""
    with_consistency = len(sources) > 2
    heading = '%s,%s' % (id_column, ','.join(src.name for src in sources))
    if with_consistency:
        heading += ',consistency'
    stderr.write(heading + '\n')
    leftover = set()
    for src in sources:
        leftover.update(src.pending)
    # sorted() only makes the report order reproducible
    for ident in sorted(leftover):
        fields = ['%s' % (ident,)]
        hashes = []
        for src in sources:
            entry = src.pending.get(ident)
            if entry is None:
                fields.append('False')
            else:
                fields.append('True')
                hashes.append(entry[0])
        if with_consistency:
            # consistent: every file that has the id has the same content
            same = all(h == hashes[0] for h in hashes)
            fields.append('consistent' if same else 'mismatched')
        stderr.write(','.join(fields) + '\n')


def compare(files, id_column, ignore_columns=None, verbose=False,
            switch_interval=100000):
    """
    Compare multiple csv files using a dict

    files - list of file names
    id_column - name of the column containing the unique ID
    ignore_columns - columns to ignore
    verbose - verbose indicator
    switch_interval - number of lines to read from one file before moving
                      on to the next file

    Rows whose ID occurs in every file but whose non-ignored columns differ
    are written to stdout as "file,line,<raw csv line>", below a
    "file,line,<header>" heading.  IDs still unmatched once all files are
    exhausted are written to stderr, one line per ID, with a True/False
    flag per file and, for more than two files, a consistent/mismatched
    marker.

    Raises Exception when no file is given, a file has no header row, the
    headers differ, or the id column or an ignore column is not in the
    header.  Raises ValueError when switch_interval is smaller than 1.
    """
    if not ignore_columns:
        ignore_columns = []
    if switch_interval < 1:
        raise ValueError('switch_interval must be a positive integer')

    sources = []
    try:
        header = None
        for name in files:
            src = _CsvSource(name, _open_csv(name))
            sources.append(src)
            try:
                file_header = next(src.reader)
            except StopIteration:
                raise Exception('The file %s has no header row' % (name,))
            src.row_number = 1
            if header is not None and file_header != header:
                raise Exception('The header rows do not match')
            header = file_header
        if header is None:
            raise Exception('No files were specified')

        # ensure the id_column exists and get the column index
        if id_column not in header:
            raise Exception('The id column specified does not exist')
        id_column_index = header.index(id_column)

        # get the indexes for the ignore_columns
        try:
            ignore_indexes = set(header.index(a) for a in ignore_columns)
        except ValueError:
            raise Exception('One or more of the specified ignore columns does '
                            'not exist')

        # columns that take part in the row comparison
        hashable_indexes = [
            i for i in range(len(header)) if i not in ignore_indexes]

        if verbose:
            stderr.write('Loading data from %d files\n' % (len(files),))

        # write the mismatch header
        stdout.write('file,line,%s\n' % (','.join(header),))

        for src in sources:
            src.others = [other for other in sources if other is not src]

        _match_rows(sources, id_column_index, hashable_indexes,
                    switch_interval)
    finally:
        # release every file that was opened, also when an error occurred
        for src in sources:
            src.handle.close()

    if verbose:
        stderr.write('Scanning for unique IDs\n')

    # Find Rows that are not in ALL files.
    _report_unmatched(sources, id_column)
def main(argv=None):
    """Parse the command line and run compare() on the given files.

    argv - full argument vector including the program name; defaults to
           sys.argv

    Positional arguments are one or more csv file names followed by the
    name of the identity column.  Errors raised by compare() propagate to
    the caller; argparse exits the process on a malformed command line.
    """
    if argv is None:
        argv = sys.argv
    parser = argparse.ArgumentParser(
        description='compare two or more csv files')
    parser.add_argument('files', help='file to compare',
                        metavar='file', nargs='+')
    parser.add_argument('identity_col', metavar='identity_column',
                        help='identity column name')
    parser.add_argument('--ignore', help='columns to ignore',
                        nargs='+', default=[])
    parser.add_argument('--verbose', '-v', help='verbose indicator',
                        action='store_true', default=False)
    args = parser.parse_args(argv[1:])
    compare(args.files, args.identity_col,
            ignore_columns=args.ignore, verbose=args.verbose)
# Script entry point: the value returned by main() becomes the exit status.
if __name__ == '__main__':
    raise SystemExit(main())
import json
import unittest
import xmltodict
class XmlDiff(object):
    """Compare two XML documents, honouring "%%IGNORE%%" placeholders.

    Both documents are parsed with xmltodict and passed through a JSON
    round trip, which leaves plain dicts, lists and scalars.  The second
    document is the expected one: any dict in it that holds a value equal
    to "%%IGNORE%%" is removed from both documents before they are
    compared.
    """

    def __init__(self, xml1, xml2):
        # dict1: the document under test; dict2: the expected document,
        # which may carry "%%IGNORE%%" values (see equal()).
        self.dict1 = json.loads(json.dumps(xmltodict.parse(xml1)))
        self.dict2 = json.loads(json.dumps(xmltodict.parse(xml2)))

    @staticmethod
    def ignore_in_dict(dict1):
        """Return True when dict1 is a dict with a "%%IGNORE%%" value.

        Anything that is not a dict (text, None, a list) is never ignored.
        """
        if not isinstance(dict1, dict):
            return False
        return any(value == "%%IGNORE%%" for value in dict1.values())

    def remove_ignore_field(self, dict_returned, dict_expected):
        """Strip every ignored entry from both dicts, in place.

        dict_returned - parsed document under test
        dict_expected - parsed expected document; its "%%IGNORE%%" markers
                        decide what gets removed

        An entry of dict_expected is dropped from both dicts when it is a
        dict holding an ignore marker.  For list entries the ignored
        positions are dropped from both lists.  Remaining dict entries and
        list items are processed recursively.  An ignored entry that is
        missing from dict_returned is not an error.

        Returns the tuple (dict_expected, dict_returned).
        """
        # Iterate over a snapshot: entries are deleted during the walk.
        for key, expected_value in list(dict_expected.items()):
            returned_value = dict_returned.get(key)
            if isinstance(expected_value, dict):
                if self.ignore_in_dict(expected_value):
                    del dict_expected[key]
                    dict_returned.pop(key, None)
                elif isinstance(returned_value, dict):
                    # keep the (returned, expected) argument order
                    self.remove_ignore_field(returned_value, expected_value)
            elif isinstance(expected_value, list):
                ignored = [self.ignore_in_dict(item)
                           for item in expected_value]
                kept_expected = [
                    item for item, skip in zip(expected_value, ignored)
                    if not skip]
                dict_expected[key] = kept_expected
                if isinstance(returned_value, list):
                    # Items beyond the expected length are kept so that a
                    # longer returned list still compares as different.
                    kept_returned = [
                        item for index, item in enumerate(returned_value)
                        if index >= len(ignored) or not ignored[index]]
                    dict_returned[key] = kept_returned
                    for returned_item, expected_item in zip(kept_returned,
                                                            kept_expected):
                        if (isinstance(returned_item, dict)
                                and isinstance(expected_item, dict)):
                            self.remove_ignore_field(returned_item,
                                                     expected_item)
        return dict_expected, dict_returned

    def equal(self):
        """Return True when both documents match outside ignored entries.

        The ignored entries are removed from dict1 and dict2 in place.
        """
        self.remove_ignore_field(self.dict1, self.dict2)
        return self.dict1 == self.dict2
class XMLDiffTestCase(unittest.TestCase):
    # Unit tests for XmlDiff.  Each case builds two XML documents and
    # checks the result of XmlDiff(...).equal().  Comments are used instead
    # of docstrings so the reported test names stay the method names.

    def _same(self, first, second):
        # Assert that XmlDiff considers the two documents equal.
        self.assertTrue(XmlDiff(first, second).equal())

    def _different(self, first, second):
        # Assert that XmlDiff considers the two documents different.
        self.assertFalse(XmlDiff(first, second).equal())

    def test_xml_equal(self):
        self._same('<a></a>', '<a></a>')

    def test_xml_are_not_equal(self):
        self._different('<a></a>', '<b></b>')

    def test_parameter_order_doesnt_matter(self):
        first = '<a p1="1" p2="2"></a>'
        second = '<a p2="2" p1="1"></a>'
        self._same(first, second)

    def test_newline_doesnt_matter(self):
        first = '<a></a>'
        second = """<a>
</a>"""
        self._same(first, second)

    def test_nested_tag(self):
        first = '<a><b></b></a>'
        second = """
<a>
<b></b>
</a>"""
        self._same(first, second)

    def test_nested_tag_not_equal(self):
        first = '<a><b></b><c><d></d></c><e></e></a>'
        second = """
<a>
<b></b>
</a>
"""
        self._different(first, second)

    def test_nested_tag_parameter_not_equal(self):
        first = '<t t="r" v="1"><f><p k="a" v="2"/></f></t>'
        second = """
<t t="r" v="1">
<f>
<p k="a" v="1"/>
</f>
</t>"""
        self._different(first, second)

    def test_double_tag_equal(self):
        first = '<t t="r" v="1"><f><p k="a" v="1"/><p k="b" v="2"/></f></t>'
        second = """
<t t="r" v="1">
<f>
<p k="a" v="1"/>
<p k="b" v="2"/>
</f>
</t>"""
        self._same(first, second)

    def test_xml_equal_with_encoding(self):
        first = """<?xml version='1.0' encoding='utf-8' standalone='yes'?>
<Stats start="1275955200" end="1276041599">
</Stats>"""
        second = """<?xml version='1.0' encoding='utf-8' standalone='yes'?>
<Stats end="1276041599" start="1275955200" >
</Stats>"""
        self._same(first, second)

    def test_ignore_field(self):
        # the second <p> of the expected document is marked as ignored
        first = '<t t="r" v="1"><f><p k="a" v="1"/><p k="b" v="2"/></f></t>'
        second = """
<t t="r" v="1">
<f>
<p k="a" v="1"/>
<p k="b" v="%%IGNORE%%"/>
</f>
</t>"""
        self._same(first, second)

    def test_ignore_one_field(self):
        first = '<t t="r" v="1"><f><p k="b" v="2"/></f></t>'
        second = """
<t t="r" v="1">
<f>
<p k="b" v="%%IGNORE%%"/>
</f>
</t>"""
        self._same(first, second)

    def test_ignore_three_fields(self):
        with_markers = """<?xml version="1.0" ?><trophy type="result" version="1.0">
<status code="0" message="ok" />
<version>
<parameter key="version" value="%%IGNORE%%" />
<parameter key="build_date" value="%%IGNORE%%" />
<parameter key="type" value="%%IGNORE%%" />
</version>
</trophy>"""
        with_values = """<?xml version="1.0" ?>
<trophy type="result" version="1.0">
<status code="0" message="ok"/>
<version>
<parameter key="version" value="4.2.1.0"/>
<parameter key="build_date" value="10/09/2015"/>
<parameter key="type" value="Lite"/>
</version>
</trophy>
"""
        # the document carrying the ignore markers goes second
        self._same(with_values, with_markers)

    def test_ingore_in_dict(self):
        marked = {u'@k': u'b', u'@v': u'%%IGNORE%%'}
        self.assertTrue(XmlDiff.ignore_in_dict(marked))
        plain = {u'@k': u'b', u'@v': u'value'}
        self.assertFalse(XmlDiff.ignore_in_dict(plain))
try:
import doctest
doctest.OutputChecker
except AttributeError: # Python < 2.4
import util.doctest24 as doctest
try:
import xml.etree.ElementTree as ET
except ImportError:
import elementtree.ElementTree as ET
from xml.parsers.expat import ExpatError as XMLParseError
RealOutputChecker = doctest.OutputChecker
def debug(*msg):
    """Write the str() of every argument, space separated, as one line to stderr."""
    import sys
    # A plain write behaves the same on Python 2 and Python 3, unlike the
    # Python 2 only "print >>" statement.
    sys.stderr.write(' '.join(map(str, msg)) + '\n')
# ElementTree 1.3+ (Python 2.7 and 3.x) reports malformed input with its own
# ParseError, which is not an expat error, so both types are handled.
_XML_ERRORS = (XMLParseError, getattr(ET, 'ParseError', XMLParseError))


class HTMLOutputChecker(RealOutputChecker):
    """Doctest output checker that also accepts equivalent XML output.

    When the regular comparison fails, the wanted and the actual output are
    both parsed as XML fragments and compared with xml_compare().
    """

    def check_output(self, want, got, optionflags):
        """Return True when got matches want textually or as XML.

        The result of the regular checker is returned unchanged when it
        succeeds or when got is empty.  Output that cannot be parsed as XML
        on either side counts as a mismatch.
        """
        normal = RealOutputChecker.check_output(self, want, got, optionflags)
        if normal or not got:
            return normal
        try:
            want_xml = make_xml(want)
            got_xml = make_xml(got)
        except _XML_ERRORS:
            return False
        return bool(xml_compare(want_xml, got_xml))

    def output_difference(self, example, got, optionflags):
        """Return the regular difference text, extended with an XML report.

        The regular text is returned as is when one side cannot be parsed
        as XML and example.want does not start with '<'.  Otherwise the
        normalised XML of both sides is appended, followed by the messages
        xml_compare() reports when both sides could be parsed.
        """
        actual = RealOutputChecker.output_difference(
            self, example, got, optionflags)
        want_xml = got_xml = None
        try:
            want_xml = make_xml(example.want)
            want_norm = make_string(want_xml)
        except _XML_ERRORS as e:
            if not example.want.startswith('<'):
                return actual
            want_norm = '(bad XML: %s)' % e
        try:
            got_xml = make_xml(got)
            got_norm = make_string(got_xml)
        except _XML_ERRORS as e:
            # Whether an XML report is shown depends on the wanted output.
            if not example.want.startswith('<'):
                return actual
            got_norm = '(bad XML: %s)' % e
        s = '%s\nXML Wanted: %s\nXML Got : %s\n' % (
            actual, want_norm, got_norm)
        # Compare against None: an Element without children is falsy.
        if got_xml is not None and want_xml is not None:
            result = []
            xml_compare(want_xml, got_xml, result.append)
            s += 'Difference report:\n%s\n' % '\n'.join(result)
        return s
def xml_compare(x1, x2, reporter=None):
    """Return True when the element trees x1 and x2 are equivalent.

    x1, x2 - ElementTree elements to compare
    reporter - optional callable that receives a message describing each
               difference found on the way back up the tree

    Tags, attributes and child counts must be equal; text and tail are
    compared with text_compare(); children are compared pairwise in order.
    The comparison stops at the first difference.
    """
    if x1.tag != x2.tag:
        if reporter:
            reporter('Tags do not match: %s and %s' % (x1.tag, x2.tag))
        return False
    for name, value in x1.attrib.items():
        if x2.attrib.get(name) != value:
            if reporter:
                reporter('Attributes do not match: %s=%r, %s=%r'
                         % (name, value, name, x2.attrib.get(name)))
            return False
    for name in x2.attrib:
        if name not in x1.attrib:
            if reporter:
                reporter('x2 has an attribute x1 is missing: %s'
                         % name)
            return False
    if not text_compare(x1.text, x2.text):
        if reporter:
            reporter('text: %r != %r' % (x1.text, x2.text))
        return False
    if not text_compare(x1.tail, x2.tail):
        if reporter:
            reporter('tail: %r != %r' % (x1.tail, x2.tail))
        return False
    # list(element) yields the children on every ElementTree version;
    # Element.getchildren() no longer exists on Python 3.9+.
    cl1 = list(x1)
    cl2 = list(x2)
    if len(cl1) != len(cl2):
        if reporter:
            reporter('children length differs, %i != %i'
                     % (len(cl1), len(cl2)))
        return False
    # children are numbered from 1 in the report
    for i, (c1, c2) in enumerate(zip(cl1, cl2), 1):
        if not xml_compare(c1, c2, reporter=reporter):
            if reporter:
                reporter('children %i do not match: %s'
                         % (i, c1.tag))
            return False
    return True
def text_compare(t1, t2):
    """Return True when two text values count as equal.

    A value of '*' on either side matches anything.  Otherwise empty and
    None values are treated as '' and both sides are compared with
    surrounding whitespace stripped.
    """
    if '*' in (t1, t2):
        return True
    left = t1.strip() if t1 else ''
    right = t2.strip() if t2 else ''
    return left == right
def make_xml(s):
    """Parse s as an XML fragment and return it under a single <xml> root."""
    # The wrapper element allows fragments with several top-level nodes or
    # with plain text to be parsed.
    wrapped = '<xml>%s</xml>' % s
    return ET.fromstring(wrapped)
def make_string(xml):
    """Serialise an <xml>-wrapped element and return the text inside the wrapper.

    xml - an element whose root is the <xml> wrapper, or a string, which is
          first parsed with make_xml()

    Returns '' for an empty wrapper.  An AssertionError is raised when the
    serialised text is not enclosed in a plain <xml>...</xml> wrapper.
    """
    try:
        string_types = (str, unicode)
    except NameError:
        # Python 3 has no separate unicode type
        string_types = (str,)
    if isinstance(xml, string_types):
        xml = make_xml(xml)
    s = ET.tostring(xml)
    if not isinstance(s, str):
        # Python 3 returns bytes, in ASCII with character references
        s = s.decode('ascii')
    if s == '<xml />':
        return ''
    assert s.startswith('<xml>') and s.endswith('</xml>'), repr(s)
    # cut off the leading '<xml>' and the trailing '</xml>'
    return s[5:-6]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment