Skip to content

消除 SSE Pipeline 中的 MongoDB Polling:從 Redis 到 Change Streams 的完整討論

問題背景

系統架構如下:API-A 接收外部請求,以 async 方式呼叫外部服務,外部服務透過 SSE 回傳串流資料,每個 chunk 寫入 MongoDB。API-B 負責讓客戶端取得該請求的最新結果,目前以固定間隔 polling MongoDB 來偵測變化。

外部 API ──SSE──► API-A ──append──► MongoDB doc { chunks: ["a", "b", ...] }
                                                         ▲
                                         API-B 輪詢 ───┘  (每 N 秒)

這個做法的問題:

  • MongoDB 讀取壓力隨 polling 頻率 × 客戶端數量線性增長
  • 延遲受限於 polling 間隔,而非資料實際抵達時間
  • 沒有新資料時仍持續發生無效讀取

方案一覽

Redis 讀取快取Redis Pub/SubRedis StreamsMongoDB Change Streams直接推送(WebSocket)合併服務
消除 polling
消費者離線後能存活是(回退 Mongo)是(ID 重播)是(resume token)N/A
需要新元件RedisRedisRedis
實作複雜度

方案一:Redis 作為讀取快取

最簡單的改法。API-A 寫入 Mongo 的同時把最新狀態存進 Redis,API-B 改從 Redis 讀取。

# API-A:寫入 Mongo,同步更新 Redis 快取
async def append_chunk(doc_id: str, chunk: str):
    await mongo.collection.update_one(
        {"_id": doc_id},
        {"$push": {"chunks": chunk}}
    )
    await redis.rpush(f"chunks:{doc_id}", chunk)
    await redis.expire(f"chunks:{doc_id}", 3600)

# API-B:優先讀取 Redis,cache miss 才回退 Mongo
async def get_chunks(doc_id: str) -> list[str]:
    cached = await redis.lrange(f"chunks:{doc_id}", 0, -1)
    if cached:
        return [c.decode() for c in cached]
    doc = await mongo.collection.find_one({"_id": doc_id})
    return doc["chunks"] if doc else []

這個方案到底換到了什麼?

本質上是「降低每次 poll 的成本」,而不是「減少或消除 poll」。Redis 是純記憶體 key-value 查找,延遲在亞毫秒級;Mongo 要經過 BSON 解碼、可能觸發磁碟 I/O、走查詢引擎。

有意義的情境:

  • 同一筆 request 有多個客戶端同時在等結果(前端重複請求、多個下游服務)
  • Mongo 本身已經是瓶頸(寫入量大、其他業務共用同一個 cluster)
  • polling 間隔很短(如 200ms),Redis 承受這種頻率毫無壓力

不值得的情境:

  • 一筆 request 通常只有一個客戶端在等,Mongo 讀取壓力本來就不高
  • 核心痛點是延遲而非負載——polling 間隔 500ms 就是 500ms,不管 poll 的是 Redis 還是 Mongo
  • 核心痛點是無效讀取的浪費——poll Redis 和 poll Mongo 一樣是空跑,只是空跑成本比較低

結論:這是 operational 層面的優化,不是架構改變。Mongo 是瓶頸時有效,但無法消除 polling 本身。


方案二:Redis Pub/Sub

API-A 在資料抵達時主動通知 API-B,推送取代輪詢。

# API-A:每個 chunk 到達時發布到 Redis channel
async def handle_sse_chunk(doc_id: str, chunk: str):
    await mongo.collection.update_one(
        {"_id": doc_id},
        {"$push": {"chunks": chunk}}
    )
    await redis.publish(f"stream:{doc_id}", chunk)

# API-B:訂閱並反應,不需要輪詢迴圈
async def subscribe_to_chunks(doc_id: str):
    pubsub = redis.pubsub()
    await pubsub.subscribe(f"stream:{doc_id}")
    async for message in pubsub.listen():
        if message["type"] == "message":
            chunk = message["data"].decode()
            yield chunk

