串流資料處理管線實戰

整合所有機率性資料結構

import random
import time
from collections import defaultdict

class StreamProcessor:
    """整合多種近似演算法的串流處理器"""
    
    def __init__(self):
        # 使用之前實作的類別
        self.bloom = BloomFilter(1000000, 0.01)
        self.hll = HyperLogLog(b=14)
        self.cms = CountMinSketch(epsilon=0.005, delta=0.01)
        
        self.processed_count = 0
        self.start_time = None
    
    def process_item(self, user_id, action, value=None):
        """處理一條串流資料"""
        if self.start_time is None:
            self.start_time = time.time()
        
        self.processed_count += 1
        
        # 1. Bloom Filter:檢查是否為新使用者
        is_new = not self.bloom.check(user_id)
        self.bloom.add(user_id)
        
        # 2. HyperLogLog:統計不重複使用者
        self.hll.add(user_id)
        
        # 3. Count-Min Sketch:統計動作頻率
        self.cms.add(action)
        if value is not None:
            self.cms.add(f"value_{action}", value)
        
        return {
            "is_new_user": is_new,
            "unique_users_est": self.hll.count(),
            f"{action}_count_est": self.cms.estimate(action),
        }
    
    def get_stats(self):
        """取得處理統計"""
        elapsed = time.time() - self.start_time if self.start_time else 0
        return {
            "processed": self.processed_count,
            "elapsed_sec": round(elapsed, 2),
            "throughput": int(self.processed_count / elapsed) if elapsed > 0 else 0,
            "unique_users_est": self.hll.count(),
            "memory_usage_kb": self._estimate_memory(),
        }
    
    def _estimate_memory(self):
        # Bloom: 約 1.2 MB (for 1M items, 1% FP)
        # HLL: 16 KB
        # CMS: ~40 KB
        return 1250  # KB

# 模擬電商串流資料
processor = StreamProcessor()

print("=== 即時串流資料處理模擬 ===\n")

# 模擬 10 萬筆串流資料
for i in range(100000):
    user_id = f"user_{random.randint(1, 5000)}"
    action = random.choice(["view", "click", "add_cart", "purchase"])
    
    result = processor.process_item(user_id, action)
    
    if i % 20000 == 0:
        stats = processor.get_stats()
        print(f"已處理 {i} 筆...")
        print(f"  UV 估計: {stats['unique_users_est']:,}")
        print(f"  處理速度: {stats['throughput']:,} 筆/秒")

stats = processor.get_stats()
print(f"\n=== 最終統計 ===")
print(f"總處理量: {stats['processed']:,} 筆")
print(f"耗時: {stats['elapsed_sec']} 秒")
print(f"處理速度: {stats['throughput']:,} 筆/秒")
print(f"不重複使用者估計: {stats['unique_users_est']:,}")
print(f"實際不重複使用者: 5,000")
print(f"記憶體使用: ~{stats['memory_usage_kb']} KB")

# 如果使用傳統 HashSet 儲存所有使用者
print(f"\nHashSet 記憶體估計: ~{5000 * 50 / 1024:.1f} KB (假設每個使用者資料 50 bytes)")
print(f"節省比例: ~{stats['memory_usage_kb'] / (5000*50/1024):.1f}x 記憶體")

恭喜你完成了整個近似演算法與大數據串流課程!你現在具備了處理大規模資料串流的完整工具箱,能夠在極少的記憶體下完成 UV 統計、頻率估計、去重與相似搜尋。

解鎖完整教學內容

本章為付費內容。加入專案即可解鎖超過 5000 字的深度解析,包含 10 個以上神級 Prompt 與真實 Source Code 範例!