DynamoDB 與 EventBridge
🔥 Vibe Prompt
「設計 DynamoDB 單表設計模式。使用 EventBridge 進行跨服務事件通訊。」
DynamoDB 單表設計
# 單表設計:多個實體共用一個表
# PK: ENTITY#ID, SK: ENTITY#METADATA
table.put_item(Item={
'PK': 'USER#user123',
'SK': 'PROFILE',
'name': 'Alice',
'email': 'alice@example.com'
})
table.put_item(Item={
'PK': 'USER#user123',
'SK': 'ORDER#ord456',
'order_id': 'ord456',
'total': 99.99,
'status': 'pending'
})
# 查詢使用者的所有訂單
response = table.query(
KeyConditionExpression='PK = :pk AND begins_with(SK, :sk)',
ExpressionAttributeValues={
':pk': 'USER#user123',
':sk': 'ORDER#'
}
)
存取模式
| 模式 | 查詢方式 | |------|---------| | 依 ID 取得使用者 | PK=USER#id, SK=PROFILE | | 取得使用者訂單 | PK=USER#id, SK begins_with ORDER# | | 取得所有使用者 | GSI 依實體類型 | | 依狀態查詢訂單 | GSI 依狀態 |
EventBridge
resource "aws_eventbridge_rule" "order_created" {
name = "order-created"
event_pattern = jsonencode({
source = ["myapp.orders"]
detail-type = ["OrderCreated"]
})
}
resource "aws_eventbridge_target" "notification" {
rule = aws_eventbridge_rule.order_created.name
arn = aws_lambda_function.notification.arn
}
import boto3, json
eventbridge = boto3.client('events')
eventbridge.put_events(Entries=[{
'Source': 'myapp.orders',
'DetailType': 'OrderCreated',
'Detail': json.dumps({
'order_id': 'ord456',
'user_id': 'user123',
'total': 99.99
}),
'EventBusName': 'default'
}])
關鍵要點
- ✅ DynamoDB Streams 自動捕獲表格的每一次變更(Create/Update/Delete)
- ✅ Stream Records 保留 24 小時,可按 Shard 平行處理
- ✅ Lambda Trigger 自動批次處理 Stream Records(每批次最多 1000 筆)
- ✅ EventBridge 提供 Content Filtering,只有符合 Pattern 的事件才會觸發
- ✅ 事件驅動架構讓服務完全解耦,新增消費者不需要修改既有程式碼
Stream 與 Lambda 批次處理
import json, base64
def lambda_handler(event, context):
for record in event['Records']:
# DynamoDB Stream 事件格式
if record['eventName'] == 'INSERT':
new_image = record['dynamodb']['NewImage']
# 轉換 DynamoDB JSON → 一般 JSON
item = {k: list(v.values())[0] for k, v in new_image.items()}
print(f"新增: {item}")
elif record['eventName'] == 'MODIFY':
print(f"修改: {record['dynamodb']['Keys']}")
elif record['eventName'] == 'REMOVE':
print(f"刪除: {record['dynamodb']['Keys']}")
return {'statusCode': 200}
事件驅動架構的威力:以訂單流程為例
想像一個電商平台:使用者下單後,系統需要:
- 驗證庫存 → 扣庫存 → 建立訂單
- 送出確認 Email
- 通知出貨部門
- 更新推薦系統
- 記錄分析事件
在傳統架構中,這些邏輯全部寫在同一個服務裡——修改一個環節就可能影響全部。用 EventBridge,你的程式碼變成:
# Service A:訂單服務,只負責建立訂單
import boto3, json
eventbridge = boto3.client('events')
def create_order(order_data):
order = save_to_dynamodb(order_data)
# 發送一個事件,其他服務自行訂閱
eventbridge.put_events(Entries=[{
'Source': 'com.example.order',
'DetailType': 'OrderCreated',
'Detail': json.dumps({'order_id': order['id'], 'user_id': order['user_id']}),
'EventBusName': 'default'
}])
return order
# Service B:Email 服務(完全獨立部署)
def handle_order_created(event, context):
# 只處理 OrderCreated 事件
detail = json.loads(event['detail'])
send_email(detail['user_id'], f"您的訂單 {detail['order_id']} 已成立!")
三個關鍵好處:
- 鬆散耦合:Email 服務當機不影響訂單服務
- 容易新增功能:想加入 Line 通知?多一個 Lambda 訂閱同一事件就好
- 非同步處理:使用者不需要等 Email 寄完才看到成功頁面
為什麼要學 EventBridge?
EventBridge 不僅僅是「發送事件」的工具。它的 Content Filtering 可以只觸發符合特定條件的事件,它的 Scheduler 可以排程定期任務,它的 Pipes 可以直接串接多個 AWS 服務不用寫程式。學會 EventBridge 等於學會了在 AWS 上做事件驅動設計的標準方式。
如何應用在真實專案?
常見的 EventBridge 應用場景:
📸 圖片上傳 → S3 事件 → EventBridge → Lambda 產生縮圖 → S3
📊 使用者註冊 → API Gateway → Lambda → EventBridge → 分析管道
⏰ 每日報表 → EventBridge Scheduler → Lambda 聚合資料 → Email 寄送
🛒 棄單召回 → 購物車過期 → EventBridge → Lambda → Line 通知
接下來學什麼
事件驅動架構解決了「如何非同步溝通」的問題。但當你的流程不只是「觸發→回應」這麼簡單,而是需要多個步驟加上條件判斷、重試邏輯、人工審核時,就需要 Step Functions 來編排這些複雜工作流程了。下一章將教你如何用 JSON 定義一個可視化的訂單處理流程,包含自動重試、錯誤處理、條件分支。"