優點: 完全消除 polling,資料抵達瞬間即反應。

代價: Pub/Sub 是 fire-and-forget。API-B 當機或斷線期間的訊息直接丟失,重連後必須從 Mongo 重建狀態。


方案三:Redis Streams

Append-only 日誌,每筆記錄有自動 ID,消費者可從任意位置讀取。

# API-A:同時 append 到 Mongo 與 Redis Stream
async def handle_sse_chunk(doc_id: str, chunk: str):
    await mongo.collection.update_one(
        {"_id": doc_id},
        {"$push": {"chunks": chunk}}
    )
    await redis.xadd(f"stream:{doc_id}", {"chunk": chunk}, maxlen=1000)

# API-B:阻塞式讀取,不輪詢、不掉訊息
async def stream_chunks(doc_id: str, last_id: str = "$"):
    while True:
        results = await redis.xread(
            {f"stream:{doc_id}": last_id},
            block=5000,
            count=10
        )
        if results:
            for stream_name, messages in results:
                for msg_id, fields in messages:
                    last_id = msg_id
                    yield fields[b"chunk"].decode()

接上 FastAPI SSE endpoint:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import redis.asyncio as aioredis

app = FastAPI()
redis_client = aioredis.from_url("redis://localhost")

@app.get("/stream/{doc_id}")
async def sse_endpoint(doc_id: str, last_id: str = "$"):
    async def event_generator():
        async for chunk in stream_chunks(doc_id, last_id):
            yield f"data: {chunk}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )

優點: 無 polling、重連不掉訊息(透過 ID 重播)、XREAD BLOCK 讓消費者零 CPU 等待。

代價: Stream 記錄會累積,記得設 maxlen 修剪舊記錄。

組織限制:內部 Redis 團隊只支援 cache

如果內部 Redis 團隊宣告「只管 cache,不支援 queue/messaging」,這是運維邊界問題,不是技術限制。他們的叢集是針對 cache 的使用模式配置和監控的(可能開了 maxmemory-policy allkeys-lru、沒有持久化)。

可行的路:

  1. 自己起一個獨立的 Redis 實例專做 messaging,跟 cache 叢集完全隔離
  2. 跳過 Redis,改用 MongoDB Change Streams(見下一節)
  3. 跟 Redis 團隊談,請他們提供 messaging 等級的服務

方案四:MongoDB Change Streams(不引入新元件)

如果不想或不能引入 Redis 做 messaging,可以利用 MongoDB 本身的推送能力。Change Streams 底層基於 oplog tailing,本質上是推送機制,完全消除 polling 且不需要任何新元件。

pipeline = [
    {
        "$match": {
            "operationType": "update",
            "documentKey._id": doc_id
        }
    }
]

async with collection.watch(pipeline) as stream:
    async for change in stream:
        updated = change["updateDescription"]["updatedFields"]
        chunk_updates = {
            k: v for k, v in updated.items()
            if k.startswith("chunks")
        }
        if chunk_updates:
            yield chunk_updates

前提條件

  • MongoDB 版本 ≥ 3.6(2017 年底發布)
    • 3.6:只能 watch 單一 collection
    • 4.0:可 watch 整個 database 或 cluster
    • 4.2:pipeline 可用更多 aggregation stage
  • 必須是 Replica Set(standalone 沒有 oplog,無法使用 Change Streams)

快速確認是否支援

# 方法一:檢查是否為 replica set
mongosh --eval "rs.status()"
# 回傳有 "set" 名稱和成員列表 → 支援
# 報錯 NoReplicationEnabled → 不支援
# 方法二:在應用層直接試
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

async def check():
    client = AsyncIOMotorClient("mongodb://your-host:27017")
    try:
        async with client.your_db.your_collection.watch() as stream:
            print("Change Streams 可用")
    except Exception as e:
        print(f"不支援: {e}")

asyncio.run(check())

如果使用 MongoDB Atlas(雲服務),不管哪個 tier 都是 replica set,一定支援。

什麼是 oplog?

