Last active
November 4, 2019 04:51
-
-
Save dnanhkhoa/76a9a7e853eb6c14480b58ad792752e8 to your computer and use it in GitHub Desktop.
Revisions
-
Khoa Duong revised this gist
Nov 4, 2019 . 1 changed file with 1 addition and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -13,6 +13,7 @@ import numpy as np # Just for evaluation random.seed(42) np.random.seed(42) VALID_CHARS = string.ascii_uppercase + string.digits -
Khoa Duong created this gist
Nov 4, 2019 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,292 @@ # -*- coding: utf-8 -*- # This is an implementation of the paper modified with edit distance algorithm: # Simple and Efficient Algorithm for Approximate Dictionary Matching # https://www.aclweb.org/anthology/C10-1096.pdf # Edit Distance algorithm # https://en.wikipedia.org/wiki/Edit_distance import math import random import string from collections import defaultdict import numpy as np # Just for evaluation random.seed(42) VALID_CHARS = string.ascii_uppercase + string.digits def generate_random_code(code_len): return "".join(random.choices(VALID_CHARS, k=code_len)) def generate_random_codes(num_codes, code_len_range): codes = set() # Using set to make sure there are no duplicates while len(codes) < num_codes: codes.add(generate_random_code(random.randint(*code_len_range))) return codes def load_external_codes(filename): codes = set() # Using set to make sure there are no duplicates with open(filename, "r", encoding="UTF-8") as fin: for line in filter(None, map(str.strip, fin)): codes.add(line) return codes class EditDistance: def __init__(self, weights): self._weights = weights def __find_diff_segment(self, s1, s2): pos = 0 s1_len = len(s1) s2_len = len(s2) min_len = min(s1_len, s2_len) while min_len > 0 and s1[s1_len - 1] == s2[s2_len - 1]: s1_len -= 1 s2_len -= 1 min_len -= 1 while pos < min_len and s1[pos] == s2[pos]: pos += 1 return pos, s1_len, s2_len def score(self, s1, s2): pos, s1_diff_len, s2_diff_len = self.__find_diff_segment(s1, s2) if min(s1_diff_len, s2_diff_len) == pos: return abs(s1_diff_len - s2_diff_len) s1 = s1[pos:s1_diff_len] s2 = s2[pos:s2_diff_len] s1_len = len(s1) s2_len = len(s2) # This is a variation of the original algorithm to reduce memory usage # Space: O(M * N) -> O(2 * min(M, N)) last_row = None current_row = list(range(1, s2_len + 1)) + [0] for row_idx in range(s1_len): # Swap rows last_row = current_row current_row = [0] * s2_len + [row_idx + 1] for col_idx in range(s2_len): del_cost = last_row[col_idx] + 1 ins_cost = current_row[col_idx - 1] + 1 sub_cost = last_row[col_idx - 1] if s1[row_idx] != s2[col_idx]: sub_cost += self._weights.get((s1[row_idx], s2[col_idx]), 1) current_row[col_idx] = min(del_cost, ins_cost, sub_cost) return current_row[s2_len - 1] class CPMerge: def __init__(self, scorer, n=3): self._scorer = scorer self._n = n self._forward_map = {} self._inverted_map = defaultdict(lambda: defaultdict(set)) def __generate_ngrams(self, code): padding_chars = "$" * (self._n - 1) # This way is used to cover the edge cases code = padding_chars + code + padding_chars return {code[i : i + self._n] for i in range(len(code) - self._n + 1)} def index(self, codes): # This function is just called once for code in codes: n_grams = self.__generate_ngrams(code) feature_size = len(n_grams) self._forward_map[code] = n_grams for n_gram in n_grams: self._inverted_map[feature_size][n_gram].add(code) def __compute_min_feature_size(self, feature_size, alpha): return math.ceil(alpha ** 2 * feature_size) def __compute_max_feature_size(self, feature_size, alpha): return math.floor(feature_size / alpha ** 2) def __compute_min_overlap(self, x_feature_size, y_feature_size, alpha): return math.ceil(alpha * math.sqrt(x_feature_size * y_feature_size)) def __compute_cosine_similarity(self, x_features, y_features): return len(x_features & y_features) / math.sqrt( len(x_features) * len(y_features) ) def lookup(self, code, alpha=0.5, top_k=None): n_grams = self.__generate_ngrams(code) feature_size = len(n_grams) code_candidates = [] for candidate_feature_size in range( self.__compute_min_feature_size(feature_size, alpha), self.__compute_max_feature_size(feature_size, alpha) + 1, ): # [min_feature_size, ..., max_feature_size] min_overlap = self.__compute_min_overlap( feature_size, candidate_feature_size, alpha ) sorted_n_grams = sorted( n_grams, key=lambda n_gram: len( self._inverted_map[candidate_feature_size][n_gram] ), ) overlap_counts = defaultdict(int) for n_gram in sorted_n_grams[: feature_size - min_overlap + 1]: for code_candidate in self._inverted_map[candidate_feature_size][ n_gram ]: overlap_counts[code_candidate] += 1 for code_candidate in overlap_counts: for i in range(feature_size - min_overlap + 1, feature_size): if ( code_candidate in self._inverted_map[candidate_feature_size][sorted_n_grams[i]] ): # Use hash lookup instead of the binary search as mentioned in the paper overlap_counts[code_candidate] += 1 if overlap_counts[code_candidate] >= min_overlap: if self._scorer: # Use edit distance score code_candidates.append( ( code_candidate, 1 - self._scorer.score(code, code_candidate) / len(code_candidate), ) ) else: # Use cosine similarity code_candidates.append( ( code_candidate, self.__compute_cosine_similarity( n_grams, self._forward_map[code_candidate] ), ) ) break # Found the similar code elif ( overlap_counts[code_candidate] + feature_size - i - 1 < min_overlap ): # Prune candidates if the candidates are found to be unreachable for overlaps break # Rank candidates by score return sorted(code_candidates, key=lambda x: (-x[1], x[0]))[:top_k] def generate_error_code(code, ocr_accuracy): # This function simulates a practical OCR # A model has p % accuracy could be modeled as a series of Bernoulli trials chars = [] for char in code: if np.random.geometric(p=ocr_accuracy) == 1: # Correct prediction => no changes chars.append(char) else: # Incorrect prediction prob = random.random() if prob < 0.5: # 50% causing mistaken characters chars.extend(random.choices(VALID_CHARS)) elif prob < 0.75: # 25% adding of noise chars.extend(random.choices(VALID_CHARS)) chars.append(char) else: # 25% causing missed characters pass return "".join(chars) def evaluate(predictions, golds, top_k=1): # This function assess the performance of the code searching module based on Recall metric assert len(predictions) == len(golds) num_corrects = 0 num_golds = 0 for candidates, gold in zip(predictions, golds): candidates = {code for code, _ in candidates[:top_k]} if gold in candidates: num_corrects += 1 num_golds += 1 return num_corrects / num_golds if __name__ == "__main__": # Configs n_gram = 2 alpha = 0.5 # Need to be tuned for CPMerge, not Edit Distance top_k = 10 weights = {} # Define a sparse cost matrix for confusable characters # Generate synthetic codes # codes = generate_random_codes(num_codes=50_000, code_len_range=(3, 20)) # Or load codes from file codes = load_external_codes("codes.txt") # Create a ranker ranker = EditDistance(weights) # Or not use ranker (cosine similarity instead) # ranker = None # Create a matcher cp_merge = CPMerge(ranker, n_gram) # Index codes, do this once cp_merge.index(codes) # Lookup here # print(cp_merge.lookup("RHS7WG166X4-SR", alpha, top_k)) # RHS71WG16RX4-SR # print(cp_merge.lookup("RH571WG16RX4-5R", alpha, top_k)) # RHS71WG16RX4-SR # print(cp_merge.lookup("OTQ-C47055SAY", alpha, top_k)) # OTQ-C4705SAY # print(cp_merge.lookup("TQ-C470SSAY", alpha, top_k)) # OTQ-C4705SAY # Evaluation ocr_accuracy = 0.9 # Just my assumption predictions = [] for code in codes: # Simulate the output of OCR model predicted_code = generate_error_code(code, ocr_accuracy) # Lookup here predictions.append(cp_merge.lookup(predicted_code, alpha, top_k)) # Recall@1 print("Recall@1:", evaluate(predictions, codes, 1)) # Recall@k print("Recall@" + str(top_k), evaluate(predictions, codes, top_k))