Skip to content

Instantly share code, notes, and snippets.

@atakemura
Last active August 29, 2015 14:11
Show Gist options
  • Select an option

  • Save atakemura/adc2f2539fd02876c63e to your computer and use it in GitHub Desktop.

Select an option

Save atakemura/adc2f2539fd02876c63e to your computer and use it in GitHub Desktop.
PythonでCount-Min Sketch実装してみた

最近読んだHokusai論文の中にあったCount-Min Sketchというアルゴリズムを実装してみた。

##そもそもCount-Min Sketchとは?

詳しい説明はG. Cormode, S. Muthukrishnanによる元の論文解説をご一読いただければと。

学者や研究者の方が興味を持つところとは若干違うかもしれないが、

  • 全データをメモリに載せるのは諸事情により不可能(お金の問題でRAM100GB+はきつい)
  • 毎回SQLでSELECT KEY, COUNT(*)...は遅くて我慢できません
  • 毎回MapReduceでWordCountも遅くて我慢できません
  • 精度はある程度あればいいです(本当に正確な値が欲しければ遅いのは我慢して全部読む)

という状態で、「ある事象が起こった回数」をさくっと出したいというニーズに答えるアルゴリズムかなと。例えば、ある商品Xが購入された回数とか、あるURLがクリックされた回数とか、ある単語Zが検索された回数とか。

簡単に説明すると(できるかあやしいが)、

  1. 深さd、幅wの行列を用意する(0で初期化)
  2. dの数だけ対独立ハッシュ関数を準備する
  3. データストリーム(入力)でkeyが出るたびそれに対応した値を更新する
    このとき、全てのd[key]に対して更新をかける(count)
  4. 値を取り出したいときは、全てのd[key]の中から最小値を取り出して返す(min)

なぜ最小値なのか?
(たぶん)それはここで使うハッシュ関数は衝突を起こすことがあるからで、衝突が起こるとそのkeyは何重にも更新がかかってしまい本来より高い値が出るから。

# coding: utf-8

import sys
import hashlib
import numpy as np

# 9番目のメルセンヌ素数
BIG_PRIME = (2 ** 61) - 1

def random_parameter():
    """
    "a, b are values chosen randomly...range 1 to p-1"
    numpy.random.randintはlow(含む), high(含まない)の間から一個取る(左閉半開)
    """
    return np.random.randint(1, BIG_PRIME)

def hash_to_int(thing):
    """
    thingがハッシュ化できる場合は、ハッシュ化して整数を返す
    ハッシュ化できなかったら…?
    """
    to_int = abs(int((hashlib.md5(thing)).hexdigest(),
                     16))
    return to_int

class CountMinSketch(object):
    def __init__(self, delta, epsilon):
        """
        delta: 誤差が起こる確率; ~0.000001とか
        epsilon: 誤差の係数; ~0.001とか
        """
        self.depth = int(np.ceil(np.log(1 / delta)))
        self.width = int(np.ceil(np.exp(1) / epsilon))
        self.list_hash_functions =\
        [self.generate_hash_function() for i in xrange(self.depth)]
        self.count = np.zeros((self.depth, self.width), dtype="int64")
        
    def generate_hash_function(self):
        a, b = random_parameter(), random_parameter()
        return lambda i: (a * i + b) % BIG_PRIME % self.width
    
    def update(self, key, count):
        """
        key: 更新する値のキー
        count: 更新する値に足す値
        """
        for row, hash_function in enumerate(self.list_hash_functions):
            col = hash_function(hash_to_int(key))
            self.count[row, col] += count

    def get(self, key):
        value = sys.maxint
        for row, hash_function in enumerate(self.list_hash_functions):
            col = hash_function(hash_to_int(key))
            value = min(self.count[row, col], value)
        return value

実行コード

cms = CountMinSketch(0.001, 0.000001)
stream = []
for i in xrange(1000):
    stream = stream + [str(i)] * i
np.random.shuffle(stream)

for s in stream:
    cms.update(s, 1)
cms.get('999') #=> 999

用途

SELECT KEY, COUNT(*)
FROM HOGE
WHERE KEY = "SOME CONDITION"
GROUP BY KEY

みたいなことしてるとあほみたいに時間かかって(#^ω^)ピキピキとなる問題には使えるかもしれません。元の論文だと、90GBのコーパスを8GBまで縮約することができたという報告もあります。正確さに関しては速度・メモリ消費とご相談といった形になるので(それでも100%にはならない)、100%正確でないといけないといった用途には向きません。あとは小さな値を全て推計するのも苦手です。頻度ランキングで見た時の上位が1,000,000とかで、下の方が1とかだと、epsilonを現実的な値にするとdeltaの確率で200-500という誤差が発生しますので下の方を細かく見ようというのも苦手です。まぁ問題によっては個人的には10,000超えたあたりから200とか誤差だからいいだろと言いたくなりますが。

あと今のままでは時系列の推計にも向きません。数年分の分刻みの解像度を持つ行列をメモリに保持するとなると、時間軸だけで年間約526,000あります。これにepsilondeltaをベースにし算出した行列の行数や列数を掛け算すると恐ろしい次元数になります…

そこで、過去に遡れば遡るほどどんどん次元を圧縮していく方法をとって、現実的に計算できるようにしたのがHokusai論文なのですが、それはまたこんど機会があれば。(圧縮という表現が正しいのかは微妙...)

ほか

数値の代わりにTrue/Falseのみが返ってくる親戚(先祖)?のBloom FilterはApache Cassandra等で使われているもよう。確かに、そういうクラスは探すと見つかるMurmurを使った実装もされている。Cassandraの場合用途としては、自ノードが与えられたキーを保持しているかどうかチェックするのにいちいちdiskI/O起こしていたら遅くなるから、メモリに保持しておいたBloom FilterでフィルタすることでlookupのためにムダなdiskI/Oを発生させないため、らしい。Bloom FilterはFalse Positiveは出るがFalse Negativeは出ないという特性があるので、運が悪いとdiskI/Oが発生するがそれがないのと比べると相当速いということか。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment