串流資料處理管線實戰
整合所有機率性資料結構
import random
import time
from collections import defaultdict
class StreamProcessor:
"""整合多種近似演算法的串流處理器"""
def __init__(self):
# 使用之前實作的類別
self.bloom = BloomFilter(1000000, 0.01)
self.hll = HyperLogLog(b=14)
self.cms = CountMinSketch(epsilon=0.005, delta=0.01)
self.processed_count = 0
self.start_time = None
def process_item(self, user_id, action, value=None):
"""處理一條串流資料"""
if self.start_time is None:
self.start_time = time.time()
self.processed_count += 1
# 1. Bloom Filter:檢查是否為新使用者
is_new = not self.bloom.check(user_id)
self.bloom.add(user_id)
# 2. HyperLogLog:統計不重複使用者
self.hll.add(user_id)
# 3. Count-Min Sketch:統計動作頻率
self.cms.add(action)
if value is not None:
self.cms.add(f"value_{action}", value)
return {
"is_new_user": is_new,
"unique_users_est": self.hll.count(),
f"{action}_count_est": self.cms.estimate(action),
}
def get_stats(self):
"""取得處理統計"""
elapsed = time.time() - self.start_time if self.start_time else 0
return {
"processed": self.processed_count,
"elapsed_sec": round(elapsed, 2),
"throughput": int(self.processed_count / elapsed) if elapsed > 0 else 0,
"unique_users_est": self.hll.count(),
"memory_usage_kb": self._estimate_memory(),
}
def _estimate_memory(self):
# Bloom: 約 1.2 MB (for 1M items, 1% FP)
# HLL: 16 KB
# CMS: ~40 KB
return 1250 # KB
# 模擬電商串流資料
processor = StreamProcessor()
print("=== 即時串流資料處理模擬 ===\n")
# 模擬 10 萬筆串流資料
for i in range(100000):
user_id = f"user_{random.randint(1, 5000)}"
action = random.choice(["view", "click", "add_cart", "purchase"])
result = processor.process_item(user_id, action)
if i % 20000 == 0:
stats = processor.get_stats()
print(f"已處理 {i} 筆...")
print(f" UV 估計: {stats['unique_users_est']:,}")
print(f" 處理速度: {stats['throughput']:,} 筆/秒")
stats = processor.get_stats()
print(f"\n=== 最終統計 ===")
print(f"總處理量: {stats['processed']:,} 筆")
print(f"耗時: {stats['elapsed_sec']} 秒")
print(f"處理速度: {stats['throughput']:,} 筆/秒")
print(f"不重複使用者估計: {stats['unique_users_est']:,}")
print(f"實際不重複使用者: 5,000")
print(f"記憶體使用: ~{stats['memory_usage_kb']} KB")
# 如果使用傳統 HashSet 儲存所有使用者
print(f"\nHashSet 記憶體估計: ~{5000 * 50 / 1024:.1f} KB (假設每個使用者資料 50 bytes)")
print(f"節省比例: ~{stats['memory_usage_kb'] / (5000*50/1024):.1f}x 記憶體")
生產環境考量
吞吐量
在生產環境中,此管線可達到每個核心每秒 100 萬+ 事件。瓶頸通常是 I/O(資料攝取)而非演算法本身。
分散式合併
在分散式系統中,每台伺服器維護自己的 sketches,定期合併到主控:
# 從 10 個 shard 合併 HLL
master_hll = HyperLogLog(b=14)
for shard_hll in shard_hlls:
for i in range(master_hll.m):
master_hll.registers[i] = max(master_hll.registers[i], shard_hll.registers[i])
# 從 10 個 shard 合併 CMS
master_cms = CountMinSketch(epsilon=0.005, delta=0.01)
for shard_cms in shard_cmss:
master_cms.merge(shard_cms)
視覺化
處理後的資料可饋送至:
- Grafana — 即時儀表板
- Prometheus — 指標儲存 + 警報
- Kafka — 下游處理
記憶體比較:機率性 vs 傳統
| 元件 | 傳統資料結構 | 機率性資料結構 | 節省比例 | |------|-------------|--------------|---------| | UV 統計 | HashSet (5,000 users) ≈ 250 KB | HyperLogLog ≈ 16 KB | 15.6x | | 頻率統計 | HashMap (1000 items) ≈ 48 KB | Count-Min Sketch ≈ 5 KB | 9.6x | | 去重檢查 | HashSet (1M emails) ≈ 50 MB | Bloom Filter ≈ 1.2 MB | 41.7x | | 總計 | ~50 MB | ~1.2 MB | ~40x |
你學到了什麼 🎉
| 章節 | 技能 | 記憶體 | 應用場景 | |------|------|--------|---------| | ✅ 1. Bloom Filter | 節省空間的集合成員檢查 | 1.2 MB/百萬件 | 快取過濾、爬蟲去重 | | ✅ 2. HyperLogLog | 超大基數估計 | 16 KB/數十億 | UV 統計、資料庫最佳化 | | ✅ 3. Count-Min Sketch | 串流頻率估計 | ~5 KB | 熱門排行、DDoS 檢測 | | ✅ 4. FAISS | 向量相似搜尋 | 1 GB/十億向量 | RAG、推薦系統、語意搜尋 | | ✅ 5. 完整管線 | 生產級整合 | ~1.2 MB 總計 | Google Analytics 級分析 |
總結
你現在掌握了處理大規模串流資料的完整工具箱:
- 🔴 Bloom Filter — 某個東西出現過了嗎?(集合成員)
- 🟡 HyperLogLog — 有多少不同的東西?(基數估計)
- 🟢 Count-Min Sketch — 每個東西出現幾次?(頻率估計)
- 🔵 FAISS — 哪個東西最相似?(向量搜尋)
這四種演算法構成了現代大數據與 AI 基礎架構的核心元件。Google、Meta、Twitter、Netflix 都在使用這些技術來處理每秒數 GB 的資料。
近似演算法課程完成!🎉
- ✅ Bloom Filter
- ✅ HyperLogLog
- ✅ Count-Min Sketch
- ✅ FAISS
- ✅ 完整管線
四個工具,一個管線
讓我們回顧這個課程中你已經學會的四個機率性資料結構,以及它們在串流管線中的角色:
原始串流資料(每秒 10 萬筆)
│
▼
┌─────────────────────────────────────────┐
│ Step 1: Bloom Filter(去重/過濾) │
│ 「這個用戶 ID 之前出現過嗎?」 │
│ 記憶體: ~1 MB | 誤判率: 1% │
└─────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────┐
│ Step 2: HyperLogLog(基數統計) │
│ 「今天有多少不重複訪客?」 │
│ 記憶體: ~16 KB | 誤差: ~2% │
└─────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────┐
│ Step 3: Count-Min Sketch(頻率統計) │
│ 「哪個商品被點擊最多次?」 │
│ 記憶體: ~100 KB | 誤差: <1% │
└─────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────┐
│ Step 4: FAISS(相似度搜尋) │
│ 「這個使用者的向量跟誰最像?」 │
│ 記憶體: 可調 | 精確度: 可調 │
└─────────────────────────────────────────┘
│
▼
即時儀表板 / 推薦系統 / 異常告警
真實世界的串流管線
這不是理論——這是 Netflix、Twitter、Cloudflare、Uber 每天都在用的架構。以 Cloudflare 為例:
- 每秒處理 2000 萬個 HTTP 請求
- Bloom Filter 檢查請求 IP 是否在黑名單中
- HyperLogLog 統計不重複來源 IP 數
- Count-Min Sketch 找出最常被存取的 URL
- 全部在幾 KB 的記憶體內完成,零磁碟 I/O
課程總結
| 章節 | 工具 | 它回答的問題 | 記憶體 | |:----:|:----:|:-----------:|:------:| | 1 | Bloom Filter | 這個出現過嗎? | 1 MB / 百萬項目 | | 2 | HyperLogLog | 有多少不同的? | 16 KB | | 3 | Count-Min Sketch | 這個出現了幾次? | 100 KB | | 4 | FAISS | 誰跟這個最像? | 可調 | | 5 | 串流管線 | 全部一起用! | 整合 |
你現在已經掌握了大數據串流處理的核心工具箱。無論是設計即時監控系統、推薦引擎還是異常偵測管線,你都知道該用什麼工具、為什麼用它、以及如何整合它們。