ストリーム処理パイプライン

🔥 Vibe プロンプト

「リアルタイムECストリームプロセッサを構築:10万イベント、UV、ページビュー、人気商品をBloom+HLL+CMSで追跡。」

概要

この章では、3つの確率的データ構造を単一のリアルタイムストリームプロセッサに統合します。本番環境では、このアーキテクチャは毎秒数百万イベントを約1.2 MBの総メモリで処理します。

| コンポーネント | アルゴリズム | メモリ | 目的 | |------------|-----------|--------|------| | Bloom Filter | 01-bloom-filter | ~1.2 MB | 新規/再訪問ユーザーの判定 | | HyperLogLog | 02-hyperloglog | ~16 KB | ユニークビジター数(UV)の推定 | | Count-Min Sketch | 03-count-min-sketch | ~5 KB | ページビュー、クリック、購入の計数 | | 合計 | — | ~1.2 MB | 完全なリアルタイム分析! |

アーキテクチャ

イベントストリーム(10万イベント)
    ↓
[ストリームプロセッサ] — 合計1.2 MB
    ├── Bloom Filter → 新規/再訪問検出
    ├── HyperLogLog  → ユニークビジター推定
    ├── Count-Min Sketch → アクション頻度計数
    └── ダッシュボード → リアルタイムメトリクス出力

実装

import random
import time
from collections import defaultdict

class StreamProcessor:
    """Bloom + HLL + CMSを統合したストリームプロセッサ"""
    
    def __init__(self):
        self.bloom = BloomFilter(1000000, 0.01)   # 1.2 MB
        self.hll = HyperLogLog(b=14)               # 16 KB
        self.cms = CountMinSketch(epsilon=0.005, delta=0.01)  # ~10 KB
        
        self.processed = 0
        self.start_time = None
        self.actions = defaultdict(int)
    
    def process_item(self, user_id, action):
        if self.start_time is None:
            self.start_time = time.time()
        self.processed += 1
        
        # 1. Bloom Filter: 新規ユーザー検出
        is_new = not self.bloom.check(user_id)
        self.bloom.add(user_id)
        
        # 2. HyperLogLog: ユニークビジター数
        self.hll.add(user_id)
        
        # 3. Count-Min Sketch: アクション頻度
        self.cms.add(action)
        self.actions[action] += 1
        
        return {
            "is_new_user": is_new,
            "unique_visitors": self.hll.count(),
            "action_count": self.cms.estimate(action)
        }
    
    def get_stats(self):
        elapsed = time.time() - self.start_time if self.start_time else 0
        return {
            "processed": self.processed,
            "elapsed": round(elapsed, 2),
            "throughput": int(self.processed / elapsed) if elapsed > 0 else 0,
            "unique_visitors": self.hll.count(),
            "total_memory_kb": 1250
        }

# ECストリームのシミュレーション
processor = StreamProcessor()

print("=== リアルタイムストリーム処理 シミュレーション ===\n")

for i in range(100000):
    user_id = f"user_{random.randint(1, 5000)}"
    action = random.choice(["view", "click", "add_to_cart", "purchase"])
    processor.process_item(user_id, action)
    
    if i % 20000 == 0:
        stats = processor.get_stats()
        print(f"処理済み: {i:,} イベント...")
        print(f"  UV推定: {stats['unique_visitors']:,}")
        print(f"  スループット: {stats['throughput']:,} イベント/秒")

# 最終結果
stats = processor.get_stats()
print(f"\n=== 最終結果 ===")
print(f"総処理数: {stats['processed']:,}")
print(f"経過時間: {stats['elapsed']}秒")
print(f"スループット: {stats['throughput']:,} イベント/秒")
print(f"UV推定: {stats['unique_visitors']:,} (実際: 5,000)")
print(f"使用メモリ: ~{stats['total_memory_kb']} KB")

# メモリ比較
print(f"\n=== メモリ比較 ===")
print(f"従来方式 (HashSet): ~{5000*50/1024:.1f} KB")
print(f"確率的 (Bloom+HLL+CMS): ~{stats['total_memory_kb']} KB")
print(f"節約率: {(5000*50/1024)/stats['total_memory_kb']:.1f}x")

# アクション頻度精度
print(f"\n=== アクション頻度精度 ===")
for action in ["view", "click", "add_to_cart", "purchase"]:
    est = processor.cms.estimate(action)
    true = processor.actions[action]
    print(f"{action}: 実際={true}, 推定={est}, 誤差={est-true} ({(est-true)/true*100:.1f}% 過大)")

本番環境での考慮点

スループット

本番では、このパイプラインはコアあたり毎秒100万+イベントを処理可能。ボトルネックは通常スケッチではなくI/O(取り込み)です。

分散マージ

各サーバーが独自のスケッチを維持し、定期的にマスターにマージ:

# 10シャードからHLLをマージ
master_hll = HyperLogLog(b=14)
for shard_hll in shard_hlls:
    for i in range(master_hll.m):
        master_hll.registers[i] = max(master_hll.registers[i], shard_hll.registers[i])

# 10シャードからCMSをマージ
master_cms = CountMinSketch(epsilon=0.005, delta=0.01)
for shard_cms in shard_cmss:
    master_cms.merge(shard_cms)

可視化

処理データを以下に連携:

  • Grafana — リアルタイムダッシュボード
  • Prometheus — メトリクス保存 + アラート
  • Kafka — ダウンストリーム処理

学習した内容 🎉

| 章 | スキル | メモリ | |----|-------|-------| | ✅ 1. Bloom Filter | セットメンバーシップ検査 | 1.2 MB/100万件 | | ✅ 2. HyperLogLog | カーディナリティ推定 | 16 KB/数十億件 | | ✅ 3. Count-Min Sketch | 頻度推定 | ~5 KB | | ✅ 4. FAISS | ベクトル類似検索 | 1 GB/10億ベクトル | | ✅ 5. 完全パイプライン | プロダクション統合 | ~1.2 MB合計 |

近似アルゴリズムコース完了!🎉

  • ✅ Bloom Filter
  • ✅ HyperLogLog
  • ✅ Count-Min Sketch
  • ✅ FAISS
  • ✅ 完全パイプライン

完全なチュートリアルをロック解除

このチャプターは有料コンテンツです。プロジェクトに参加して、10以上の神レベルのPromptや実際のソースコード例を含む、5000字以上の深い分析をロック解除してください!