@dnanhkhoa
Last active November 4, 2019 04:51
N-gram + Edit Distance
# -*- coding: utf-8 -*-

# This is an implementation of the paper below, modified with the edit distance algorithm:
# Simple and Efficient Algorithm for Approximate Dictionary Matching
# https://www.aclweb.org/anthology/C10-1096.pdf
# Edit Distance algorithm
# https://en.wikipedia.org/wiki/Edit_distance

import math
import random
import string
from collections import defaultdict

import numpy as np  # Just for evaluation

random.seed(42)
np.random.seed(42)

VALID_CHARS = string.ascii_uppercase + string.digits


def generate_random_code(code_len):
    return "".join(random.choices(VALID_CHARS, k=code_len))


def generate_random_codes(num_codes, code_len_range):
    codes = set()  # Using a set to make sure there are no duplicates
    while len(codes) < num_codes:
        codes.add(generate_random_code(random.randint(*code_len_range)))
    return codes


def load_external_codes(filename):
    codes = set()  # Using a set to make sure there are no duplicates
    with open(filename, "r", encoding="UTF-8") as fin:
        for line in filter(None, map(str.strip, fin)):
            codes.add(line)
    return codes
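
# The input file is assumed to contain one code per line (blank lines are skipped), e.g.:
# RHS71WG16RX4-SR
# OTQ-C4705SAY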


class EditDistance:
    def __init__(self, weights):
        self._weights = weights

    def __find_diff_segment(self, s1, s2):
        pos = 0
        s1_len = len(s1)
        s2_len = len(s2)
        min_len = min(s1_len, s2_len)
        # Trim the common suffix
        while min_len > 0 and s1[s1_len - 1] == s2[s2_len - 1]:
            s1_len -= 1
            s2_len -= 1
            min_len -= 1
        # Trim the common prefix
        while pos < min_len and s1[pos] == s2[pos]:
            pos += 1
        return pos, s1_len, s2_len
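
    # A quick illustration of __find_diff_segment: for "ABCDEF" vs "ABXYEF" it returns
    # (2, 4, 4), since the strings share the prefix "AB" and the suffix "EF", leaving
    # only s1[2:4] = "CD" and s2[2:4] = "XY" to be compared by the DP below.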

    def score(self, s1, s2):
        pos, s1_diff_len, s2_diff_len = self.__find_diff_segment(s1, s2)
        if min(s1_diff_len, s2_diff_len) == pos:
            # One of the differing segments is empty, so only insertions/deletions remain
            return abs(s1_diff_len - s2_diff_len)
        s1 = s1[pos:s1_diff_len]
        s2 = s2[pos:s2_diff_len]
        s1_len = len(s1)
        s2_len = len(s2)
        # This is a variation of the original algorithm to reduce memory usage
        # Space: O(M * N) -> O(2 * min(M, N))
        # The last element of each row stores the value of the implicit first column
        last_row = None
        current_row = list(range(1, s2_len + 1)) + [0]
        for row_idx in range(s1_len):
            # Swap rows
            last_row = current_row
            current_row = [0] * s2_len + [row_idx + 1]
            for col_idx in range(s2_len):
                del_cost = last_row[col_idx] + 1
                ins_cost = current_row[col_idx - 1] + 1
                sub_cost = last_row[col_idx - 1]
                if s1[row_idx] != s2[col_idx]:
                    sub_cost += self._weights.get((s1[row_idx], s2[col_idx]), 1)
                current_row[col_idx] = min(del_cost, ins_cost, sub_cost)
        return current_row[s2_len - 1]
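

# A minimal usage sketch of EditDistance, assuming a hypothetical weight that treats
# the confusable pair ("O", "0") as a cheaper substitution:
#
# scorer = EditDistance(weights={("O", "0"): 0.5})
# scorer.score("OTQ-C4705SAY", "0TQ-C4705SAY")  # -> 0.5 (one weighted substitution)
# scorer.score("OTQ-C4705SAY", "OTQ-C470SAY")   # -> 1 (one missing character)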


class CPMerge:
    def __init__(self, scorer, n=3):
        self._scorer = scorer
        self._n = n
        self._forward_map = {}
        self._inverted_map = defaultdict(lambda: defaultdict(set))

    def __generate_ngrams(self, code):
        padding_chars = "$" * (self._n - 1)
        # Padding covers the edge cases at the beginning and end of the code
        code = padding_chars + code + padding_chars
        return {code[i : i + self._n] for i in range(len(code) - self._n + 1)}
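
    # For example, with n = 3 the code "AB1" is padded to "$$AB1$$" and yields the
    # trigram set {"$$A", "$AB", "AB1", "B1$", "1$$"}.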

    def index(self, codes):
        # This function only needs to be called once
        for code in codes:
            n_grams = self.__generate_ngrams(code)
            feature_size = len(n_grams)
            self._forward_map[code] = n_grams
            for n_gram in n_grams:
                self._inverted_map[feature_size][n_gram].add(code)

    def __compute_min_feature_size(self, feature_size, alpha):
        return math.ceil(alpha ** 2 * feature_size)

    def __compute_max_feature_size(self, feature_size, alpha):
        return math.floor(feature_size / alpha ** 2)

    def __compute_min_overlap(self, x_feature_size, y_feature_size, alpha):
        return math.ceil(alpha * math.sqrt(x_feature_size * y_feature_size))

    def __compute_cosine_similarity(self, x_features, y_features):
        return len(x_features & y_features) / math.sqrt(
            len(x_features) * len(y_features)
        )
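
    # A worked example of the bounds used by lookup below: with alpha = 0.7 and a query
    # of feature_size = 10, only candidates whose feature size lies in
    # [ceil(0.49 * 10), floor(10 / 0.49)] = [5, 20] are considered, and a candidate with
    # feature size 12 must share at least ceil(0.7 * sqrt(10 * 12)) = 8 n-grams with the query.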

    def lookup(self, code, alpha=0.5, top_k=None):
        n_grams = self.__generate_ngrams(code)
        feature_size = len(n_grams)
        code_candidates = []
        for candidate_feature_size in range(
            self.__compute_min_feature_size(feature_size, alpha),
            self.__compute_max_feature_size(feature_size, alpha) + 1,
        ):  # [min_feature_size, ..., max_feature_size]
            min_overlap = self.__compute_min_overlap(
                feature_size, candidate_feature_size, alpha
            )
            # Process the query n-grams from the rarest to the most common
            sorted_n_grams = sorted(
                n_grams,
                key=lambda n_gram: len(
                    self._inverted_map[candidate_feature_size][n_gram]
                ),
            )
            # Phase 1: collect candidates from the first (feature_size - min_overlap + 1) n-grams
            overlap_counts = defaultdict(int)
            for n_gram in sorted_n_grams[: feature_size - min_overlap + 1]:
                for code_candidate in self._inverted_map[candidate_feature_size][
                    n_gram
                ]:
                    overlap_counts[code_candidate] += 1
            # Phase 2: verify each candidate against the remaining n-grams
            for code_candidate in overlap_counts:
                for i in range(feature_size - min_overlap + 1, feature_size):
                    if (
                        code_candidate
                        in self._inverted_map[candidate_feature_size][sorted_n_grams[i]]
                    ):  # Use a hash lookup instead of the binary search mentioned in the paper
                        overlap_counts[code_candidate] += 1
                    if overlap_counts[code_candidate] >= min_overlap:
                        if self._scorer:  # Use the edit distance score
                            code_candidates.append(
                                (
                                    code_candidate,
                                    1
                                    - self._scorer.score(code, code_candidate)
                                    / len(code_candidate),
                                )
                            )
                        else:  # Use cosine similarity
                            code_candidates.append(
                                (
                                    code_candidate,
                                    self.__compute_cosine_similarity(
                                        n_grams, self._forward_map[code_candidate]
                                    ),
                                )
                            )
                        break  # Found a similar code
                    elif (
                        overlap_counts[code_candidate] + feature_size - i - 1
                        < min_overlap
                    ):
                        # Prune the candidate once it can no longer reach the required overlap
                        break
        # Rank candidates by descending score
        return sorted(code_candidates, key=lambda x: (-x[1], x[0]))[:top_k]
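

# A minimal usage sketch (hypothetical codes; the full pipeline is in __main__ below):
#
# matcher = CPMerge(EditDistance({}), n=2)
# matcher.index({"OTQ-C4705SAY", "RHS71WG16RX4-SR"})
# matcher.lookup("OTQ-C470SSAY", alpha=0.5, top_k=3)
#
# lookup returns a list of (code, score) pairs ranked by descending score, where the score
# is an edit-distance-based similarity (or a cosine similarity when no scorer is given).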


def generate_error_code(code, ocr_accuracy):
    # This function simulates the output of a practical OCR model
    # A model with p% accuracy can be modeled as a series of Bernoulli trials
    chars = []
    for char in code:
        if np.random.geometric(p=ocr_accuracy) == 1:  # Correct prediction => no changes
            chars.append(char)
        else:  # Incorrect prediction
            prob = random.random()
            if prob < 0.5:  # 50%: a mistaken (substituted) character
                chars.extend(random.choices(VALID_CHARS))
            elif prob < 0.75:  # 25%: an added noise character
                chars.extend(random.choices(VALID_CHARS))
                chars.append(char)
            else:  # 25%: a missed character
                pass
    return "".join(chars)


def evaluate(predictions, golds, top_k=1):
    # This function assesses the performance of the code searching module using the recall metric
    assert len(predictions) == len(golds)
    num_corrects = 0
    num_golds = 0
    for candidates, gold in zip(predictions, golds):
        candidates = {code for code, _ in candidates[:top_k]}
        if gold in candidates:
            num_corrects += 1
        num_golds += 1
    return num_corrects / num_golds
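
# For example, if 95 out of 100 gold codes appear among their top-k candidates,
# evaluate returns a Recall@k of 0.95.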


if __name__ == "__main__":
    # Configs
    n_gram = 2
    alpha = 0.5  # Needs to be tuned for CPMerge; not used by Edit Distance
    top_k = 10
    weights = {}  # Define a sparse cost matrix for confusable characters

    # Generate synthetic codes
    # codes = generate_random_codes(num_codes=50_000, code_len_range=(3, 20))
    # Or load codes from a file
    codes = load_external_codes("codes.txt")

    # Create a ranker
    ranker = EditDistance(weights)
    # Or use no ranker (cosine similarity is used instead)
    # ranker = None

    # Create a matcher
    cp_merge = CPMerge(ranker, n_gram)

    # Index the codes, do this once
    cp_merge.index(codes)

    # Lookup here
    # print(cp_merge.lookup("RHS7WG166X4-SR", alpha, top_k))  # RHS71WG16RX4-SR
    # print(cp_merge.lookup("RH571WG16RX4-5R", alpha, top_k))  # RHS71WG16RX4-SR
    # print(cp_merge.lookup("OTQ-C47055SAY", alpha, top_k))  # OTQ-C4705SAY
    # print(cp_merge.lookup("TQ-C470SSAY", alpha, top_k))  # OTQ-C4705SAY

    # Evaluation
    ocr_accuracy = 0.9  # Just my assumption
    predictions = []
    for code in codes:
        # Simulate the output of the OCR model
        predicted_code = generate_error_code(code, ocr_accuracy)
        # Lookup here
        predictions.append(cp_merge.lookup(predicted_code, alpha, top_k))

    # Recall@1
    print("Recall@1:", evaluate(predictions, codes, 1))
    # Recall@k
    print("Recall@" + str(top_k) + ":", evaluate(predictions, codes, top_k))