Count-Min Sketch 與頻率估計

問題

在串流場景中(即時處理數百萬筆資料),我們常需要知道每個元素的出現頻率。用 HashMap 儲存所有頻率,對海量串流來說是災難性的。

Count-Min Sketch 解法

Count-Min Sketch (CMS) 用一個二維陣列 + 多個 hash 函數來估計頻率。

import math
import mmh3

class CountMinSketch:
    def __init__(self, epsilon=0.01, delta=0.01):
        """
        epsilon: 誤差容許度
        delta: 失敗機率
        """
        self.width = int(math.ceil(math.e / epsilon))
        self.depth = int(math.ceil(math.log(1 / delta)))
        self.table = [[0] * self.width for _ in range(self.depth)]
    
    def add(self, item, count=1):
        """增加元素的計數"""
        for d in range(self.depth):
            idx = mmh3.hash(str(item), d) % self.width
            self.table[d][idx] += count
    
    def estimate(self, item):
        """估計元素的出現頻率"""
        estimates = []
        for d in range(self.depth):
            idx = mmh3.hash(str(item), d) % self.width
            estimates.append(self.table[d][idx])
        return min(estimates)  # 取最小值(CMS 永遠不低估)
    
    def merge(self, other):
        """合併兩個 CMS(用於分散式系統)"""
        for d in range(self.depth):
            for w in range(self.width):
                self.table[d][w] += other.table[d][w]

# 測試
cms = CountMinSketch(epsilon=0.01, delta=0.01)
print(f"表格大小: {cms.width}x{cms.depth} = {cms.width * cms.depth} 個計數器")
print(f"實際記憶體: {cms.width * cms.depth * 4 / 1024:.2f} KB (int32)")

# 模擬串流資料
import random
random.seed(42)

true_counts = {}
for _ in range(100000):
    item = f"item_{random.randint(1, 1000)}"
    cms.add(item)
    true_counts[item] = true_counts.get(item, 0) + 1

# 評估誤差
print(f"\n=== 頻率估計評估 ===")
errors = []
for item, true_count in list(true_counts.items())[:20]:
    est = cms.estimate(item)
    error = est - true_count
    errors.append(error)
    print(f"{item}: 實際={true_count}, 估計={est}, 誤差={error}")

print(f"\n平均誤差: {sum(errors)/len(errors):.2f}")
print(f"最大誤差: {max(errors)}")
print(f"理論保證: 誤差 ≤ epsilon * N = {0.01 * 100000} = 1000")

Count-Min Sketch 是什麼?

Count-Min Sketch (CMS) 是一種機率性資料結構,用於在串流資料中估計元素的出現頻率。它使用一個二維計數器陣列和多個獨立的雜湊函數,以極少的記憶體追蹤頻率。

| 特性 | 值 | |------|-----| | 記憶體 | (e/ε) × ln(1/δ) 個計數器 | | 誤差保證 | 機率 1-δ,誤差 ≤ ε×N | | 方向 | 永遠高估(絕不低估) |

運作原理

1. 建立一個二維表格:寬度 = e/ε,深度 = ln(1/δ)
2. 每一行使用一個獨立的 hash 函數
3. 加入元素時:用 d 個 hash 計算 → 對應計數器加 1
4. 估計頻率時:用 d 個 hash 計算 → 取 d 個計數器中的最小值

關鍵洞察:CMS 永遠高估(因為 hash 碰撞)。
取最小值 = 碰撞最少的行 = 最接近真實值的估計。

數學保證

機率 1-δ 時:
  實際頻率 ≤ 估計頻率 ≤ 實際頻率 + ε × N

其中:
  ε = epsilon(精確度參數,例如 0.01)
  δ = delta(信心參數,例如 0.01)
  N = 已處理項目總數

範例:ε=0.01, δ=0.01, N=100000
  寬度 = e/0.01 ≈ 272
  深度 = ln(1/0.01) ≈ 5
  記憶體 = 272×5 = 1,360 個計數器 ≈ 5.3 KB
  誤差保證:≤ 0.01×100000 = 1000

記憶體比較

| 資料結構 | 100K 項目 | 記憶體 | 誤差 | |---------|----------|--------|------| | HashMap | 100K 條目 | ~5 MB | 0% | | CMS (1%誤差, 99%信心) | 任意串流 | ~5 KB | ≤1% | | CMS (0.1%誤差, 99.9%信心) | 任意串流 | ~50 KB | ≤0.1% |

深入應用場景

1. 即時熱門商品榜

電商平台在每秒處理數百萬事件的串流中,使用 CMS 即時統計每個商品的點擊和購買次數。只需約 5 KB 記憶體就能從百萬商品中找出前 100 名熱門商品。典型的用法是 CMS + 最小堆積(min-heap)來追蹤 top-k 項目。

2. DDoS 檢測