oplog(operation log)是 MongoDB 內部的寫入日誌。每當發生 insert、update、delete,都會記錄成一筆 oplog 條目。原始目的是讓 replica set 的 secondary 節點複製 primary 的寫入以保持資料一致。

時間 →
[insert doc A] → [update doc B] → [delete doc C] → [update doc A] → ...

Change Streams 就是搭在這條時間軸上的:讓應用程式也能像 secondary 節點一樣追蹤這條日誌,只是透過過濾和包裝,讓你只看到關心的變化。

oplog 是 capped collection,空間滿了舊紀錄會被覆蓋。這就是為什麼應用離線太久時 resume token 可能失效。

注意事項

1. Resume Token 一定要存

每個事件都帶一個 resume token,斷線重連時靠它從斷點續讀。沒存的話重連後只能拿到「從現在起」的新事件,中間的就漏了。

2. oplog 有大小限制和時間窗口

oplog 是固定大小,舊紀錄會被覆蓋。離線超出保留窗口,resume token 就會失效。Atlas 上通常是 72 小時,自建的需自行確認。Change Streams 不適合當長期事件重播機制。

3. 高頻寫入要注意消費速度

每筆寫入都會產生 oplog 條目再被推給你。pipeline 裡加 $match 過濾條件可以大幅減少不必要的事件。

4. 斷線是常態,必須有重連邏輯

網路抖動、primary 切換(failover)都會導致斷線。

async def watch_with_retry(collection, doc_id):
    resume_token = None
    while True:
        try:
            pipeline = [{"$match": {"documentKey._id": doc_id}}]
            async with collection.watch(
                pipeline,
                resume_after=resume_token
            ) as stream:
                async for change in stream:
                    resume_token = stream.resume_token
                    yield change
        except Exception:
            await asyncio.sleep(1)  # 等一下再重連

5. 監聽特定欄位的陷阱

如果用 $push 往陣列加元素,updatedFields 回傳的 key 會是 chunks.3 而不是 chunks,直接用 $exists 匹配 updatedFields.chunks 會匹配不到。

建議在 pipeline 過濾 documentKey._id + operationType,然後在應用層判斷欄位:

pipeline = [
    {
        "$match": {
            "operationType": "update",
            "documentKey._id": doc_id
        }
    }
]

async with collection.watch(pipeline) as stream:
    async for change in stream:
        updated = change["updateDescription"]["updatedFields"]
        chunk_updates = {
            k: v for k, v in updated.items()
            if k.startswith("chunks")
        }
        if chunk_updates:
            # 處理新 chunk
            pass

方案五:其他不需要外部元件的做法

直接進程間通訊

如果 API-A 和 API-B 在同一進程或同一台機器,可用 asyncio Queue、callback、Unix domain socket。完全不需要外部系統。

WebSocket 直連

API-A 收到 SSE chunk 的同時,直接透過 WebSocket 推送給所有已連線的消費者。適合單實例或少量實例。

connected_clients: dict[str, list[WebSocket]] = {}

async def handle_sse_chunk(doc_id: str, chunk: str):
    await mongo.collection.update_one(...)
    for ws in connected_clients.get(doc_id, []):
        await ws.send_text(chunk)

合併服務

最激進的做法:如果 API-B 的唯一職責是把 Mongo 裡的資料轉發給終端消費者,讓 API-A 直接承擔這個職責,整個 A→中間層→B 的鏈路就消失了。


第一性原理思考框架

從根本來看,解決「B 需要即時知道 A 的新資料」這個問題,只有幾條路:

  1. 讓 A 直接告訴 B(WebSocket、進程間通訊、合併服務)
  2. 讓資料源自己通知(MongoDB Change Streams)
  3. 讓共享的第三方傳遞訊息(Redis Pub/Sub、Redis Streams、Kafka、NATS 等)
  4. 重新質疑 B 是否需要存在

先搞清楚部署拓撲、可靠性需求、消費者數量、以及組織限制(誰管什麼元件),答案自然會浮現。