processQueue() 完整依賴關係圖

Agent: architect (architect)
Confidence: 73%
Cost: $0.6002
Duration: 2m 35s
Task ID: c575ce2f-2c4b-4323-ba7c-042ca4342a1e


我現在有足夠的資訊來產出完整的依賴關係圖了。

★ Insight ─────────────────────────────────────
processQueue 的架構特徵:

  1. Fire-and-forget dispatchexecuteTask() 透過 .then() 非同步執行,不阻塞主迴圈。完成後自遞迴呼叫 processQueue() 繼續排程。
  2. Reentrant guard — 用 isProcessing flag + reprocessNeeded flag 解決「pipeline 子任務在執行中 enqueue 時觸發重入」的問題。這是 cooperative scheduling 的典型 pattern。
  3. 雙層預算控制 — 先用 budgetReservations Map 做樂觀預留(防並行 dispatch race),再在 executeTask 裡做精確的 isOverDailyLimit 檢查。
    ─────────────────────────────────────────────────

processQueue() 完整依賴關係圖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
processQueue()                                     [副作用: 修改 isProcessing/reprocessNeeded 狀態]
│ 入口: L960-1116 | 參數: 無 | 回傳: Promise<void>
│ Guard: if (isProcessing) return; — 防止重入

├── [Step 0] import('../safety/kill-switch.js') [動態 import]
│ ├── isEmergency() ← kill-switch.ts:97 [純函數: 讀記憶體狀態]
│ │ └── 讀取模組級 currentLevel 變數
│ └── isRestricted() ← kill-switch.ts:92 [純函數: 讀記憶體狀態]
│ └── 讀取模組級 currentLevel 變數