CDN 業者使用 CMS 監控每秒來自每個來源 IP 的請求數。當某 IP 的估計請求數在短時間內超過閾值(例如正常值的 10 倍),系統自動觸發速率限制或阻擋。

3. 熱門搜尋詞

搜尋引擎使用 CMS 即時統計查詞頻率。即使每秒有數十萬次查詢,CMS 也能以固定記憶體找出 trending topics。

4. 資料庫查詢最佳化

資料庫在執行 GROUP BYDISTINCT 查詢前,先用 CMS 估計結果大小,決定使用哪種執行策略。

5. IoT 感測器資料

邊緣設備(樹莓派、ESP32)使用 CMS 統計感測器讀數的分布,只上傳 sketch 到雲端而非原始資料。

6. Twitter 即時趨勢

twitter 的早即版本使用 CMS 來找出平台上正在趨勢的主題標籤。

時間複雜度

| 操作 | 時間 | |------|------| | add(item) | O(d) — d 次 hash + d 次遞增 | | estimate(item) | O(d) — d 次 hash + d 次讀取 + 取最小值 | | merge(other) | O(w×d) — 逐格相加 | | 空間 | O(w×d) — 透過 ε 和 δ 設定 |

變體

| 變體 | 特色 | |------|------| | Count-Min Sketch | 基本頻率估計 | | Count-Mean-Min Sketch | 減去其他項目的雜訊 | | Conservative Update | 只在目前是最小值時更新 | | Heavy Hitters | CMS + 追蹤 top-k 項目 |

與其他機率性資料結構對比

| 結構 | 功能 | 記憶體/精度比 | |------|------|-------------| | Bloom Filter | 集合成員測試 | ~1.2 MB/1M (1% FP) | | HyperLogLog | 基數估計 | ~16 KB (2% error) | | Count-Min Sketch | 頻率估計 | ~5 KB/100K (1% error) | | MinHash | 集合相似度 | ~2 KB (5% error) |

總結

  • 頻率估計:在串流資料中即時計算元素出現次數
  • 永遠高估:取多行最小值,絕不低估
  • 可設定精度:ε 控制誤差,δ 控制信心
  • 極省記憶體:1% 誤差只需約 5 KB
  • 分散式友善:可合併多個 CMS
  • 廣泛應用:Apache DataSketches、Redis、Twitter

真實問題:偵測 Twitter 上的熱門話題

想像你正在幫 Twitter 建立即時熱門話題(Trending Topics)系統。每分鐘有 50 萬條推文,你需要即時找出當前討論度最高的 hashtag。

如果用 HashMap 儲存每個 hashtag 的出現次數:

  • 假設每分鐘出現 10 萬個不同的 hashtag
  • HashMap 需要儲存 10 萬個 key-value pair
  • 如果 hashtag 平均長度 15 bytes,約需 3 MB / 分鐘
  • 一天下來:3 MB × 1440 = 4.3 GB(而且都是熱資料,必須存在 RAM 中)

用 Count-Min Sketch:

  • 設定 ε=0.001(誤差 0.1%)、δ=0.01(失敗機率 1%)
  • 表格大小:width=2719, depth=5
  • 總記憶體:2719×5×8 bytes ≈ 109 KB
  • 一天的資料量:109 KB(固定不變!)— 節省 99.997%

這就是為什麼 Twitter、Reddit、Cloudflare 等處理海量串流資料的公司都在用 CMS。

當 CMS 遇上「Heavy Hitters」問題

CMS 最強大的應用之一是解決 Heavy Hitters(頻繁出現的元素)問題:找出所有出現次數超過總數 1% 的元素。

def find_heavy_hitters(stream, threshold_ratio=0.01):
    """在單次遍歷中找出所有超過 threshold 的頻繁元素"""
    cms = CountMinSketch(epsilon=threshold_ratio/2, delta=0.01)
    total = 0
    heavy_hitters = []
    
    for item in stream:
        cms.add(item)
        total += 1
    
    # 第二次遍歷:檢查哪些元素是 heavy hitter
    #(在真實串流中,這可以透過定時報告來做)
    seen = set()
    for item in stream:
        if item not in seen:
            seen.add(item)
            est = cms.estimate(item)
            if est > total * threshold_ratio:
                heavy_hitters.append((item, est))
    
    return sorted(heavy_hitters, key=lambda x: -x[1])

下章預告:從頻率到相似度

Count-Min Sketch 讓我們知道「某個東西出現了幾次」,但很多時候我們想知道的是「哪個東西跟這個最像?」。這就是下一章 FAISS 向量相似搜尋 要解決的問題——用向量嵌入(Embeddings)來計算物品之間的相似度。

從頻率統計到相似度搜尋,這是從「是什麼」到「像什麼」的思維躍進。

解鎖完整教學內容

本章為付費內容。加入專案即可解鎖超過 5000 字的深度解析,包含 10 個以上神級 Prompt 與真實 Source Code 範例!