N-gram + Edit Distance

# -*- coding: utf-8 -*-

# This is an implementation of the following paper, modified to rank candidates
# with an edit distance score:
# Simple and Efficient Algorithm for Approximate Dictionary Matching
# https://www.aclweb.org/anthology/C10-1096.pdf
# Edit distance algorithm:
# https://en.wikipedia.org/wiki/Edit_distance

import math
import random
import string
from collections import defaultdict

import numpy as np  # Just for evaluation

random.seed(42)
np.random.seed(42)

VALID_CHARS = string.ascii_uppercase + string.digits


def generate_random_code(code_len):
    return "".join(random.choices(VALID_CHARS, k=code_len))


def generate_random_codes(num_codes, code_len_range):
    codes = set()  # Using a set to make sure there are no duplicates
    while len(codes) < num_codes:
        codes.add(generate_random_code(random.randint(*code_len_range)))
    return codes


def load_external_codes(filename):
    codes = set()  # Using a set to make sure there are no duplicates
    with open(filename, "r", encoding="UTF-8") as fin:
        for line in filter(None, map(str.strip, fin)):
            codes.add(line)
    return codes


class EditDistance:
    def __init__(self, weights):
        self._weights = weights

    def __find_diff_segment(self, s1, s2):
        # Strip the common suffix and prefix; only the differing middle
        # segments need to go through the DP below
        pos = 0
        s1_len = len(s1)
        s2_len = len(s2)
        min_len = min(s1_len, s2_len)
        while min_len > 0 and s1[s1_len - 1] == s2[s2_len - 1]:
            s1_len -= 1
            s2_len -= 1
            min_len -= 1
        while pos < min_len and s1[pos] == s2[pos]:
            pos += 1
        return pos, s1_len, s2_len

    def score(self, s1, s2):
        pos, s1_diff_len, s2_diff_len = self.__find_diff_segment(s1, s2)
        if min(s1_diff_len, s2_diff_len) == pos:
            # One differing segment is empty, so the distance is just the
            # number of leftover characters in the other string
            return abs(s1_diff_len - s2_diff_len)
        s1 = s1[pos:s1_diff_len]
        s2 = s2[pos:s2_diff_len]
        s1_len = len(s1)
        s2_len = len(s2)
        # This is a variation of the original algorithm to reduce memory usage
        # by keeping only two rows of the DP table
        # Space: O(M * N) -> O(2 * N)
        last_row = None
        current_row = list(range(1, s2_len + 1)) + [0]  # Trailing element acts as column -1
        for row_idx in range(s1_len):
            # Swap rows
            last_row = current_row
            current_row = [0] * s2_len + [row_idx + 1]
            for col_idx in range(s2_len):
                del_cost = last_row[col_idx] + 1
                ins_cost = current_row[col_idx - 1] + 1
                sub_cost = last_row[col_idx - 1]
                if s1[row_idx] != s2[col_idx]:
                    sub_cost += self._weights.get((s1[row_idx], s2[col_idx]), 1)
                current_row[col_idx] = min(del_cost, ins_cost, sub_cost)
        return current_row[s2_len - 1]
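
# A quick sanity check (my own example, not part of the original gist): with an
# empty weight matrix every substitution costs 1, so the score reduces to the
# plain Levenshtein distance, e.g.
#   EditDistance({}).score("KITTEN", "SITTING")              # -> 3
#   EditDistance({}).score("OTQ-C4705SAY", "OTQ-C47055SAY")  # -> 1 (one extra "5")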


class CPMerge:
    def __init__(self, scorer, n=3):
        self._scorer = scorer
        self._n = n
        self._forward_map = {}
        self._inverted_map = defaultdict(lambda: defaultdict(set))

    def __generate_ngrams(self, code):
        padding_chars = "$" * (self._n - 1)
        # Padding covers the edge cases at the beginning and end of the code
        code = padding_chars + code + padding_chars
        return {code[i : i + self._n] for i in range(len(code) - self._n + 1)}
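
    # For example (illustration only, not from the original gist), with n = 2 the
    # code "AB1" is padded to "$AB1$" and yields the feature set
    # {"$A", "AB", "B1", "1$"}.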

    def index(self, codes):
        # This function only needs to be called once
        for code in codes:
            n_grams = self.__generate_ngrams(code)
            feature_size = len(n_grams)
            self._forward_map[code] = n_grams
            for n_gram in n_grams:
                self._inverted_map[feature_size][n_gram].add(code)

    def __compute_min_feature_size(self, feature_size, alpha):
        return math.ceil(alpha ** 2 * feature_size)

    def __compute_max_feature_size(self, feature_size, alpha):
        return math.floor(feature_size / alpha ** 2)

    def __compute_min_overlap(self, x_feature_size, y_feature_size, alpha):
        return math.ceil(alpha * math.sqrt(x_feature_size * y_feature_size))

    def __compute_cosine_similarity(self, x_features, y_features):
        return len(x_features & y_features) / math.sqrt(
            len(x_features) * len(y_features)
        )
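
    # Worked example (my own numbers, not from the gist): for a query with
    # feature_size = 10 and alpha = 0.5, only candidates with between
    # ceil(0.25 * 10) = 3 and floor(10 / 0.25) = 40 n-grams are considered, and a
    # candidate that itself has 10 n-grams must share at least
    # ceil(0.5 * sqrt(10 * 10)) = 5 n-grams with the query.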

    def lookup(self, code, alpha=0.5, top_k=None):
        n_grams = self.__generate_ngrams(code)
        feature_size = len(n_grams)
        code_candidates = []
        for candidate_feature_size in range(
            self.__compute_min_feature_size(feature_size, alpha),
            self.__compute_max_feature_size(feature_size, alpha) + 1,
        ):  # [min_feature_size, ..., max_feature_size]
            min_overlap = self.__compute_min_overlap(
                feature_size, candidate_feature_size, alpha
            )
            # Process the query n-grams from the rarest to the most common
            sorted_n_grams = sorted(
                n_grams,
                key=lambda n_gram: len(
                    self._inverted_map[candidate_feature_size][n_gram]
                ),
            )
            overlap_counts = defaultdict(int)
            for n_gram in sorted_n_grams[: feature_size - min_overlap + 1]:
                for code_candidate in self._inverted_map[candidate_feature_size][
                    n_gram
                ]:
                    overlap_counts[code_candidate] += 1
            for code_candidate in overlap_counts:
                for i in range(feature_size - min_overlap + 1, feature_size):
                    if (
                        code_candidate
                        in self._inverted_map[candidate_feature_size][sorted_n_grams[i]]
                    ):  # Use a hash lookup instead of the binary search mentioned in the paper
                        overlap_counts[code_candidate] += 1
                    if overlap_counts[code_candidate] >= min_overlap:
                        if self._scorer:  # Use the edit distance score
                            code_candidates.append(
                                (
                                    code_candidate,
                                    1
                                    - self._scorer.score(code, code_candidate)
                                    / len(code_candidate),
                                )
                            )
                        else:  # Use the cosine similarity
                            code_candidates.append(
                                (
                                    code_candidate,
                                    self.__compute_cosine_similarity(
                                        n_grams, self._forward_map[code_candidate]
                                    ),
                                )
                            )
                        break  # Found a similar code
                    elif (
                        overlap_counts[code_candidate] + feature_size - i - 1
                        < min_overlap
                    ):
                        # Prune the candidate once the required overlap can no longer be reached
                        break
        # Rank candidates by score (descending), breaking ties alphabetically
        return sorted(code_candidates, key=lambda x: (-x[1], x[0]))[:top_k]


def generate_error_code(code, ocr_accuracy):
    # This function simulates the output of a practical OCR model
    # A model with p% accuracy can be modeled as a series of Bernoulli trials
    chars = []
    for char in code:
        if np.random.geometric(p=ocr_accuracy) == 1:  # Correct prediction => no changes
            chars.append(char)
        else:  # Incorrect prediction
            prob = random.random()
            if prob < 0.5:  # 50%: substitute a random character
                chars.extend(random.choices(VALID_CHARS))
            elif prob < 0.75:  # 25%: insert a noise character before the original one
                chars.extend(random.choices(VALID_CHARS))
                chars.append(char)
            else:  # 25%: drop the character
                pass
    return "".join(chars)


def evaluate(predictions, golds, top_k=1):
    # This function assesses the performance of the code searching module using the recall metric
    assert len(predictions) == len(golds)
    num_corrects = 0
    num_golds = 0
    for candidates, gold in zip(predictions, golds):
        candidates = {code for code, _ in candidates[:top_k]}
        if gold in candidates:
            num_corrects += 1
        num_golds += 1
    return num_corrects / num_golds


if __name__ == "__main__":
    # Configs
    n_gram = 2
    alpha = 0.5  # Needs to be tuned for CPMerge; not used by the edit distance ranker
    top_k = 10
    weights = {}  # Define a sparse cost matrix for confusable characters
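    # For example (hypothetical values, not part of the original gist), OCR-confusable
    # pairs could be given a reduced substitution cost:
    # weights = {
    #     ("0", "O"): 0.5, ("O", "0"): 0.5,
    #     ("1", "I"): 0.5, ("I", "1"): 0.5,
    #     ("5", "S"): 0.5, ("S", "5"): 0.5,
    # }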

    # Generate synthetic codes
    # codes = generate_random_codes(num_codes=50_000, code_len_range=(3, 20))
    # Or load codes from a file
    codes = load_external_codes("codes.txt")

    # Create a ranker
    ranker = EditDistance(weights)
    # Or skip the ranker and use cosine similarity instead
    # ranker = None

    # Create a matcher
    cp_merge = CPMerge(ranker, n_gram)

    # Index codes, do this once
    cp_merge.index(codes)

    # Lookup here
    # print(cp_merge.lookup("RHS7WG166X4-SR", alpha, top_k))  # RHS71WG16RX4-SR
    # print(cp_merge.lookup("RH571WG16RX4-5R", alpha, top_k))  # RHS71WG16RX4-SR
    # print(cp_merge.lookup("OTQ-C47055SAY", alpha, top_k))  # OTQ-C4705SAY
    # print(cp_merge.lookup("TQ-C470SSAY", alpha, top_k))  # OTQ-C4705SAY

    # Evaluation
    ocr_accuracy = 0.9  # Just an assumption
    predictions = []
    for code in codes:
        # Simulate the output of an OCR model
        predicted_code = generate_error_code(code, ocr_accuracy)
        # Lookup here
        predictions.append(cp_merge.lookup(predicted_code, alpha, top_k))

    # Recall@1
    print("Recall@1:", evaluate(predictions, codes, 1))
    # Recall@k
    print("Recall@" + str(top_k) + ":", evaluate(predictions, codes, top_k))