├── [Step 1] checkScheduledAgents() [副作用: enqueue tasks, 修改 lastScheduledCheck Map]
│ │ L776-853 | 參數: 無 | 回傳: Promise<void>
│ │
│ ├── loadAllAgentConfigs() ← agent-config.ts:156
│ │ └── readdir() + readFile() [副作用: 讀取 soul/agents/*.json]
│ │
│ ├── getTodayString() ← timezone.ts [純函數: Date → "YYYY-MM-DD"]
│ │
│ ├── lastScheduledCheck.delete/set() [副作用: 修改 Map(記憶體清理)]
│ │
│ ├── import('./budget-optimizer.js') [動態 import, 每日一次]
│ │ └── optimizeBudgets() ← budget-optimizer.ts:41
│ │ ├── loadAllAgentConfigs() [讀 soul/agents/*.json]
│ │ └── writer.writeNow() [副作用: 寫 soul/agents/*.json]
│ │
│ ├── import('./shared-knowledge.js') [動態 import, 每日一次]
│ │ └── compactKnowledge() ← shared-knowledge.ts:227
│ │ └── readFile() + writer.writeNow() [副作用: 讀寫 soul/agent-tasks/shared-knowledge.jsonl]
│ │
│ ├── isOverDailyLimit(name) ← agent-config.ts:237
│ │ ├── loadAgentConfig(name) [讀 soul/agents/{name}.json]
│ │ └── getTodayString() [純函數]
│ │
│ ├── parseScheduleInterval(schedule) ← agent-config.ts:249
│ │ └── [純函數: "every:5m" → 300000ms]
│ │
│ ├── isDailyScheduleDue(schedule, now) ← agent-config.ts:267
│ │ └── [純函數: "daily@08:00" + Date → boolean]
│ │
│ └── enqueueScheduledTask(agent) [副作用: 寫 queue.json]
│ │ L856-886
│ │
│ ├── getTodayString() ← timezone.ts [純函數]
│ ├── toLocalDateString() ← timezone.ts [純函數]
│ ├── loadAgentConfig(dep) ← agent-config.ts [讀 soul/agents/{dep}.json — 檢查 dependsOnAgents]
│ │
│ ├── [特殊路徑] enqueueCommentMonitorTask(agent) [副作用: 網路 I/O + enqueue]
│ │ │ L892-952 (僅 agent.name === 'comment-monitor' 時)
│ │ ├── import('../blog/comment-client.js') [動態 import]
│ │ │ └── getLatestComments() ← comment-client.ts:76
│ │ │ └── [副作用: HTTP 呼叫外部留言 API]
│ │ ├── recordAgentRun(name, 0) ← agent-config.ts [副作用: 更新 agent JSON]
│ │ └── enqueueTask(name, prompt, 6) → [遞迴回到 public API]
│ │
│ └── enqueueTask(name, prompt, 5, {source:'scheduled'})
│ │ L207-257 | 回傳: Promise<string> (taskId)
│ ├── loadQueue() [讀 queue.json]
│ ├── saveQueue() [副作用: 寫 queue.json]
│ ├── randomUUID() ← node:crypto [純函數]
│ └── processQueue() ⚡ 遞迴! [if !isProcessing → 直接呼叫]
│ └── (被 isProcessing guard 擋住,改設 reprocessNeeded = true)

├── [Step 2] loadQueue() [副作用: 讀檔案]
│ │ L188-196 | 回傳: Promise<TaskQueue>
│ └── readFile(QUEUE_PATH) ← node:fs/promises [讀 soul/agent-tasks/queue.json]

├── [Step 3] 清理 stale running tasks (inline loop)
│ ├── isBusy(workerId) ← claude-code.ts:587 [純函數: 讀記憶體 Map]
│ ├── requeueForRetry(task) [副作用: 修改 task 物件]
│ │ ├── getRetryDelay(retryCount) L80 [純函數: 指數退避計算]
│ │ └── addTrace(task, phase, detail) L338 [副作用: push to task.trace[]]
│ └── addTrace() [副作用: push to task.trace[]]

├── [Step 4] 篩選 + 排序 pending tasks
│ ├── getRecentCompletedIds() [副作用: 讀檔案 + 快取]
│ │ │ L350-372 | 回傳: Promise<Set<string>>
│ │ └── readFile(HISTORY_PATH) [讀 soul/agent-tasks/history.jsonl]
│ │ (有 60 秒 TTL 快取: recentCompletedIds)
│ │
│ ├── checkDependencies(task, allTasks, completedIds) [純函數(幾乎): 讀 task 陣列]
│ │ │ L378-400 | 回傳: { satisfied, blockedBy, failedDep }
│ │ └── (唯一副作用: 外層迴圈中修改 task.status/error — 但那是呼叫方 inline 做的)
│ │
│ └── addTrace() [副作用: push to task.trace[]]

├── [Step 5] Dispatch loop (for each pendingTask)
│ │
│ ├── findFreeWorker(reservedWorkers) [準純函數: 讀記憶體]
│ │ │ L299-314 | 回傳: number | null
│ │ ├── isBusy(wid) ← claude-code.ts [讀 busyLocks Map]
│ │ └── exclude.has(wid) [讀 Set]
│ │
│ ├── reserveBudget(agentName, estimatedCost) [副作用: 修改 budgetReservations Map]
│ │ │ L160-173 | 回傳: Promise<boolean>
│ │ ├── isOverDailyLimit(name) ← agent-config.ts [讀 agent JSON]
│ │ ├── loadAgentConfig(name) ← agent-config.ts [讀 agent JSON]
│ │ ├── getTodayString() ← timezone.ts [純函數]
│ │ └── budgetReservations.set() [副作用: Map 寫入]
│ │
│ ├── addTrace() [副作用: push to task.trace[]]
│ │
│ └── executeTask(task, workerId) .then(...) [🔥 Fire-and-forget async]
│ │ L405-610 | 回傳: Promise<void>
│ │ ⚠️ 這是最重的函數 — 呼叫外部 CLI 進程
│ │
│ ├── loadAgentConfig(name) ← agent-config.ts [讀 agent JSON]
│ ├── isOverDailyLimit(name) ← agent-config.ts [讀 agent JSON](二次檢查)
│ ├── buildWorkerSystemPrompt(cfg, task) [副作用: 動態 import + 讀檔]
│ │ │ L613-753 | 回傳: { prompt: string, metrics: PromptMetrics }
│ │ ├── getEffectivePermissions() ← agent-permissions.ts:50 [純函數]
│ │ ├── buildPermissionPrompt() ← agent-permissions.ts:134 [純函數]
│ │ └── import('./shared-knowledge.js')
│ │ └── queryKnowledge(name, prompt) ← shared-knowledge.ts:78
│ │ └── readFile() [讀 shared-knowledge.jsonl]
│ │
│ ├── askClaudeCode(prompt, workerId, opts) [🔥 副作用: 啟動 CLI 子進程]
│ │ └── ← claude-code.ts:228 [spawn child process, I/O 密集]
│ │
│ ├── [成功路徑]
│ │ ├── import('./result-assessor.js')
│ │ │ └── assessResult() [副作用: 可能呼叫 LLM Judge API]
│ │ ├── assessHeuristic() ← result-assessor.ts:117 [純函數: 文字分析]
│ │ ├── writeReport(report) [副作用: 寫 JSONL 報告]
│ │ │ └── writer.appendJsonl() [寫 soul/agent-reports/{agent}/{date}.jsonl]
│ │ ├── import('./shared-knowledge.js')
│ │ │ └── depositKnowledge() [副作用: 寫 shared-knowledge.jsonl]
│ │ ├── recordAgentRun(name, cost) ← agent-config.ts
│ │ │ └── writer.writeNow() [副作用: 更新 soul/agents/{name}.json]
│ │ └── eventBus.emit('agent:task:completed') [副作用: 觸發事件]
│ │
│ ├── [失敗路徑]
│ │ ├── recordAgentFailure(name, error) ← agent-config.ts
│ │ │ └── writer.writeNow() [副作用: 更新 soul/agents/{name}.json]
│ │ └── eventBus.emit('agent:task:failed') [副作用: 觸發事件]
│ │
│ └── [.then() 回調 — L1069-1093]
│ ├── releaseBudget(name, cost) [副作用: 修改 budgetReservations Map]
│ ├── isTransientError(error) L317 [純函數: 字串比對]
│ ├── requeueForRetry(task) [副作用: 修改 task 物件]
│ ├── archiveTask(task) [副作用: 寫 history.jsonl + 更新 queue.json]
│ │ ├── writer.appendJsonl(HISTORY_PATH) [寫 soul/agent-tasks/history.jsonl]
│ │ ├── loadQueue() [讀 queue.json]
│ │ └── saveQueue() [寫 queue.json]
│ └── processQueue() ⚡ 遞迴! [完成後重觸發排程]

├── [Step 6] saveQueue(queue) [副作用: 寫檔案]
│ │ L198-200
│ └── writer.writeNow(QUEUE_PATH, queue) [寫 soul/agent-tasks/queue.json]

└── [finally] isProcessing = false
└── if (reprocessNeeded) → processQueue() ⚡ 遞迴!

跨模組依賴總覽

模組 Import 方式 被呼叫的函數 性質
agent-config.ts 靜態 import loadAgentConfig, loadAllAgentConfigs, recordAgentRun, recordAgentFailure, isOverDailyLimit, parseScheduleInterval, isDailyScheduleDue 讀寫 soul/agents/*.json
claude-code.ts 靜態 import askClaudeCode, isBusy 🔥 啟動 CLI 子進程
agent-permissions.ts 靜態 import getEffectivePermissions, buildPermissionPrompt 純函數
result-assessor.ts 靜態 + 動態 assessHeuristic, assessResult 純函數 / LLM 呼叫
shared-knowledge.ts 動態 import depositKnowledge, queryKnowledge, compactKnowledge 讀寫 shared-knowledge.jsonl
budget-optimizer.ts 動態 import optimizeBudgets 讀寫 agent JSON
kill-switch.ts 動態 import isEmergency, isRestricted 純函數
comment-client.ts 動態 import getLatestComments 🌐 HTTP 外部 API
pipeline-engine.ts 動態 import(非 processQueue 內) rehydratePipelines, registerPipelineListener 事件訂閱
core/timezone.ts 靜態 import getTodayString, toLocalDateString 純函數
core/event-bus.ts 靜態 import eventBus.emit 事件發射
core/debounced-writer.ts 靜態 import writer.writeNow, writer.appendJsonl 原子寫檔
core/logger.ts 靜態 import logger.info/warn/error/debug 日誌

發現與觀察

1. 遞迴呼叫鏈(重要性 4/5)

processQueue三個遞迴觸發點

  • enqueueTask() 內部(L253)— 被 isProcessing guard 安全攔截
  • executeTask().then() 回調(L1089)— 任務完成後重觸發
  • finally 中的 reprocessNeeded 檢查(L1113)

這形成了一個 自驅動循環:心跳觸發 → processQueue → dispatch → task 完成 → processQueue → 直到無任務為止。設計是安全的(有 isProcessing guard),但值得注意的是 executeTask().then() 裡的 processQueue() 呼叫不受 reprocessNeeded 機制管控——它直接呼叫,如果此時 isProcessing 為 true(另一個 .then() 回調同時觸發),會直接 return。

2. 雙重預算檢查(重要性 2/5)

reserveBudget() 在 dispatch 前做一次預留,executeTask() 內部又做一次 isOverDailyLimit() 精確檢查。這是合理的防禦設計(optimistic lock + pessimistic check),但造成 agent-config.ts 被同一任務讀取 3 次(reserve + execute 內的 config load + cost check)。

3. 無循環依賴(重要性 5/5)

所有依賴都是單向的worker-scheduler.ts → 其他模組。沒有發現其他模組 import worker-scheduler.ts 中的 processQueue(除了自身遞迴)。模組間透過 eventBus 解耦——pipeline-engine 監聽 agent:task:completed 事件來推進 pipeline 階段,而不是直接 import。

4. 檔案 I/O 密集度(重要性 3/5)

一次 processQueue 迴圈的磁碟 I/O:

  • : agents/*.json(多次)、queue.json、history.jsonl、shared-knowledge.jsonl
  • : queue.json(至少 2 次:dispatch 後 + archive 後)、agent JSON、history.jsonl、report JSONL

loadAgentConfig 沒有快取,每次呼叫都從磁碟讀取。如果排程的 agent 很多,checkScheduledAgents 內的 isOverDailyLimit 會為每個 agent 各讀一次 JSON 檔。

5. 動態 import 策略(重要性 2/5)

kill-switch.tsbudget-optimizer.tsshared-knowledge.tscomment-client.tsresult-assessor.ts 全部用 await import() 動態載入。這是刻意的——避免啟動時的循環依賴(尤其是 MCP server 和 pipeline-engine 可能反向引用 worker-scheduler),也減少冷啟動時間。


報告已完成。如需進一步深入任何分支(例如 askClaudeCode 的內部實作或 eventBus.emit 的下游消費者),請指示。