Stream Processing Pipeline: All Tools Together

In the previous chapters, you learned four powerful tools individually. This chapter integrates them into a complete stream processing pipeline that handles millions of events per second.

Why a Pipeline?

A single tool solves one problem. A pipeline solves the complete analytics challenge:

Raw Stream Data (100K events/sec)
    |
    v
Step 1: Bloom Filter - "Have we seen this user before?"
    |
    v
Step 2: HyperLogLog - "How many unique users?"
    |
    v
Step 3: Count-Min Sketch - "How many times did each action occur?"
    |
    v
Step 4: FAISS - "Which items are most similar to this user's profile?"
    |
    v
Real-Time Dashboard / Recommendation Engine / Alert System

The Complete Pipeline Implementation

import random
import time
from collections import defaultdict

class StreamProcessor:
    """Complete stream processor integrating all probabilistic data structures"""
    
    def __init__(self):
        # Bloom Filter: 1M items, 1% FP rate -> ~1.2 MB
        self.bloom = BloomFilter(1_000_000, 0.01)
        # HyperLogLog: b=14 -> 16 KB, ~0.81% error
        self.hll = HyperLogLog(b=14)
        # Count-Min Sketch: 1% error, 99% confidence -> ~5 KB
        self.cms = CountMinSketch(epsilon=0.005, delta=0.01)
        
        self.processed_count = 0
        self.start_time = None
    
    def process_item(self, user_id: str, action: str, value: float = None) -> dict:
        """Process a single stream data item"""
        if self.start_time is None:
            self.start_time = time.time()
        
        self.processed_count += 1
        
        # Step 1: Bloom Filter - new or returning user?
        is_new = not self.bloom.check(user_id)
        self.bloom.add(user_id)
        
        # Step 2: HyperLogLog - count unique users
        self.hll.add(user_id)
        
        # Step 3: Count-Min Sketch - count action frequency
        self.cms.add(action)
        if value is not None:
            self.cms.add(f"value_{action}", int(value))
        
        return {
            "is_new_user": is_new,
            "unique_users_est": self.hll.count(),
            f"{action}_count_est": self.cms.estimate(action),
        }
    
    def get_stats(self) -> dict:
        """Get current processing statistics"""
        elapsed = time.time() - self.start_time if self.start_time else 0
        return {
            "processed": self.processed_count,
            "elapsed_sec": round(elapsed, 2),
            "throughput": int(self.processed_count / elapsed) if elapsed > 0 else 0,
            "unique_users_est": self.hll.count(),
            "memory_usage_kb": self._estimate_memory(),
        }
    
    def _estimate_memory(self) -> int:
        # Bloom: ~1.2 MB for 1M items, 1% FP
        # HLL: 16 KB for b=14
        # CMS: ~40 KB for epsilon=0.005
        return 1256  # KB total

# Simulate 100K stream events
processor = StreamProcessor()
print("=== Real-Time Stream Processing Simulation ===")
print()

for i in range(100_000):
    user_id = f"user_{random.randint(1, 5000)}"
    action = random.choice(["view", "click", "add_cart", "purchase"])
    processor.process_item(user_id, action)
    
    if i % 20_000 == 0:
        stats = processor.get_stats()
        print(f"Processed {i:,} records...")
        print(f"  UV estimate: {stats['unique_users_est']:,}")
        print(f"  Throughput: {stats['throughput']:,} records/sec")

stats = processor.get_stats()
print(f"\n=== Final Statistics ===")
print(f"Total processed: {stats['processed']:,}")
print(f"Elapsed: {stats['elapsed_sec']} sec")
print(f"Throughput: {stats['throughput']:,} records/sec")
print(f"Unique users (est): {stats['unique_users_est']:,}")
print(f"Actual unique users: 5,000")
print(f"Memory usage: ~{stats['memory_usage_kb']} KB")

# Compare with traditional approach
print(f"\nHashSet memory (est): ~{5000 * 50 / 1024:.1f} KB")
print(f"Memory savings: ~{stats['memory_usage_kb'] / (5000 * 50 / 1024):.1f}x")

Production Considerations

Throughput Optimization

The pipeline can process over 1 million events per second per core. The bottleneck is usually data ingestion (network I/O), not the algorithms themselves.

Distributed Deployment

