# approximate_code_retrieval.py
# -*- coding: utf-8 -*-
# This is an implementation of the paper below, modified to rank candidates with the edit distance algorithm:
# Simple and Efficient Algorithm for Approximate Dictionary Matching
# https://www.aclweb.org/anthology/C10-1096.pdf
# Edit distance algorithm:
# https://en.wikipedia.org/wiki/Edit_distance
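#
# Overview: dictionary codes are indexed by character n-grams (CPMerge.index), a noisy
# query is matched against that index to collect candidates whose n-gram overlap can
# reach the threshold derived from alpha (CPMerge.lookup), and the surviving candidates
# are ranked either by a normalized edit distance (EditDistance) or by cosine similarity.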

    import math
    import random
    import string
    from collections import defaultdict

    import numpy as np # Just for evaluation

random.seed(42)
np.random.seed(42)

    VALID_CHARS = string.ascii_uppercase + string.digits


def generate_random_code(code_len):
    return "".join(random.choices(VALID_CHARS, k=code_len))


def generate_random_codes(num_codes, code_len_range):
    codes = set()  # Use a set to make sure there are no duplicates

    while len(codes) < num_codes:
        codes.add(generate_random_code(random.randint(*code_len_range)))

    return codes


def load_external_codes(filename):
    codes = set()  # Use a set to make sure there are no duplicates

    with open(filename, "r", encoding="UTF-8") as fin:
        for line in filter(None, map(str.strip, fin)):
            codes.add(line)

    return codes


class EditDistance:
    def __init__(self, weights):
        self._weights = weights

    def __find_diff_segment(self, s1, s2):
        # Strip the common suffix and then the common prefix; return the boundaries of the differing middle segment
        pos = 0
        s1_len = len(s1)
        s2_len = len(s2)
        min_len = min(s1_len, s2_len)

        while min_len > 0 and s1[s1_len - 1] == s2[s2_len - 1]:
            s1_len -= 1
            s2_len -= 1
            min_len -= 1

        while pos < min_len and s1[pos] == s2[pos]:
            pos += 1

        return pos, s1_len, s2_len

    def score(self, s1, s2):
        pos, s1_diff_len, s2_diff_len = self.__find_diff_segment(s1, s2)

        if min(s1_diff_len, s2_diff_len) == pos:
            return abs(s1_diff_len - s2_diff_len)

        s1 = s1[pos:s1_diff_len]
        s2 = s2[pos:s2_diff_len]

        s1_len = len(s1)
        s2_len = len(s2)

        # This is a variation of the original algorithm to reduce memory usage
        # Space: O(M * N) -> O(2 * N), since only the previous and current rows are kept

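        # Each row carries one extra slot at the end holding the boundary value D[i][0],
        # so that the wrap-around index [col_idx - 1] also works for the first column.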
        last_row = None
        current_row = list(range(1, s2_len + 1)) + [0]

        for row_idx in range(s1_len):
            # Swap rows
            last_row = current_row
            current_row = [0] * s2_len + [row_idx + 1]

            for col_idx in range(s2_len):
                del_cost = last_row[col_idx] + 1
                ins_cost = current_row[col_idx - 1] + 1
                sub_cost = last_row[col_idx - 1]

                if s1[row_idx] != s2[col_idx]:
                    sub_cost += self._weights.get((s1[row_idx], s2[col_idx]), 1)

                current_row[col_idx] = min(del_cost, ins_cost, sub_cost)

        return current_row[s2_len - 1]
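
# For example, EditDistance({}).score("KITTEN", "SITTING") == 3, since every edit costs 1
# by default; confusable pairs can be made cheaper to substitute via `weights`,
# e.g. {("O", "0"): 0.2} lowers the substitution cost for the pair ("O", "0").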


class CPMerge:
    def __init__(self, scorer, n=3):
        self._scorer = scorer
        self._n = n
        self._forward_map = {}
        self._inverted_map = defaultdict(lambda: defaultdict(set))

    def __generate_ngrams(self, code):
        padding_chars = "$" * (self._n - 1)
        # Pad both ends so that boundary characters also form full n-grams (covers the edge cases)
        code = padding_chars + code + padding_chars
        return {code[i : i + self._n] for i in range(len(code) - self._n + 1)}
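
    # For example, with n=3 the code "AB1" is padded to "$$AB1$$" and yields the
    # trigram set {"$$A", "$AB", "AB1", "B1$", "1$$"}.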

    def index(self, codes):
        # This function is just called once
        for code in codes:
            n_grams = self.__generate_ngrams(code)
            feature_size = len(n_grams)

            self._forward_map[code] = n_grams

            for n_gram in n_grams:
                self._inverted_map[feature_size][n_gram].add(code)

    def __compute_min_feature_size(self, feature_size, alpha):
        return math.ceil(alpha ** 2 * feature_size)

    def __compute_max_feature_size(self, feature_size, alpha):
        return math.floor(feature_size / alpha ** 2)

    def __compute_min_overlap(self, x_feature_size, y_feature_size, alpha):
        return math.ceil(alpha * math.sqrt(x_feature_size * y_feature_size))

    def __compute_cosine_similarity(self, x_features, y_features):
        return len(x_features & y_features) / math.sqrt(
            len(x_features) * len(y_features)
        )
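
    # Worked example: with alpha = 0.5 and a query of |X| = 10 n-grams, candidates must have
    # between ceil(0.25 * 10) = 3 and floor(10 / 0.25) = 40 n-grams, and a candidate with
    # |Y| = 12 n-grams must share at least ceil(0.5 * sqrt(10 * 12)) = 6 n-grams with the query.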

    def lookup(self, code, alpha=0.5, top_k=None):
        n_grams = self.__generate_ngrams(code)
        feature_size = len(n_grams)

        code_candidates = []

        for candidate_feature_size in range(
            self.__compute_min_feature_size(feature_size, alpha),
            self.__compute_max_feature_size(feature_size, alpha) + 1,
        ):  # [min_feature_size, ..., max_feature_size]
            min_overlap = self.__compute_min_overlap(
                feature_size, candidate_feature_size, alpha
            )

            sorted_n_grams = sorted(
                n_grams,
                key=lambda n_gram: len(
                    self._inverted_map[candidate_feature_size][n_gram]
                ),
            )

            overlap_counts = defaultdict(int)

            for n_gram in sorted_n_grams[: feature_size - min_overlap + 1]:
                for code_candidate in self._inverted_map[candidate_feature_size][
                    n_gram
                ]:
                    overlap_counts[code_candidate] += 1

            for code_candidate in overlap_counts:
                for i in range(feature_size - min_overlap + 1, feature_size):
                    if (
                        code_candidate
                        in self._inverted_map[candidate_feature_size][sorted_n_grams[i]]
                    ):  # Use a hash lookup instead of the binary search mentioned in the paper
                        overlap_counts[code_candidate] += 1

                    if overlap_counts[code_candidate] >= min_overlap:
                        if self._scorer:  # Use the edit distance score
                            code_candidates.append(
                                (
                                    code_candidate,
                                    1
                                    - self._scorer.score(code, code_candidate)
                                    / len(code_candidate),
                                )
                            )
                        else:  # Use the cosine similarity
                            code_candidates.append(
                                (
                                    code_candidate,
                                    self.__compute_cosine_similarity(
                                        n_grams, self._forward_map[code_candidate]
                                    ),
                                )
                            )
                        break  # Found a similar code
                    elif (
                        overlap_counts[code_candidate] + feature_size - i - 1
                        < min_overlap
                    ):
                        # Prune the candidate once it can no longer reach the required overlap
                        break

        # Rank candidates by score
        return sorted(code_candidates, key=lambda x: (-x[1], x[0]))[:top_k]
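
# lookup() returns a list of (candidate_code, score) pairs, best match first; with
# top_k=None every surviving candidate is returned, otherwise only the top_k highest-scoring ones.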


def generate_error_code(code, ocr_accuracy):
    # This function simulates a practical OCR model
    # A model with p% character accuracy can be modeled as a series of Bernoulli trials
    chars = []

    for char in code:
        if np.random.geometric(p=ocr_accuracy) == 1:  # Correct prediction => no changes
            chars.append(char)
        else:  # Incorrect prediction
            prob = random.random()
            if prob < 0.5:  # 50%: substitute a mistaken character
                chars.extend(random.choices(VALID_CHARS))
            elif prob < 0.75:  # 25%: insert a noise character before the correct one
                chars.extend(random.choices(VALID_CHARS))
                chars.append(char)
            else:  # 25%: drop the character
                pass

    return "".join(chars)


def evaluate(predictions, golds, top_k=1):
    # This function assesses the performance of the code search module using the Recall metric
    assert len(predictions) == len(golds)

    num_corrects = 0
    num_golds = 0

    for candidates, gold in zip(predictions, golds):
        candidates = {code for code, _ in candidates[:top_k]}

        if gold in candidates:
            num_corrects += 1

        num_golds += 1

    return num_corrects / num_golds


if __name__ == "__main__":
    # Configs
    n_gram = 2
    alpha = 0.5  # Needs to be tuned for CPMerge's candidate generation; Edit Distance does not use it
    top_k = 10

    weights = {}  # Define a sparse cost matrix for confusable characters

    # Generate synthetic codes
    # codes = generate_random_codes(num_codes=50_000, code_len_range=(3, 20))

    # Or load codes from file (one code per line)
    codes = load_external_codes("codes.txt")

    # Create a ranker
    ranker = EditDistance(weights)

    # Or use no ranker (cosine similarity is used instead)
    # ranker = None

    # Create a matcher
    cp_merge = CPMerge(ranker, n_gram)

    # Index codes, do this once
    cp_merge.index(codes)

    # Lookup here
    # print(cp_merge.lookup("RHS7WG166X4-SR", alpha, top_k))  # RHS71WG16RX4-SR
    # print(cp_merge.lookup("RH571WG16RX4-5R", alpha, top_k))  # RHS71WG16RX4-SR
    # print(cp_merge.lookup("OTQ-C47055SAY", alpha, top_k))  # OTQ-C4705SAY
    # print(cp_merge.lookup("TQ-C470SSAY", alpha, top_k))  # OTQ-C4705SAY

    # Evaluation
    ocr_accuracy = 0.9  # Just my assumption

    predictions = []

    for code in codes:
        # Simulate the output of the OCR model
        predicted_code = generate_error_code(code, ocr_accuracy)

        # Lookup here
        predictions.append(cp_merge.lookup(predicted_code, alpha, top_k))

    # Recall@1
    print("Recall@1:", evaluate(predictions, codes, 1))

    # Recall@k
    print("Recall@" + str(top_k) + ":", evaluate(predictions, codes, top_k))