串流資料處理管線實戰

整合所有機率性資料結構

import random
import time
from collections import defaultdict

class StreamProcessor:
    """整合多種近似演算法的串流處理器"""
    
    def __init__(self):
        # 使用之前實作的類別
        self.bloom = BloomFilter(1000000, 0.01)
        self.hll = HyperLogLog(b=14)
        self.cms = CountMinSketch(epsilon=0.005, delta=0.01)
        
        self.processed_count = 0
        self.start_time = None
    
    def process_item(self, user_id, action, value=None):
        """處理一條串流資料"""
        if self.start_time is None:
            self.start_time = time.time()
        
        self.processed_count += 1
        
        # 1. Bloom Filter:檢查是否為新使用者
        is_new = not self.bloom.check(user_id)
        self.bloom.add(user_id)
        
        # 2. HyperLogLog:統計不重複使用者
        self.hll.add(user_id)
        
        # 3. Count-Min Sketch:統計動作頻率
        self.cms.add(action)
        if value is not None:
            self.cms.add(f"value_{action}", value)
        
        return {
            "is_new_user": is_new,
            "unique_users_est": self.hll.count(),
            f"{action}_count_est": self.cms.estimate(action),
        }
    
    def get_stats(self):
        """取得處理統計"""
        elapsed = time.time() - self.start_time if self.start_time else 0
        return {
            "processed": self.processed_count,
            "elapsed_sec": round(elapsed, 2),
            "throughput": int(self.processed_count / elapsed) if elapsed > 0 else 0,
            "unique_users_est": self.hll.count(),
            "memory_usage_kb": self._estimate_memory(),
        }
    
    def _estimate_memory(self):
        # Bloom: 約 1.2 MB (for 1M items, 1% FP)
        # HLL: 16 KB
        # CMS: ~40 KB
        return 1250  # KB

# 模擬電商串流資料
processor = StreamProcessor()

print("=== 即時串流資料處理模擬 ===\n")

# 模擬 10 萬筆串流資料
for i in range(100000):
    user_id = f"user_{random.randint(1, 5000)}"
    action = random.choice(["view", "click", "add_cart", "purchase"])
    
    result = processor.process_item(user_id, action)
    
    if i % 20000 == 0:
        stats = processor.get_stats()
        print(f"已處理 {i} 筆...")
        print(f"  UV 估計: {stats['unique_users_est']:,}")
        print(f"  處理速度: {stats['throughput']:,} 筆/秒")

stats = processor.get_stats()
print(f"\n=== 最終統計 ===")
print(f"總處理量: {stats['processed']:,} 筆")
print(f"耗時: {stats['elapsed_sec']} 秒")
print(f"處理速度: {stats['throughput']:,} 筆/秒")
print(f"不重複使用者估計: {stats['unique_users_est']:,}")
print(f"實際不重複使用者: 5,000")
print(f"記憶體使用: ~{stats['memory_usage_kb']} KB")

# 如果使用傳統 HashSet 儲存所有使用者
print(f"\nHashSet 記憶體估計: ~{5000 * 50 / 1024:.1f} KB (假設每個使用者資料 50 bytes)")
print(f"節省比例: ~{stats['memory_usage_kb'] / (5000*50/1024):.1f}x 記憶體")

生產環境考量

吞吐量

在生產環境中,此管線可達到每個核心每秒 100 萬+ 事件。瓶頸通常是 I/O(資料攝取)而非演算法本身。

分散式合併

在分散式系統中,每台伺服器維護自己的 sketches,定期合併到主控:

# 從 10 個 shard 合併 HLL
master_hll = HyperLogLog(b=14)
for shard_hll in shard_hlls:
    for i in range(master_hll.m):
        master_hll.registers[i] = max(master_hll.registers[i], shard_hll.registers[i])

# 從 10 個 shard 合併 CMS
master_cms = CountMinSketch(epsilon=0.005, delta=0.01)
for shard_cms in shard_cmss:
    master_cms.merge(shard_cms)

視覺化

處理後的資料可饋送至:

  • Grafana — 即時儀表板
  • Prometheus — 指標儲存 + 警報
  • Kafka — 下游處理

記憶體比較:機率性 vs 傳統

| 元件 | 傳統資料結構 | 機率性資料結構 | 節省比例 | |------|-------------|--------------|---------| | UV 統計 | HashSet (5,000 users) ≈ 250 KB | HyperLogLog ≈ 16 KB | 15.6x | | 頻率統計 | HashMap (1000 items) ≈ 48 KB | Count-Min Sketch ≈ 5 KB | 9.6x | | 去重檢查 | HashSet (1M emails) ≈ 50 MB | Bloom Filter ≈ 1.2 MB | 41.7x | | 總計 | ~50 MB | ~1.2 MB | ~40x |

