ストリーム処理パイプライン
🔥 Vibe プロンプト
「リアルタイムECストリームプロセッサを構築:10万イベント、UV、ページビュー、人気商品をBloom+HLL+CMSで追跡。」
概要
この章では、3つの確率的データ構造を単一のリアルタイムストリームプロセッサに統合します。本番環境では、このアーキテクチャは毎秒数百万イベントを約1.2 MBの総メモリで処理します。
| コンポーネント | アルゴリズム | メモリ | 目的 |
|------------|-----------|--------|------|
| Bloom Filter | 01-bloom-filter | ~1.2 MB | 新規/再訪問ユーザーの判定 |
| HyperLogLog | 02-hyperloglog | ~16 KB | ユニークビジター数(UV)の推定 |
| Count-Min Sketch | 03-count-min-sketch | ~5 KB | ページビュー、クリック、購入の計数 |
| 合計 | — | ~1.2 MB | 完全なリアルタイム分析! |
アーキテクチャ
イベントストリーム(10万イベント)
↓
[ストリームプロセッサ] — 合計1.2 MB
├── Bloom Filter → 新規/再訪問検出
├── HyperLogLog → ユニークビジター推定
├── Count-Min Sketch → アクション頻度計数
└── ダッシュボード → リアルタイムメトリクス出力
実装
import random
import time
from collections import defaultdict
class StreamProcessor:
"""Bloom + HLL + CMSを統合したストリームプロセッサ"""
def __init__(self):
self.bloom = BloomFilter(1000000, 0.01) # 1.2 MB
self.hll = HyperLogLog(b=14) # 16 KB
self.cms = CountMinSketch(epsilon=0.005, delta=0.01) # ~10 KB
self.processed = 0
self.start_time = None
self.actions = defaultdict(int)
def process_item(self, user_id, action):
if self.start_time is None:
self.start_time = time.time()
self.processed += 1
# 1. Bloom Filter: 新規ユーザー検出
is_new = not self.bloom.check(user_id)
self.bloom.add(user_id)
# 2. HyperLogLog: ユニークビジター数
self.hll.add(user_id)
# 3. Count-Min Sketch: アクション頻度
self.cms.add(action)
self.actions[action] += 1
return {
"is_new_user": is_new,
"unique_visitors": self.hll.count(),
"action_count": self.cms.estimate(action)
}
def get_stats(self):
elapsed = time.time() - self.start_time if self.start_time else 0
return {
"processed": self.processed,
"elapsed": round(elapsed, 2),
"throughput": int(self.processed / elapsed) if elapsed > 0 else 0,
"unique_visitors": self.hll.count(),
"total_memory_kb": 1250
}
# ECストリームのシミュレーション
processor = StreamProcessor()
print("=== リアルタイムストリーム処理 シミュレーション ===\n")
for i in range(100000):
user_id = f"user_{random.randint(1, 5000)}"
action = random.choice(["view", "click", "add_to_cart", "purchase"])
processor.process_item(user_id, action)
if i % 20000 == 0:
stats = processor.get_stats()
print(f"処理済み: {i:,} イベント...")
print(f" UV推定: {stats['unique_visitors']:,}")
print(f" スループット: {stats['throughput']:,} イベント/秒")
# 最終結果
stats = processor.get_stats()
print(f"\n=== 最終結果 ===")
print(f"総処理数: {stats['processed']:,}")
print(f"経過時間: {stats['elapsed']}秒")
print(f"スループット: {stats['throughput']:,} イベント/秒")
print(f"UV推定: {stats['unique_visitors']:,} (実際: 5,000)")
print(f"使用メモリ: ~{stats['total_memory_kb']} KB")
# メモリ比較
print(f"\n=== メモリ比較 ===")
print(f"従来方式 (HashSet): ~{5000*50/1024:.1f} KB")
print(f"確率的 (Bloom+HLL+CMS): ~{stats['total_memory_kb']} KB")
print(f"節約率: {(5000*50/1024)/stats['total_memory_kb']:.1f}x")
# アクション頻度精度
print(f"\n=== アクション頻度精度 ===")
for action in ["view", "click", "add_to_cart", "purchase"]:
est = processor.cms.estimate(action)
true = processor.actions[action]
print(f"{action}: 実際={true}, 推定={est}, 誤差={est-true} ({(est-true)/true*100:.1f}% 過大)")
本番環境での考慮点
スループット
本番では、このパイプラインはコアあたり毎秒100万+イベントを処理可能。ボトルネックは通常スケッチではなくI/O(取り込み)です。
分散マージ
各サーバーが独自のスケッチを維持し、定期的にマスターにマージ:
# 10シャードからHLLをマージ
master_hll = HyperLogLog(b=14)
for shard_hll in shard_hlls:
for i in range(master_hll.m):
master_hll.registers[i] = max(master_hll.registers[i], shard_hll.registers[i])
# 10シャードからCMSをマージ
master_cms = CountMinSketch(epsilon=0.005, delta=0.01)
for shard_cms in shard_cmss:
master_cms.merge(shard_cms)
可視化
処理データを以下に連携:
- Grafana — リアルタイムダッシュボード
- Prometheus — メトリクス保存 + アラート
- Kafka — ダウンストリーム処理
学習した内容 🎉
| 章 | スキル | メモリ | |----|-------|-------| | ✅ 1. Bloom Filter | セットメンバーシップ検査 | 1.2 MB/100万件 | | ✅ 2. HyperLogLog | カーディナリティ推定 | 16 KB/数十億件 | | ✅ 3. Count-Min Sketch | 頻度推定 | ~5 KB | | ✅ 4. FAISS | ベクトル類似検索 | 1 GB/10億ベクトル | | ✅ 5. 完全パイプライン | プロダクション統合 | ~1.2 MB合計 |
近似アルゴリズムコース完了!🎉
- ✅ Bloom Filter
- ✅ HyperLogLog
- ✅ Count-Min Sketch
- ✅ FAISS
- ✅ 完全パイプライン