串流資料處理管線實戰
整合所有機率性資料結構
import random
import time
from collections import defaultdict
class StreamProcessor:
"""整合多種近似演算法的串流處理器"""
def __init__(self):
# 使用之前實作的類別
self.bloom = BloomFilter(1000000, 0.01)
self.hll = HyperLogLog(b=14)
self.cms = CountMinSketch(epsilon=0.005, delta=0.01)
self.processed_count = 0
self.start_time = None
def process_item(self, user_id, action, value=None):
"""處理一條串流資料"""
if self.start_time is None:
self.start_time = time.time()
self.processed_count += 1
# 1. Bloom Filter:檢查是否為新使用者
is_new = not self.bloom.check(user_id)
self.bloom.add(user_id)
# 2. HyperLogLog:統計不重複使用者
self.hll.add(user_id)
# 3. Count-Min Sketch:統計動作頻率
self.cms.add(action)
if value is not None:
self.cms.add(f"value_{action}", value)
return {
"is_new_user": is_new,
"unique_users_est": self.hll.count(),
f"{action}_count_est": self.cms.estimate(action),
}
def get_stats(self):
"""取得處理統計"""
elapsed = time.time() - self.start_time if self.start_time else 0
return {
"processed": self.processed_count,
"elapsed_sec": round(elapsed, 2),
"throughput": int(self.processed_count / elapsed) if elapsed > 0 else 0,
"unique_users_est": self.hll.count(),
"memory_usage_kb": self._estimate_memory(),
}
def _estimate_memory(self):
# Bloom: 約 1.2 MB (for 1M items, 1% FP)
# HLL: 16 KB
# CMS: ~40 KB
return 1250 # KB
# 模擬電商串流資料
processor = StreamProcessor()
print("=== 即時串流資料處理模擬 ===\n")
# 模擬 10 萬筆串流資料
for i in range(100000):
user_id = f"user_{random.randint(1, 5000)}"
action = random.choice(["view", "click", "add_cart", "purchase"])
result = processor.process_item(user_id, action)
if i % 20000 == 0:
stats = processor.get_stats()
print(f"已處理 {i} 筆...")
print(f" UV 估計: {stats['unique_users_est']:,}")
print(f" 處理速度: {stats['throughput']:,} 筆/秒")
stats = processor.get_stats()
print(f"\n=== 最終統計 ===")
print(f"總處理量: {stats['processed']:,} 筆")
print(f"耗時: {stats['elapsed_sec']} 秒")
print(f"處理速度: {stats['throughput']:,} 筆/秒")
print(f"不重複使用者估計: {stats['unique_users_est']:,}")
print(f"實際不重複使用者: 5,000")
print(f"記憶體使用: ~{stats['memory_usage_kb']} KB")
# 如果使用傳統 HashSet 儲存所有使用者
print(f"\nHashSet 記憶體估計: ~{5000 * 50 / 1024:.1f} KB (假設每個使用者資料 50 bytes)")
print(f"節省比例: ~{stats['memory_usage_kb'] / (5000*50/1024):.1f}x 記憶體")
恭喜你完成了整個近似演算法與大數據串流課程!你現在具備了處理大規模資料串流的完整工具箱,能夠在極少的記憶體下完成 UV 統計、頻率估計、去重與相似搜尋。