你學到了什麼 🎉

| 章節 | 技能 | 記憶體 | 應用場景 | |------|------|--------|---------| | ✅ 1. Bloom Filter | 節省空間的集合成員檢查 | 1.2 MB/百萬件 | 快取過濾、爬蟲去重 | | ✅ 2. HyperLogLog | 超大基數估計 | 16 KB/數十億 | UV 統計、資料庫最佳化 | | ✅ 3. Count-Min Sketch | 串流頻率估計 | ~5 KB | 熱門排行、DDoS 檢測 | | ✅ 4. FAISS | 向量相似搜尋 | 1 GB/十億向量 | RAG、推薦系統、語意搜尋 | | ✅ 5. 完整管線 | 生產級整合 | ~1.2 MB 總計 | Google Analytics 級分析 |

總結

你現在掌握了處理大規模串流資料的完整工具箱:

  • 🔴 Bloom Filter — 某個東西出現過了嗎?(集合成員)
  • 🟡 HyperLogLog — 有多少不同的東西?(基數估計)
  • 🟢 Count-Min Sketch — 每個東西出現幾次?(頻率估計)
  • 🔵 FAISS — 哪個東西最相似?(向量搜尋)

這四種演算法構成了現代大數據與 AI 基礎架構的核心元件。Google、Meta、Twitter、Netflix 都在使用這些技術來處理每秒數 GB 的資料。

近似演算法課程完成!🎉

  • ✅ Bloom Filter
  • ✅ HyperLogLog
  • ✅ Count-Min Sketch
  • ✅ FAISS
  • ✅ 完整管線

四個工具,一個管線

讓我們回顧這個課程中你已經學會的四個機率性資料結構,以及它們在串流管線中的角色:

原始串流資料(每秒 10 萬筆)
    │
    ▼
┌─────────────────────────────────────────┐
│  Step 1: Bloom Filter(去重/過濾)       │
│  「這個用戶 ID 之前出現過嗎?」           │
│  記憶體: ~1 MB | 誤判率: 1%             │
└─────────────────────────────────────────┘
    │
    ▼
┌─────────────────────────────────────────┐
│  Step 2: HyperLogLog(基數統計)         │
│  「今天有多少不重複訪客?」              │
│  記憶體: ~16 KB | 誤差: ~2%             │
└─────────────────────────────────────────┘
    │
    ▼
┌─────────────────────────────────────────┐
│  Step 3: Count-Min Sketch(頻率統計)    │
│  「哪個商品被點擊最多次?」              │
│  記憶體: ~100 KB | 誤差: <1%            │
└─────────────────────────────────────────┘
    │
    ▼
┌─────────────────────────────────────────┐
│  Step 4: FAISS(相似度搜尋)            │
│  「這個使用者的向量跟誰最像?」          │
│  記憶體: 可調 | 精確度: 可調            │
└─────────────────────────────────────────┘
    │
    ▼
即時儀表板 / 推薦系統 / 異常告警

真實世界的串流管線

這不是理論——這是 Netflix、Twitter、Cloudflare、Uber 每天都在用的架構。以 Cloudflare 為例:

  • 每秒處理 2000 萬個 HTTP 請求
  • Bloom Filter 檢查請求 IP 是否在黑名單中
  • HyperLogLog 統計不重複來源 IP 數
  • Count-Min Sketch 找出最常被存取的 URL
  • 全部在幾 KB 的記憶體內完成,零磁碟 I/O

課程總結

| 章節 | 工具 | 它回答的問題 | 記憶體 | |:----:|:----:|:-----------:|:------:| | 1 | Bloom Filter | 這個出現過嗎? | 1 MB / 百萬項目 | | 2 | HyperLogLog | 有多少不同的? | 16 KB | | 3 | Count-Min Sketch | 這個出現了幾次? | 100 KB | | 4 | FAISS | 誰跟這個最像? | 可調 | | 5 | 串流管線 | 全部一起用! | 整合 |

你現在已經掌握了大數據串流處理的核心工具箱。無論是設計即時監控系統、推薦引擎還是異常偵測管線,你都知道該用什麼工具、為什麼用它、以及如何整合它們。

解鎖完整教學內容

本章為付費內容。加入專案即可解鎖超過 5000 字的深度解析,包含 10 個以上神級 Prompt 與真實 Source Code 範例!