In production, each server maintains its own sketches. Periodically merge to the master node:

# Merge HLL from 10 shards
master_hll = HyperLogLog(b=14)
for shard_hll in shard_hlls:
    for i in range(master_hll.m):
        master_hll.registers[i] = max(
            master_hll.registers[i],
            shard_hll.registers[i]
        )

# Merge CMS from 10 shards
master_cms = CountMinSketch(epsilon=0.005, delta=0.01)
for shard_cms in shard_cmss:
    master_cms.merge(shard_cms)

Visualization & Alerting

Processed data feeds into:

  • Grafana: Real-time dashboards for UV, trending products, error rates
  • Prometheus: Metrics storage with alert rules for anomaly detection
  • Kafka: Downstream processing for recommendation engine

Memory Comparison: Probabilistic vs Traditional

| Component | Traditional Approach | Memory | Probabilistic Approach | Memory | Savings | |-----------|:-------------------:|:------:|:---------------------:|:------:|:-------:| | UV Stats | HashSet (5K users) | ~250 KB | HyperLogLog | 16 KB | 15.6x | | Frequency | HashMap (1K items) | ~48 KB | Count-Min Sketch | 5 KB | 9.6x | | Dedup Check | HashSet (1M emails) | ~50 MB | Bloom Filter | 1.2 MB | 41.7x | | Total | | ~50 MB | | ~1.2 MB | ~40x |

Course Summary

| Chapter | Tool | Memory | Application | |:-------:|:----:|:------:|-------------| | 1 | Bloom Filter | 1.2 MB / 1M items | Set membership, cache filtering, crawl dedup | | 2 | HyperLogLog | 16 KB (b=14) | Cardinality estimation, UV stats, DB optimization | | 3 | Count-Min Sketch | ~5 KB | Frequency estimation, trending topics, DDoS detection | | 4 | FAISS | 1 GB / 1B vectors | Vector similarity, recommendations, RAG, semantic search | | 5 | Full Pipeline | ~1.2 MB total | Integrated real-time analytics |

These four probabilistic data structures form the core of modern big data and AI infrastructure. Google, Meta, Twitter, Netflix, and Cloudflare all use these techniques to process petabytes of data daily.

The core philosophy: trade a tiny amount of accuracy for massive memory and speed improvements.

Real-World Pipeline: E-Commerce Analytics

An e-commerce platform's real-time analytics pipeline processes every user interaction:

User visits homepage
  -> Bloom Filter: Is this a new visitor? (shown "Welcome!" banner)
  -> HyperLogLog: Update today's UV count
  -> Count-Min Sketch: Increment "homepage_view" counter

User searches "camping tent"
  -> Count-Min Sketch: Increment "search:camping tent" counter
  -> FAISS: Find 10 most similar products to search query embedding

User views product page
  -> Count-Min Sketch: Increment "product_view:shark_123" counter
  -> HyperLogLog: Count distinct product views today

User adds to cart
  -> Count-Min Sketch: Increment "add_to_cart" counter
  -> Bloom Filter: Is this user's first purchase? (offer discount)

User completes purchase
  -> Count-Min Sketch: Increment "purchase" counter
  -> HyperLogLog: Count unique purchasers today
  -> Bloom Filter: Mark user as "has_purchased"

All of this happens in real-time with approximately 1.2 MB of memory, processing millions of events per second.

Scaling to Billions of Events

For truly massive scale (Cloudflare, Twitter, Netflix):

  • Shard by data center: Each location runs its own pipeline
  • Periodic merge: Sketches merged to global master every minute
  • Layered aggregation: Per-second -> Per-minute -> Per-hour -> Per-day
  • Downsampling: Older data kept at lower resolution

Key Takeaways

  • A complete stream processing pipeline integrates Bloom Filter, HyperLogLog, Count-Min Sketch, and optionally FAISS
  • Total memory for all four tools is approximately 1.2 MB
  • Processing throughput exceeds 1M events/second/core
  • Sketches are mergeable for distributed deployment
  • These four algorithms form the foundation of modern big data infrastructure at Google, Meta, Twitter, and Netflix

Unlock Full Tutorial

This chapter is paid content. Join the project to unlock over 5000 words of deep analysis, including 10+ god-tier Prompts and real Source Code examples!