import math

import mmh3
from bitarray import bitarray


class BloomFilter:
    '''
    Bloom filter backed by a bitarray, using the murmur3 hash function.

    A Bloom filter answers "possibly present" or "definitely absent":
    check() may report an item that was never added (false positive),
    but never misses an item that was added.

    Attributes:
        fp_prob: requested false positive probability.
        size: number of bits in the underlying bit array.
        hash_count: number of seeded murmur3 hashes applied per item.
        bit_array: the bitarray holding the filter state.
    '''

    def __init__(self, count, fp_prob):
        '''
        count: int
            Number of items expected to be stored in the Bloom filter
        fp_prob: float
            False positive probability (in decimal), strictly between 0 and 1

        Raises ValueError if count is not positive or fp_prob is outside
        the open interval (0, 1).
        '''
        self.fp_prob = fp_prob

        # Size the bit array first; the hash count is derived from it.
        self.size = BloomFilter.get_size(count, fp_prob)
        self.hash_count = BloomFilter.get_hash_count(self.size, count)

        self.bit_array = bitarray(self.size)
        # bitarray(n) is created uninitialised, so clear every bit.
        self.bit_array.setall(0)

    def _indexes(self, item):
        '''
        Yield the bit positions for item, one per hash function.

        Each "hash function" is murmur3 with a different seed; the modulo
        folds the digest into the range [0, size).
        '''
        return (
            mmh3.hash(item, seed) % self.size
            for seed in range(self.hash_count)
        )

    def add(self, item):
        '''
        Add an item to the filter
        '''
        for index in self._indexes(item):
            self.bit_array[index] = True

    def check(self, item) -> bool:
        '''
        Check for existence of an item in filter

        Returns False if the item was definitely never added, True if it
        may have been added (subject to the false positive probability).
        '''
        # A single unset bit proves the item was never added; all()
        # short-circuits on the first one.
        return all(self.bit_array[index] for index in self._indexes(item))

    def __contains__(self, item) -> bool:
        '''
        Support "item in bloom_filter"; equivalent to check(item)
        '''
        return self.check(item)

    @staticmethod
    def get_size(n, p) -> int:
        '''
        Return the size of bit array(m) to be used, computed as
        m = -(n * ln(p)) / (ln(2)^2), truncated to an int and never
        less than 1

        n : int
            number of items expected to be stored in filter
        p : float
            False Positive probability in decimal, strictly between 0 and 1

        Raises ValueError if n is not positive or p is outside (0, 1).
        '''
        if n <= 0:
            raise ValueError("n must be a positive number of items")
        if not 0 < p < 1:
            raise ValueError("p must be strictly between 0 and 1")

        m = -(n * math.log(p)) / (math.log(2) ** 2)
        # Truncation can reach 0 for tiny n or p close to 1; a zero-bit
        # array would make every "% size" a division by zero.
        return max(1, int(m))

    @staticmethod
    def get_hash_count(m, n) -> int:
        '''
        Return the number of hash functions(k) to be used, computed as
        k = (m / n) * ln(2), truncated to an int and never less than 1

        m : int
            size of bit array
        n : int
            number of items expected to be stored in filter

        Raises ValueError if n is not positive.
        '''
        if n <= 0:
            raise ValueError("n must be a positive number of items")

        k = (m / n) * math.log(2)
        # With zero hash functions add() would set nothing and check()
        # would report every item as present, so use at least one.
        return max(1, int(k))