Stream Processing Pipeline: All Tools Together
In the previous chapters, you learned four powerful tools individually. This chapter integrates them into a complete stream processing pipeline that handles millions of events per second.
Why a Pipeline?
A single tool solves one problem. A pipeline solves the complete analytics challenge:
Raw Stream Data (100K events/sec)
|
v
Step 1: Bloom Filter - "Have we seen this user before?"
|
v
Step 2: HyperLogLog - "How many unique users?"
|
v
Step 3: Count-Min Sketch - "How many times did each action occur?"
|
v
Step 4: FAISS - "Which items are most similar to this user's profile?"
|
v
Real-Time Dashboard / Recommendation Engine / Alert System
The Complete Pipeline Implementation
import random
import time
from typing import Optional

# BloomFilter, HyperLogLog, and CountMinSketch are the classes built in the
# previous chapters; define or import them before running this code.


class StreamProcessor:
    """Stream processor integrating the Bloom Filter, HyperLogLog, and Count-Min Sketch"""

    def __init__(self):
        # Bloom Filter: 1M items, 1% FP rate -> ~1.2 MB
        self.bloom = BloomFilter(1_000_000, 0.01)
        # HyperLogLog: b=14 -> 16 KB, ~0.81% error
        self.hll = HyperLogLog(b=14)
        # Count-Min Sketch: epsilon=0.005 (0.5% error), delta=0.01 (99% confidence)
        self.cms = CountMinSketch(epsilon=0.005, delta=0.01)
        self.processed_count = 0
        self.start_time = None

    def process_item(self, user_id: str, action: str, value: Optional[float] = None) -> dict:
        """Process a single stream data item"""
        if self.start_time is None:
            self.start_time = time.time()
        self.processed_count += 1

        # Step 1: Bloom Filter - new or returning user?
        # Check before adding; a false positive can mislabel a new user as returning.
        is_new = not self.bloom.check(user_id)
        self.bloom.add(user_id)

        # Step 2: HyperLogLog - count unique users
        self.hll.add(user_id)

        # Step 3: Count-Min Sketch - count action frequency
        self.cms.add(action)
        if value is not None:
            # Accumulate the (integer-truncated) value under a separate key
            self.cms.add(f"value_{action}", int(value))

        return {
            "is_new_user": is_new,
            "unique_users_est": self.hll.count(),
            f"{action}_count_est": self.cms.estimate(action),
        }

    def get_stats(self) -> dict:
        """Get current processing statistics"""
        elapsed = (time.time() - self.start_time) if self.start_time else 0
        return {
            "processed": self.processed_count,
            "elapsed_sec": round(elapsed, 2),
            "throughput": int(self.processed_count / elapsed) if elapsed > 0 else 0,
            "unique_users_est": self.hll.count(),
            "memory_usage_kb": self._estimate_memory(),
        }

    def _estimate_memory(self) -> int:
        # Bloom: ~1.2 MB (1200 KB) for 1M items, 1% FP
        # HLL: 16 KB for b=14
        # CMS: ~40 KB for epsilon=0.005
        # Fixed estimate (1200 + 16 + 40), not a measurement of the live objects
        return 1256  # KB total


# Simulate 100K stream events
processor = StreamProcessor()
print("=== Real-Time Stream Processing Simulation ===")
print()

for i in range(1, 100_001):
    user_id = f"user_{random.randint(1, 5000)}"
    action = random.choice(["view", "click", "add_cart", "purchase"])
    processor.process_item(user_id, action)
    # Report after every 20,000 processed records
    if i % 20_000 == 0:
        stats = processor.get_stats()
        print(f"Processed {i:,} records...")
        print(f"  UV estimate: {stats['unique_users_est']:,}")
        print(f"  Throughput: {stats['throughput']:,} records/sec")

stats = processor.get_stats()
print("\n=== Final Statistics ===")
print(f"Total processed: {stats['processed']:,}")
print(f"Elapsed: {stats['elapsed_sec']} sec")
print(f"Throughput: {stats['throughput']:,} records/sec")
print(f"Unique users (est): {stats['unique_users_est']:,}")
print("Actual unique users: 5,000")
print(f"Memory usage: ~{stats['memory_usage_kb']} KB")

# Compare with traditional approach
hashset_kb = 5000 * 50 / 1024
print(f"\nHashSet memory (est): ~{hashset_kb:.1f} KB")
# With only 5,000 users the exact set is smaller than the sketches, which are
# sized for ~1M items. The sketches stay fixed in size as the stream grows;
# the savings appear at the cardinalities shown in the comparison table.
print(f"Sketch / HashSet memory ratio: ~{stats['memory_usage_kb'] / hashset_kb:.1f}x")
Production Considerations
Throughput Optimization
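Optimized native implementations of these sketches can process over 1 million events per second per core; the pure-Python simulation above will typically be much slower. In production, the bottleneck is usually data ingestion (network I/O), not the algorithms themselves.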
Distributed Deployment
In production, each server maintains its own sketches. Periodically, the shard sketches are merged on a master node. All sketches being merged must be built with the same parameters:
# shard_hlls and shard_cmss are lists of sketches collected from the shards

# Merge HLL from 10 shards: take the register-wise maximum
master_hll = HyperLogLog(b=14)
for shard_hll in shard_hlls:
    for i in range(master_hll.m):
        master_hll.registers[i] = max(
            master_hll.registers[i],
            shard_hll.registers[i]
        )

# Merge CMS from 10 shards
master_cms = CountMinSketch(epsilon=0.005, delta=0.01)
for shard_cms in shard_cmss:
    master_cms.merge(shard_cms)
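To see why the register-wise maximum is a valid merge, the sketch below builds a deliberately small HyperLogLog and checks that merging three shards gives exactly the same registers as feeding every item into one sketch. `TinyHLL` is a hypothetical stand-in written for this illustration, not the class from the earlier chapter; the SHA-1 hash and `b=12` are assumptions chosen to keep the example short.

```python
import hashlib
import math


class TinyHLL:
    """Minimal HyperLogLog, used only to illustrate merging (hypothetical stand-in)."""

    def __init__(self, b: int = 12):
        self.b = b
        self.m = 1 << b                      # number of registers
        self.registers = [0] * self.m

    def add(self, item: str) -> None:
        h = int.from_bytes(hashlib.sha1(item.encode()).digest()[:8], "big")
        index = h >> (64 - self.b)                    # first b bits pick a register
        rest = h & ((1 << (64 - self.b)) - 1)         # remaining 64 - b bits
        rank = (64 - self.b) - rest.bit_length() + 1  # position of the leftmost 1-bit
        self.registers[index] = max(self.registers[index], rank)

    def count(self) -> float:
        alpha = 0.7213 / (1 + 1.079 / self.m)         # bias constant, valid for m >= 128
        raw = alpha * self.m ** 2 / sum(2.0 ** -r for r in self.registers)
        zeros = self.registers.count(0)
        if raw <= 2.5 * self.m and zeros:
            return self.m * math.log(self.m / zeros)  # small-range correction
        return raw

    def merge(self, other: "TinyHLL") -> None:
        if other.m != self.m:
            raise ValueError("sketches must use the same b")
        self.registers = [max(x, y) for x, y in zip(self.registers, other.registers)]


# Three shards with overlapping user ranges; the union is user_0 .. user_27999.
shards = []
for k in range(3):
    shard = TinyHLL()
    for n in range(k * 8000, k * 8000 + 12000):
        shard.add(f"user_{n}")
    shards.append(shard)

merged = TinyHLL()
for shard in shards:
    merged.merge(shard)

# Reference: one sketch that sees every distinct user directly.
combined = TinyHLL()
for n in range(28000):
    combined.add(f"user_{n}")

print("registers identical:", merged.registers == combined.registers)
print("merged estimate:", round(merged.count()))
```

Because each register stores a maximum, the merge is order-independent and idempotent: users seen on several shards are not double counted, which is what makes per-shard sketches safe to combine.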
Visualization & Alerting
Processed data feeds into:
- Grafana: Real-time dashboards for UV, trending products, error rates
- Prometheus: Metrics storage with alert rules for anomaly detection
- Kafka: Downstream processing for recommendation engine
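As one possible way to hand the statistics to Prometheus, the sketch below renders a stats dictionary in the Prometheus text exposition format (`# TYPE` line followed by `name value`). The function name `to_prometheus_text`, the `stream` prefix, and the sample values are assumptions for illustration; serving the text over HTTP for scraping is left out.

```python
def to_prometheus_text(stats: dict, prefix: str = "stream") -> str:
    """Render numeric stats as Prometheus text-format gauges (hypothetical helper)."""
    lines = []
    for key, value in stats.items():
        name = f"{prefix}_{key}"
        lines.append(f"# TYPE {name} gauge")
        lines.append(f"{name} {value}")
    return "\n".join(lines) + "\n"


# Placeholder values shaped like the output of get_stats(); not measured results.
sample_stats = {"processed": 100000, "unique_users_est": 4987, "throughput": 250000}
text = to_prometheus_text(sample_stats)
print(text)
```

Every metric is exposed as a gauge here to keep the example short; a monotonically increasing value such as `processed` would more naturally be a counter.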
Memory Comparison: Probabilistic vs Traditional
| Component | Traditional Approach | Traditional Memory | Probabilistic Approach | Probabilistic Memory | Savings |
|-----------|:-------------------:|:------:|:---------------------:|:------:|:-------:|
| UV Stats | HashSet (5K users) | ~250 KB | HyperLogLog | 16 KB | 15.6x |
| Frequency | HashMap (1K items) | ~48 KB | Count-Min Sketch | 5 KB | 9.6x |
| Dedup Check | HashSet (1M emails) | ~50 MB | Bloom Filter | 1.2 MB | 41.7x |
| Total | | ~50 MB | | ~1.2 MB | ~40x |
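The sketch sizes in the table follow from the standard sizing formulas, which the snippet below evaluates for the parameters used in this chapter. It is a minimal sketch under stated assumptions: the helper names are hypothetical, the Count-Min Sketch dimensions use the common `width = ceil(e / epsilon)`, `depth = ceil(ln(1 / delta))` rule, and the byte totals assume 1-byte HyperLogLog registers and 4-byte counters, so an implementation with different choices will report different numbers.

```python
import math


def bloom_size(n_items: int, fp_rate: float) -> tuple[int, int]:
    """Return (bits, hash_count) for a standard Bloom filter."""
    bits = math.ceil(-n_items * math.log(fp_rate) / (math.log(2) ** 2))
    hashes = round(bits / n_items * math.log(2))
    return bits, hashes


def hll_size(b: int) -> tuple[int, float]:
    """Return (register_count, relative_standard_error) for HyperLogLog."""
    m = 1 << b
    return m, 1.04 / math.sqrt(m)


def cms_size(epsilon: float, delta: float) -> tuple[int, int]:
    """Return (width, depth) for a Count-Min Sketch."""
    width = math.ceil(math.e / epsilon)
    depth = math.ceil(math.log(1 / delta))
    return width, depth


bloom_bits, bloom_hashes = bloom_size(1_000_000, 0.01)
hll_registers, hll_error = hll_size(14)
cms_width, cms_depth = cms_size(0.005, 0.01)

print(f"Bloom filter: {bloom_bits / 8 / 1e6:.2f} MB, {bloom_hashes} hash functions")
print(f"HyperLogLog:  {hll_registers} registers, {hll_error:.2%} standard error")
# Assumes 4-byte counters; wider counters scale the size proportionally.
print(f"Count-Min:    {cms_width} x {cms_depth} counters, "
      f"{cms_width * cms_depth * 4 / 1024:.1f} KB")
```

None of the sizes depends on how many events actually arrive, only on the configured capacity and error targets, which is why the totals stay fixed as the stream grows.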
Course Summary
| Chapter | Tool | Memory | Application |
|:-------:|:----:|:------:|-------------|
| 1 | Bloom Filter | 1.2 MB / 1M items | Set membership, cache filtering, crawl dedup |
| 2 | HyperLogLog | 16 KB (b=14) | Cardinality estimation, UV stats, DB optimization |
| 3 | Count-Min Sketch | ~5 KB | Frequency estimation, trending topics, DDoS detection |
| 4 | FAISS | 1 GB / 1B vectors | Vector similarity, recommendations, RAG, semantic search |
| 5 | Full Pipeline | ~1.2 MB total | Integrated real-time analytics |
These four tools (three probabilistic sketches plus FAISS for approximate similarity search) form the core of modern big data and AI infrastructure. Google, Meta, Twitter, Netflix, and Cloudflare all use these techniques to process petabytes of data daily.
The core philosophy: trade a tiny amount of accuracy for massive memory and speed improvements.
Real-World Pipeline: E-Commerce Analytics
An e-commerce platform's real-time analytics pipeline processes every user interaction:
User visits homepage
  -> Bloom Filter: Is this a new visitor? (If so, show a "Welcome!" banner.)
  -> HyperLogLog: Update today's UV count
  -> Count-Min Sketch: Increment "homepage_view" counter
User searches "camping tent"
  -> Count-Min Sketch: Increment "search:camping tent" counter
  -> FAISS: Find the 10 products most similar to the search query embedding
User views product page
  -> Count-Min Sketch: Increment "product_view:shark_123" counter
  -> HyperLogLog: Count distinct product views today
User adds to cart
  -> Count-Min Sketch: Increment "add_to_cart" counter
  -> Bloom Filter: Would this be the user's first purchase? (If so, offer a discount.)
User completes purchase
  -> Count-Min Sketch: Increment "purchase" counter
  -> HyperLogLog: Count unique purchasers today
  -> Bloom Filter: Mark user as "has_purchased"
All of this happens in real time. The three sketches together need approximately 1.2 MB of memory (the FAISS index is sized separately), and optimized implementations can process millions of events per second.
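The first-purchase check in the flow above can be sketched as a check at add-to-cart time and an add at purchase time. `TinyBloom`, its default size, the SHA-256 double-hashing scheme, and the two handler names are all assumptions made for this illustration, not the implementation from the earlier chapter.

```python
import hashlib


class TinyBloom:
    """Minimal Bloom filter for illustration (hypothetical stand-in)."""

    def __init__(self, bits: int = 8192, hashes: int = 5):
        self.bits = bits
        self.hashes = hashes
        self.array = bytearray((bits + 7) // 8)

    def _positions(self, item: str) -> list[int]:
        # Double hashing: derive all bit positions from two 64-bit values.
        digest = hashlib.sha256(item.encode()).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1
        return [(h1 + i * h2) % self.bits for i in range(self.hashes)]

    def add(self, item: str) -> None:
        for p in self._positions(item):
            self.array[p >> 3] |= 1 << (p & 7)

    def check(self, item: str) -> bool:
        return all(self.array[p >> 3] & (1 << (p & 7)) for p in self._positions(item))


has_purchased = TinyBloom()


def on_add_to_cart(user_id: str) -> bool:
    """Return True if the user should be offered a first-purchase discount."""
    return not has_purchased.check(user_id)


def on_purchase(user_id: str) -> None:
    """Mark the user as 'has_purchased'."""
    has_purchased.add(user_id)


offers = [on_add_to_cart("user_42")]      # nothing recorded yet
on_purchase("user_42")
offers.append(on_add_to_cart("user_42"))  # user is now marked as a purchaser
print(offers)  # [True, False]
```

The error is one-sided: a Bloom filter has no false negatives, so a returning buyer is never offered the discount again, while a false positive means a small fraction of genuine first-time buyers will miss it. Whether that trade-off is acceptable is a business decision.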
Scaling to Billions of Events
For truly massive scale (Cloudflare, Twitter, Netflix):
- Shard by data center: Each location runs its own pipeline
- Periodic merge: Sketches merged to global master every minute
- Layered aggregation: Per-second -> Per-minute -> Per-hour -> Per-day
- Downsampling: Older data kept at lower resolution
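The layered aggregation step can be illustrated with a Count-Min Sketch, whose merge is element-wise addition of the counter tables: sixty per-second sketches roll up into one per-minute sketch, which can in turn be rolled up into hours and days. `TinyCMS` and `roll_up` are hypothetical names for this sketch, and the event counts are invented purely for the demonstration.

```python
import hashlib


class TinyCMS:
    """Minimal Count-Min Sketch for illustration (hypothetical stand-in)."""

    def __init__(self, width: int = 544, depth: int = 5):
        self.width, self.depth = width, depth
        self.table = [[0] * width for _ in range(depth)]

    def _columns(self, item: str) -> list[int]:
        # Double hashing: one column per row from two 64-bit values.
        digest = hashlib.sha256(item.encode()).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1
        return [(h1 + row * h2) % self.width for row in range(self.depth)]

    def add(self, item: str, count: int = 1) -> None:
        for row, col in enumerate(self._columns(item)):
            self.table[row][col] += count

    def estimate(self, item: str) -> int:
        return min(self.table[row][col] for row, col in enumerate(self._columns(item)))

    def merge(self, other: "TinyCMS") -> None:
        if (self.width, self.depth) != (other.width, other.depth):
            raise ValueError("sketches must share width and depth")
        for mine, theirs in zip(self.table, other.table):
            for col, value in enumerate(theirs):
                mine[col] += value


def roll_up(sketches: list[TinyCMS]) -> TinyCMS:
    """Merge fine-grained sketches into one coarser sketch."""
    total = TinyCMS(sketches[0].width, sketches[0].depth)
    for sketch in sketches:
        total.merge(sketch)
    return total


# Sixty per-second sketches: 3 views every second, 1 purchase every 10 seconds.
per_second = []
for second in range(60):
    sketch = TinyCMS()
    sketch.add("view", 3)
    if second % 10 == 0:
        sketch.add("purchase")
    per_second.append(sketch)

per_minute = roll_up(per_second)
print("views this minute (est):", per_minute.estimate("view"))
print("purchases this minute (est):", per_minute.estimate("purchase"))
```

Once a minute has been rolled up, its per-second sketches can be discarded, which is one way to implement the downsampling of older data. Estimates from the merged sketch are never below the true counts, but collision error accumulates as more events share the same table.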
Key Takeaways
- A complete stream processing pipeline integrates Bloom Filter, HyperLogLog, Count-Min Sketch, and optionally FAISS
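- Total memory for the three sketches (Bloom Filter, HyperLogLog, Count-Min Sketch) is approximately 1.2 MB; a FAISS index is sized separately
- Processing throughput can exceed 1M events/second/core with optimized implementations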
- Sketches are mergeable for distributed deployment
- These four tools form the foundation of modern big data infrastructure at Google, Meta, Twitter, and Netflix