From 4510e0903533a2a873948d77bc16547032b94e96 Mon Sep 17 00:00:00 2001 From: Ammar Date: Wed, 27 May 2026 11:09:13 -0500 Subject: [PATCH] =?UTF-8?q?=F0=9F=A4=96=20fix:=20lock=20goal=20state=20to?= =?UTF-8?q?=20chat=20tail?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- _Generated with `mux` • Model: `openai:gpt-5.5` • Thinking: `xhigh` • Cost: `unknown`_ --- src/browser/features/ChatInput/index.tsx | 21 +- src/cli/run.ts | 17 +- src/common/orpc/schemas/stream.ts | 5 + src/common/types/goal.ts | 7 +- src/common/types/message.ts | 3 + .../agentSession.goalAutoPause.test.ts | 42 +-- src/node/services/agentSession.ts | 44 ++- .../services/workspaceGoalService.test.ts | 125 +++++++- src/node/services/workspaceGoalService.ts | 292 ++++++++++++++++-- src/node/services/workspaceService.ts | 57 ++-- tests/ui/chat/goalSlashCommand.test.ts | 8 +- tests/ui/chat/sendModeDropdown.test.ts | 34 +- 12 files changed, 521 insertions(+), 134 deletions(-) diff --git a/src/browser/features/ChatInput/index.tsx b/src/browser/features/ChatInput/index.tsx index d1772bae2e..9ff027255d 100644 --- a/src/browser/features/ChatInput/index.tsx +++ b/src/browser/features/ChatInput/index.tsx @@ -993,7 +993,6 @@ const ChatInputInner: React.FC = (props) => { !policyBlocksCreateSend; const runningGoalActive = variant === "workspace" && isGoalRunning(workspaceGoal?.status ?? "paused"); - const shouldDefaultSteerGoal = runningGoalActive && !editingMessageForUi; // Send defaults to tool-end on click; advanced dispatch modes remain available via // right-click and touch long-press whenever there's a sendable workspace draft. @@ -2252,8 +2251,7 @@ const ChatInputInner: React.FC = (props) => { const commandHandled = modelOneShot ? false : await executeParsedCommand(parsed, input, { - goalInterventionPolicy: - overrides?.goalInterventionPolicy ?? (shouldDefaultSteerGoal ? "steer" : undefined), + goalInterventionPolicy: overrides?.goalInterventionPolicy, queueDispatchMode: overrides?.queueDispatchMode, }); if (commandHandled) { @@ -2444,8 +2442,7 @@ const ChatInputInner: React.FC = (props) => { rawThinkingOverride != null ? resolveThinkingInput(rawThinkingOverride, policyModel) : undefined; - const goalInterventionPolicy = - overrides?.goalInterventionPolicy ?? (shouldDefaultSteerGoal ? "steer" : undefined); + const goalInterventionPolicy = overrides?.goalInterventionPolicy; const sendOptions = { ...sendMessageOptions, @@ -3058,7 +3055,7 @@ const ChatInputInner: React.FC = (props) => { {runningGoalActive && !editingMessageForUi && ( <>
- Send and pause goal: right-click menu + Manual sends pause the current goal; use Resume to continue it. )} {SEND_DISPATCH_MODES.map((entry) => ( @@ -3094,18 +3091,6 @@ const ChatInputInner: React.FC = (props) => { ))} - {runningGoalActive && !editingMessageForUi && ( - - )} )} diff --git a/src/cli/run.ts b/src/cli/run.ts index e4371d0354..6cf8213b91 100644 --- a/src/cli/run.ts +++ b/src/cli/run.ts @@ -84,6 +84,7 @@ import { } from "@/common/utils/goals/budgetParser"; import { CLI_GOAL_STREAM_START_TIMEOUT_MS, + GOAL_CONTINUATION_KIND, GOAL_CONTINUATION_IDLE_CONSUMER_NAME, } from "@/constants/goals"; import type { GoalRecordV1 } from "@/common/types/goal"; @@ -960,7 +961,21 @@ async function main(): Promise { const sendAndAwait = async (msg: string, options: SendMessageOptions): Promise => { completionPromise = createCompletionPromise(); - const sendResult = await session.sendMessage(msg, options); + const sendResult = await session.sendMessage( + msg, + options, + hasGoal + ? { + // CLI goal runs suppress the desktop kickoff dispatcher and drive + // their own user turns, so mark them as the durable goal + // continuations that keep goal mode active. + synthetic: true, + agentInitiated: true, + goalKind: GOAL_CONTINUATION_KIND, + goalContinuation: true, + } + : undefined + ); if (!sendResult.success) { const errorValue = sendResult.error; let formattedError = "unknown error"; diff --git a/src/common/orpc/schemas/stream.ts b/src/common/orpc/schemas/stream.ts index adc7aa504a..868e41cb75 100644 --- a/src/common/orpc/schemas/stream.ts +++ b/src/common/orpc/schemas/stream.ts @@ -674,6 +674,11 @@ export const ExperimentsSchema = z.object({ imageGenerationTool: z.boolean().optional(), }); +/** + * `steer` is accepted for older clients, but the backend treats every manual + * user message as a pause because active goal mode is derived from the latest + * `goal_continuation` user turn. + */ export const GoalInterventionPolicySchema = z.enum(["steer", "pause"]); // SendMessage options diff --git a/src/common/types/goal.ts b/src/common/types/goal.ts index 0cdc1d51fc..fb425d982b 100644 --- a/src/common/types/goal.ts +++ b/src/common/types/goal.ts @@ -52,9 +52,10 @@ export type GoalLifecycle = "active" | "complete"; * agent is doing with it right now: * * - `running` — the agent may auto-continue this goal. - * - `paused` — explicit user pause, explicit send-and-pause, or safety - * gates such as an interrupted stream. Continuations are - * suppressed until the user resumes. + * - `paused` — latest user turn is not a goal continuation (manual + * intervention, explicit pause), or safety gates such as + * an interrupted stream. Continuations are suppressed + * until the user resumes. * - `budget_limited` — internal-only transient state set by the budget * gate when cost or turn caps are hit. * diff --git a/src/common/types/message.ts b/src/common/types/message.ts index e551ad7607..02a214814d 100644 --- a/src/common/types/message.ts +++ b/src/common/types/message.ts @@ -358,6 +358,9 @@ export type MuxMessageMetadata = MuxMessageMetadataBase & | { type: "goal-cleared-summary"; } + | { + type: "goal-pause-boundary"; + } | { type: "heartbeat-request"; /** Synthetic heartbeat follow-ups use an explicit marker so future backend dispatch stays inspectable. */ diff --git a/src/node/services/agentSession.goalAutoPause.test.ts b/src/node/services/agentSession.goalAutoPause.test.ts index 3d50b9e9bb..2ee2f1c8b3 100644 --- a/src/node/services/agentSession.goalAutoPause.test.ts +++ b/src/node/services/agentSession.goalAutoPause.test.ts @@ -118,19 +118,19 @@ describe("AgentSession goal safety hooks", () => { } }); - test("manual user messages steer active goals by default", async () => { - const workspaceId = "manual-steers-active-goal"; + test("manual user messages pause active goals by default", async () => { + const workspaceId = "manual-pauses-active-goal-by-default"; const { session, goalService, analytics, cleanup } = await createSessionHarness(workspaceId); cleanups.push(cleanup); await setGoalOk(goalService, { workspaceId, objective: "Keep working" }); - const result = await session.sendMessage("I need to steer this goal", SEND_OPTIONS); + const result = await session.sendMessage("I need to pause this goal with a note", SEND_OPTIONS); expect(result.success).toBe(true); - expect(await goalService.getGoal(workspaceId)).toMatchObject({ status: "active" }); - expect(analytics.recordGoalLifecycleEvent).not.toHaveBeenCalledWith( + expect(await goalService.getGoal(workspaceId)).toMatchObject({ status: "paused" }); + expect(analytics.recordGoalLifecycleEvent).toHaveBeenCalledWith( "goal_paused", - expect.anything() + expect.objectContaining({ initiator: "auto" }) ); session.dispose(); }); @@ -216,7 +216,7 @@ describe("AgentSession goal safety hooks", () => { session.dispose(); }); - test("manual user messages clear acknowledgment flags while steering by default", async () => { + test("manual user messages clear acknowledgment flags while pausing by default", async () => { const workspaceId = "manual-clears-ack"; const { session, goalService, cleanup } = await createSessionHarness(workspaceId); cleanups.push(cleanup); @@ -227,7 +227,7 @@ describe("AgentSession goal safety hooks", () => { expect(result.success).toBe(true); expect(await goalService.getGoal(workspaceId)).toMatchObject({ - status: "active", + status: "paused", requireUserAcknowledgmentSinceMs: null, }); session.dispose(); @@ -436,7 +436,7 @@ describe("AgentSession goal safety hooks", () => { const manualResult = await session.sendMessage("I saw the recovered response", SEND_OPTIONS); expect(manualResult.success).toBe(true); expect(await goalService.getGoal(workspaceId)).toMatchObject({ - status: "active", + status: "paused", requireUserAcknowledgmentSinceMs: null, }); @@ -563,20 +563,19 @@ describe("AgentSession goal safety hooks", () => { return Promise.resolve(Ok(undefined)); }) as unknown as AIService["streamMessage"]; - // Manual user messages steer active goals by default (#3319), so the - // goal stays `active` rather than auto-pausing. The point of this test - // is that the silent-continuation auto-completion does NOT fire on - // manual turns — `activeStreamContext.goalKind` is undefined on a - // manual send, so the silent-completion gate - // (`goalKind === GOAL_CONTINUATION_KIND`) short-circuits and status - // stays `active`, not `complete`. + // Manual user messages now pause active goals because the goal mode is + // locked to the latest user message kind. The point of this test is that + // silent-continuation auto-completion still does NOT fire on manual turns: + // `activeStreamContext.goalKind` is undefined on a manual send, so the + // silent-completion gate (`goalKind === GOAL_CONTINUATION_KIND`) + // short-circuits and status stays `paused`, not `complete`. const result = await session.sendMessage("Manual question", SEND_OPTIONS); expect(result.success).toBe(true); // Give the async stream-end handler a tick to run so any stray - // auto-completion would have a chance to corrupt the steering state. + // auto-completion would have a chance to corrupt the paused state. await new Promise((resolve) => setTimeout(resolve, 50)); - expect(await goalService.getGoal(workspaceId)).toMatchObject({ status: "active" }); + expect(await goalService.getGoal(workspaceId)).toMatchObject({ status: "paused" }); session.dispose(); }); @@ -613,7 +612,7 @@ describe("AgentSession goal safety hooks", () => { session.dispose(); }); - test("text-only stream-end during a goal_continuation turn does not affect paused goals", async () => { + test("text-only goal_continuation turn can complete a resumed paused goal", async () => { const workspaceId = "silent-continuation-paused"; const { session, goalService, aiService, cleanup } = await createSessionHarness(workspaceId); cleanups.push(cleanup); @@ -641,7 +640,10 @@ describe("AgentSession goal safety hooks", () => { expect(result.success).toBe(true); await new Promise((resolve) => setTimeout(resolve, 50)); - expect(await goalService.getGoal(workspaceId)).toMatchObject({ status: "paused" }); + expect(await goalService.getGoal(workspaceId)).toMatchObject({ + status: "complete", + completionSummary: "All wrapped up.", + }); session.dispose(); }); }); diff --git a/src/node/services/agentSession.ts b/src/node/services/agentSession.ts index 8e36ccfde0..d47a6b295f 100644 --- a/src/node/services/agentSession.ts +++ b/src/node/services/agentSession.ts @@ -1125,12 +1125,23 @@ export class AgentSession { ); } + private isSyntheticGoalPauseBoundaryMessage(message: MuxMessage): boolean { + return ( + message.role === "user" && + message.metadata?.synthetic === true && + message.metadata.muxMetadata?.type === "goal-pause-boundary" + ); + } + private getLastNonSystemHistoryMessage(historyTail: MuxMessage[]): MuxMessage | undefined { for (let index = historyTail.length - 1; index >= 0; index -= 1) { const candidate = historyTail[index]; if (candidate.role === "system") { continue; } + if (this.isSyntheticGoalPauseBoundaryMessage(candidate)) { + continue; + } if (this.isSyntheticSnapshotUserMessage(candidate)) { continue; } @@ -1184,13 +1195,15 @@ export class AgentSession { `invalid goal intervention policy: ${input.policy}` ); - // Accepted manual user turns acknowledge /clear or crash-recovery gates. Steering - // is the default for normal sends: the agent should see the user's intervention - // and the active goal may continue afterward. Rejected sends use "pause" below - // because the model never saw the steering content. + // Accepted manual user turns acknowledge / clear crash-recovery gates, but + // they are no longer goal-continuation turns. The goal mode is locked to the + // chat tail: a real `goal_continuation` user message means running; + // anything manually typed by the user pauses until Resume appends a fresh + // continuation. Legacy clients may still send the old "steer" policy; treat + // it as pause so the invariant holds at this backend boundary. goalService.clearPendingContinuationForManualUserMessage(this.workspaceId); const goal = await goalService.acknowledgeUser(this.workspaceId); - if (goal?.status !== "active" || input.policy !== "pause") { + if (goal?.status !== "active") { return; } @@ -1233,6 +1246,9 @@ export class AgentSession { return false; } + if (this.isSyntheticGoalPauseBoundaryMessage(message)) { + return false; + } if (this.isSyntheticSnapshotUserMessage(message)) { return false; } @@ -2172,6 +2188,7 @@ export class AgentSession { agentInitiated?: boolean; goalContinuation?: boolean; goalKind?: GoalSyntheticMessageKind; + startStreamInBackground?: boolean; onAcceptedPreStreamFailure?: (error: SendMessageError) => Promise | void; } ): Promise> { @@ -2238,7 +2255,7 @@ export class AgentSession { const editMessageId = options?.editMessageId; const manualGoalInterventionPolicy: GoalInterventionPolicy | undefined = isManualUserMessage - ? (options?.goalInterventionPolicy ?? (editMessageId ? "pause" : "steer")) + ? (options?.goalInterventionPolicy ?? "pause") : undefined; // Edits are implemented as truncate+replace. If the frontend omits fileParts, @@ -2667,6 +2684,8 @@ export class AgentSession { } } + await this.workspaceGoalService?.syncGoalModeWithChatTail(this.workspaceId); + if (manualGoalInterventionPolicy != null) { await this.applyManualUserMessageGoalSafety({ policy: manualGoalInterventionPolicy }); } @@ -2767,19 +2786,22 @@ export class AgentSession { } }; - if (editMessageId) { - // The edit is already persisted + emitted above, so let callers unblock immediately instead of - // waiting for runtime warmup / stream-start to finish before they can clear edit state. - void startPreparedStream() + if (editMessageId || internal?.startStreamInBackground === true) { + // The user turn is already persisted + emitted above. Edits and backend + // goal continuations should unblock once the user message exists: for + // Resume, that makes chat history the durable source of truth for the + // running goal before runtime warmup or streaming can race/fail. + startPreparedStream() .then(async (result) => { if (!result.success) { await internal?.onAcceptedPreStreamFailure?.(result.error); } }) .catch((error: unknown) => { - log.error("Accepted edit stream failed before startup completed", { + log.error("Accepted background stream failed before startup completed", { workspaceId: this.workspaceId, editMessageId, + goalKind, error: getErrorMessage(error), }); }); diff --git a/src/node/services/workspaceGoalService.test.ts b/src/node/services/workspaceGoalService.test.ts index 71269bb78b..f8b1f94ef7 100644 --- a/src/node/services/workspaceGoalService.test.ts +++ b/src/node/services/workspaceGoalService.test.ts @@ -8,7 +8,12 @@ import { IdleDispatcher } from "./idleDispatcher"; import { createTestHistoryService } from "./testHistoryService"; import type { HistoryService } from "./historyService"; import type { GoalRecordV1, GoalStatus } from "@/common/types/goal"; -import { GOAL_BUDGET_LIMIT_KIND, GOAL_CONTINUATION_IDLE_CONSUMER_NAME } from "@/constants/goals"; +import { + GOAL_BUDGET_LIMIT_KIND, + GOAL_CONTINUATION_IDLE_CONSUMER_NAME, + GOAL_CONTINUATION_KIND, +} from "@/constants/goals"; +import { createMuxMessage } from "@/common/types/message"; // Shared dispatch helpers live in `./testDispatchHelpers` instead of local // copies so future callers cannot drift. import { drainPendingDispatches, waitForCondition } from "./testDispatchHelpers"; @@ -33,6 +38,28 @@ async function setGoalOk( return result.data; } +async function appendUserHistoryMessage( + historyService: HistoryService, + workspaceId: string, + text: string, + metadata: Parameters[3] = { timestamp: Date.now() } +): Promise { + const result = await historyService.appendToHistory( + workspaceId, + createMuxMessage(`goal-test-user-${crypto.randomUUID()}`, "user", text, metadata) + ); + expect(result.success).toBe(true); +} + +async function getLastUserHistoryMessage(historyService: HistoryService, workspaceId: string) { + const history = await historyService.getLastMessages(workspaceId, 20); + expect(history.success).toBe(true); + if (!history.success) { + throw new Error(history.error); + } + return [...history.data].reverse().find((message) => message.role === "user"); +} + const PROJECT_PATH = "/tmp/mux-goal-service-test-project"; async function goalFileExists(config: Config, workspaceId: string): Promise { @@ -359,6 +386,101 @@ describe("WorkspaceGoalService", () => { expect(executed[0]?.message).toContain(""); }); + test("getGoal reconciles active goals to paused when the latest user turn is not a continuation", async () => { + await setGoalOk(service, { workspaceId, objective: "Follow chat tail" }); + await appendUserHistoryMessage(historyService, workspaceId, "Manual interruption"); + + const reconciled = await service.getGoal(workspaceId); + + expect(reconciled).toMatchObject({ status: "paused" }); + }); + + test("chat-tail reconciliation ignores synthetic maintenance user rows", async () => { + await setGoalOk(service, { workspaceId, objective: "Ignore maintenance rows" }); + await appendUserHistoryMessage(historyService, workspaceId, "Continue goal", { + timestamp: Date.now(), + synthetic: true, + uiVisible: true, + kind: GOAL_CONTINUATION_KIND, + }); + await appendUserHistoryMessage(historyService, workspaceId, "Synthetic heartbeat", { + timestamp: Date.now(), + synthetic: true, + muxMetadata: { type: "heartbeat-request", source: "heartbeat" }, + }); + + const reconciled = await service.getGoal(workspaceId); + + expect(reconciled).toMatchObject({ status: "active" }); + }); + + test("pause appends a hidden user boundary so the chat tail no longer marks the goal active", async () => { + await setGoalOk(service, { workspaceId, objective: "Pause from continuation" }); + await appendUserHistoryMessage(historyService, workspaceId, "Continue goal", { + timestamp: Date.now(), + synthetic: true, + uiVisible: true, + kind: GOAL_CONTINUATION_KIND, + }); + + const paused = await setGoalOk(service, { workspaceId, status: "paused" }); + const lastUserMessage = await getLastUserHistoryMessage(historyService, workspaceId); + + expect(paused).toMatchObject({ status: "paused" }); + expect(lastUserMessage?.metadata?.synthetic).toBe(true); + expect(lastUserMessage?.metadata?.muxMetadata).toMatchObject({ type: "goal-pause-boundary" }); + expect(lastUserMessage?.metadata?.kind).toBeUndefined(); + expect(await service.getGoal(workspaceId)).toMatchObject({ status: "paused" }); + }); + + test("resume appends a goal continuation before reporting the goal active", async () => { + await setGoalOk(service, { workspaceId, objective: "Resume via chat tail", status: "paused" }); + await appendUserHistoryMessage(historyService, workspaceId, "Manual pause reason"); + const dispatcher = new IdleDispatcher(); + service.registerGoalContinuationConsumer(dispatcher, { + hasActiveDescendantTasks: () => false, + getRuntimeState: () => ({ isRuntimeCompatible: true }), + executeGoalContinuation: async (input) => { + await appendUserHistoryMessage(historyService, input.workspaceId, input.message, { + timestamp: Date.now(), + synthetic: true, + uiVisible: true, + kind: input.kind ?? GOAL_CONTINUATION_KIND, + }); + return true; + }, + getKickoffSendOptions: () => ({ model: "openai:gpt-4o", agentId: "exec" }), + }); + + const resumed = await setGoalOk(service, { workspaceId, status: "active" }); + const lastUserMessage = await getLastUserHistoryMessage(historyService, workspaceId); + + expect(resumed).toMatchObject({ status: "active" }); + expect(lastUserMessage?.metadata?.kind).toBe(GOAL_CONTINUATION_KIND); + }); + + test("pause clears a deferred kickoff continuation candidate", async () => { + await setGoalOk(service, { workspaceId, objective: "Deferred resume", status: "paused" }); + let busy = true; + const dispatcher = new IdleDispatcher(); + const execute = mock(() => Promise.resolve(true)); + service.registerGoalContinuationConsumer(dispatcher, { + ...continuationBridge(execute), + getRuntimeState: () => ({ isRuntimeCompatible: true, isBusy: busy }), + getKickoffSendOptions: () => ({ model: "openai:gpt-4o", agentId: "exec" }), + }); + + await setGoalOk(service, { workspaceId, status: "active" }); + await drainPendingDispatches(); + expect(execute).not.toHaveBeenCalled(); + + await setGoalOk(service, { workspaceId, status: "paused" }); + busy = false; + await dispatcher.requestDispatch(workspaceId, GOAL_CONTINUATION_IDLE_CONSUMER_NAME); + + expect(execute).not.toHaveBeenCalled(); + }); + test("skips the kickoff arm when no kickoff send options are available", async () => { const dispatcher = new IdleDispatcher(); const executed: Array<{ message: string }> = []; @@ -467,7 +589,6 @@ describe("WorkspaceGoalService", () => { sendOptions: { model: "openai:gpt-4o", agentId: "exec" }, streamEndedAtMs: 10_001, }); - await dispatcher.requestDispatch(workspaceId, GOAL_CONTINUATION_IDLE_CONSUMER_NAME); expect(execute).toHaveBeenCalledTimes(2); }); diff --git a/src/node/services/workspaceGoalService.ts b/src/node/services/workspaceGoalService.ts index 88438a9008..e9cc7c102e 100644 --- a/src/node/services/workspaceGoalService.ts +++ b/src/node/services/workspaceGoalService.ts @@ -22,7 +22,11 @@ import { GoalRecordV1Schema, } from "@/common/orpc/schemas/goal"; import type { GoalBoardEntry, GoalBoardSnapshot, GoalBoardV1 } from "@/common/types/goal"; -import { createMuxMessage, pickStartupRetrySendOptions } from "@/common/types/message"; +import { + createMuxMessage, + pickStartupRetrySendOptions, + type MuxMessage, +} from "@/common/types/message"; import type { ProvidersConfigMap, SendMessageOptions } from "@/common/orpc/types"; import { isWorkspaceArchived } from "@/common/utils/archive"; import { @@ -153,6 +157,7 @@ export interface GoalContinuationRuntimeBridge { workspaceId: string; message: string; options: SendMessageOptions; + startStreamInBackground?: boolean; kind?: GoalSyntheticMessageKind; }): Promise; /** @@ -163,13 +168,20 @@ export interface GoalContinuationRuntimeBridge { getKickoffSendOptions?(workspaceId: string): SendMessageOptions | null; } +type PendingGoalContinuationSource = "stream_end" | "kickoff" | "budget_wrapup"; + interface PendingGoalContinuationCandidate { goalId: string; requestedAtMs: number; streamEndedAtMs: number; + source: PendingGoalContinuationSource; sendOptions: SendMessageOptions; } +interface ChatTailGoalModeResult { + mode: "active" | "paused" | null; +} + interface GoalContinuationEligibilityResult { eligible: boolean; reason?: GoalContinuationSkipReason; @@ -403,6 +415,128 @@ export class WorkspaceGoalService { this.streamInterrupter = interrupter; } + private isSyntheticSnapshotUserMessage(message: MuxMessage): boolean { + return ( + message.role === "user" && + message.metadata?.synthetic === true && + (message.metadata.fileAtMentionSnapshot !== undefined || + message.metadata.agentSkillSnapshot !== undefined) + ); + } + + private async readChatTailGoalMode(workspaceId: string): Promise { + const historyResult = await this.historyService.getLastMessages(workspaceId, 100); + if (!historyResult.success) { + log.warn("Failed to read chat tail for goal mode reconciliation", { + workspaceId, + error: historyResult.error, + }); + return { mode: null }; + } + + if (historyResult.data.length === 0) { + return { mode: null }; + } + + for (let index = historyResult.data.length - 1; index >= 0; index -= 1) { + const message = historyResult.data[index]; + if (message.role !== "user" || this.isSyntheticSnapshotUserMessage(message)) { + continue; + } + + if (message.metadata?.kind === GOAL_CONTINUATION_KIND) { + return { mode: "active" }; + } + if (message.metadata?.muxMetadata?.type === "goal-pause-boundary") { + return { mode: "paused" }; + } + if (message.metadata?.synthetic === true) { + continue; + } + return { mode: "paused" }; + } + + return { mode: null }; + } + + private applyChatTailGoalMode( + goal: GoalRecordV1, + chatTailMode: ChatTailGoalModeResult + ): GoalRecordV1 { + if (chatTailMode.mode == null || (goal.status !== "active" && goal.status !== "paused")) { + return goal; + } + + const desiredStatus = chatTailMode.mode; + if (goal.status === desiredStatus) { + return goal; + } + + // User rationale: goal running/paused mode is locked to the chat tail by + // construction. A goal-continuation user turn is the only durable proof that + // the model has been asked to keep driving the goal; any other latest user + // turn leaves the goal paused until Resume appends a fresh continuation. + const next = GoalRecordV1Schema.parse({ + ...goal, + status: desiredStatus, + updatedAtMs: Date.now(), + }); + return this.applyBudgetDrivenStatus(next); + } + + private async syncGoalStatusToChatTail(workspaceId: string): Promise { + const chatTailMode = await this.readChatTailGoalMode(workspaceId); + return this.fileLocks.withLock(workspaceId, async () => { + const current = await this.readGoalFile(workspaceId); + if (!current) { + await this.pushGoalReadSnapshot(workspaceId, null); + return null; + } + + const next = this.applyChatTailGoalMode(current, chatTailMode); + if (next === current) { + return current; + } + + await this.writeGoal(workspaceId, next); + await this.pushGoalReadSnapshot(workspaceId, next); + this.emitBudgetLimited(next, current.status); + this.emitStatusLifecycle(next, current.status, "auto"); + return next; + }); + } + + private async appendGoalPauseBoundaryIfNeeded(workspaceId: string): Promise { + const chatTailMode = await this.readChatTailGoalMode(workspaceId); + if (chatTailMode.mode !== "active") { + return true; + } + + // Hidden synthetic user boundary: it makes Pause durable in the same + // declarative state model as Resume without rewriting prior continuation + // history. The row is model-visible but not rendered unless synthetic debug + // messages are enabled, matching other context-only system breadcrumbs. + const message = createMuxMessage( + `goal-paused-${Date.now()}-${crypto.randomUUID()}`, + "user", + "Goal paused by the user. Do not continue the goal until a later goal continuation message.", + { + timestamp: Date.now(), + synthetic: true, + muxMetadata: { type: "goal-pause-boundary" }, + } + ); + const appendResult = await this.historyService.appendToHistory(workspaceId, message); + if (!appendResult.success) { + log.warn("Failed to append goal pause boundary", { + workspaceId, + error: appendResult.error, + }); + return false; + } + return true; + } + private getFilePath(workspaceId: string): string { assert(workspaceId.trim().length > 0, "WorkspaceGoalService requires non-empty workspaceId"); return path.join(this.config.getSessionDir(workspaceId), GOAL_FILE); @@ -740,6 +874,7 @@ export class WorkspaceGoalService { goalId: goal.goalId, requestedAtMs: Date.now(), streamEndedAtMs, + source: "stream_end", sendOptions, }); await this.goalContinuationDispatcher.requestDispatch( @@ -907,6 +1042,7 @@ export class WorkspaceGoalService { workspaceId, message, options: candidate.sendOptions, + startStreamInBackground: false, kind: GOAL_BUDGET_LIMIT_KIND, }); if (accepted !== true) { @@ -926,14 +1062,22 @@ export class WorkspaceGoalService { }; } - assert(goal.status === "active", "goal idle payload requires active or budget-limited goal"); - const message = buildGoalContinuationMessage(goal); + const continuationGoal = + goal.status === "paused" && candidate.source === "kickoff" + ? GoalRecordV1Schema.parse({ ...goal, status: "active" }) + : goal; + assert( + continuationGoal.status === "active", + "goal idle payload requires active, paused-kickoff, or budget-limited goal" + ); + const message = buildGoalContinuationMessage(continuationGoal); return { dispatch: async () => { const accepted = await this.goalContinuationBridge?.executeGoalContinuation({ workspaceId, message, options: candidate.sendOptions, + startStreamInBackground: candidate.source === "kickoff", kind: GOAL_CONTINUATION_KIND, }); if (accepted !== true) { @@ -941,7 +1085,14 @@ export class WorkspaceGoalService { return; } await this.recordContinuationFired(workspaceId, goal.goalId, Date.now()); - this.deletePendingCandidateIfStillSame(workspaceId, candidate); + if (candidate.source !== "kickoff") { + this.deletePendingCandidateIfStillSame(workspaceId, candidate); + return; + } + // Keep kickoff candidates until the stream-end path replaces or clears + // them. Background startup failures happen after the synthetic user row + // is accepted; retaining the candidate lets the failure hook re-request + // dispatch instead of stranding the active goal. }, }; } @@ -1047,7 +1198,9 @@ export class WorkspaceGoalService { } } - const goal = await this.normalizeGoalLimits(workspaceId); + const goal = await this.normalizeGoalLimits(workspaceId, { + syncChatTail: candidate.source !== "kickoff", + }); if (!goal) { this.pendingContinuationCandidates.delete(workspaceId); return { eligible: false, reason: "goal_missing" }; @@ -1057,8 +1210,10 @@ export class WorkspaceGoalService { return { eligible: false, reason: "goal_mismatch" }; } if (goal.status !== "active" && goal.status !== "budget_limited") { - this.pendingContinuationCandidates.delete(workspaceId); - return { eligible: false, reason: "goal_not_active" }; + if (goal.status !== "paused" || candidate.source !== "kickoff") { + this.pendingContinuationCandidates.delete(workspaceId); + return { eligible: false, reason: "goal_not_active" }; + } } if (goal.requireUserAcknowledgmentSinceMs != null) { return { eligible: false, reason: "requires_ack" }; @@ -1129,18 +1284,27 @@ export class WorkspaceGoalService { return null; } - private async normalizeGoalLimits(workspaceId: string): Promise { + private async normalizeGoalLimits( + workspaceId: string, + options: { syncChatTail?: boolean } = {} + ): Promise { + const chatTailMode = + options.syncChatTail === true ? await this.readChatTailGoalMode(workspaceId) : null; return this.fileLocks.withLock(workspaceId, async () => { const current = await this.readGoalFile(workspaceId); if (!current) { await this.pushGoalReadSnapshot(workspaceId, null); return null; } - const next = this.applyBudgetDrivenStatus(current); + const budgetNormalized = this.applyBudgetDrivenStatus(current); + const next = chatTailMode + ? this.applyChatTailGoalMode(budgetNormalized, chatTailMode) + : budgetNormalized; if (next !== current) { await this.writeGoal(workspaceId, next); await this.pushGoalReadSnapshot(workspaceId, next); this.emitBudgetLimited(next, current.status); + this.emitStatusLifecycle(next, current.status, "auto"); return next; } await this.pushGoalReadSnapshot(workspaceId, current); @@ -1153,18 +1317,27 @@ export class WorkspaceGoalService { expectedGoalId: string, firedAtMs: number ): Promise { + const chatTailMode = await this.readChatTailGoalMode(workspaceId); await this.fileLocks.withLock(workspaceId, async () => { const current = await this.readGoalFile(workspaceId); - if (current?.goalId !== expectedGoalId || current.status !== "active") { + if (current?.goalId !== expectedGoalId) { + return; + } + const continuationAccepted = + current.status === "active" || + (current.status === "paused" && chatTailMode.mode === "active"); + if (!continuationAccepted) { return; } const next = GoalRecordV1Schema.parse({ ...current, + status: "active", lastContinuationFiredAtMs: firedAtMs, updatedAtMs: firedAtMs, }); await this.writeGoal(workspaceId, next); await this.pushSnapshot(workspaceId, next); + this.emitStatusLifecycle(next, current.status, "auto"); this.emitContinuationFired(next, firedAtMs); }); } @@ -1508,8 +1681,27 @@ export class WorkspaceGoalService { return maybeConfig.loadProvidersConfig() as unknown as ProvidersConfigMap | null; } + async requestPendingGoalContinuationDispatch(workspaceId: string): Promise { + assert( + workspaceId.trim().length > 0, + "requestPendingGoalContinuationDispatch requires workspaceId" + ); + if (!this.pendingContinuationCandidates.has(workspaceId)) { + return; + } + await this.goalContinuationDispatcher?.requestDispatch( + workspaceId, + GOAL_CONTINUATION_IDLE_CONSUMER_NAME + ); + } + + async syncGoalModeWithChatTail(workspaceId: string): Promise { + assert(workspaceId.trim().length > 0, "syncGoalModeWithChatTail requires workspaceId"); + return this.syncGoalStatusToChatTail(workspaceId); + } + async getGoal(workspaceId: string): Promise { - return this.normalizeGoalLimits(workspaceId); + return this.normalizeGoalLimits(workspaceId, { syncChatTail: true }); } /** @@ -1899,17 +2091,35 @@ export class WorkspaceGoalService { return Ok(next); }); - if (result.success) { - if (result.data.status === "active") { - this.armKickoffContinuationIfIdle(input.workspaceId, result.data); - } else if (result.data.status === "budget_limited") { - this.armBudgetWrapupForBudgetLimitedGoal(input.workspaceId, result.data); + if (!result.success) { + return result; + } + + if (input.status === "paused" && result.data.status === "paused") { + this.pendingContinuationCandidates.delete(input.workspaceId); + const pauseBoundaryReady = await this.appendGoalPauseBoundaryIfNeeded(input.workspaceId); + if (!pauseBoundaryReady) { + return result; } + const synced = await this.syncGoalStatusToChatTail(input.workspaceId); + return Ok(synced ?? result.data); + } + + if (result.data.status === "active") { + await this.armKickoffContinuationIfIdle(input.workspaceId, result.data); + const synced = await this.syncGoalStatusToChatTail(input.workspaceId); + return Ok(synced ?? result.data); + } + if (result.data.status === "budget_limited") { + this.armBudgetWrapupForBudgetLimitedGoal(input.workspaceId, result.data); } return result; } - private armKickoffContinuationIfIdle(workspaceId: string, goal: GoalRecordV1): void { + private async armKickoffContinuationIfIdle( + workspaceId: string, + goal: GoalRecordV1 + ): Promise { if (this.suppressKickoffContinuation) { return; } @@ -1923,14 +2133,17 @@ export class WorkspaceGoalService { if (existingCandidate?.goalId === goal.goalId) { // A real stream-end already armed this goal; re-request dispatch in case // the previous request was consumed while an acknowledgment gate was set. - this.goalContinuationDispatcher - .requestDispatch(workspaceId, GOAL_CONTINUATION_IDLE_CONSUMER_NAME) - .catch((error: unknown) => { - log.warn("Failed to re-request kickoff goal continuation dispatch", { - workspaceId, - error, - }); + try { + await this.goalContinuationDispatcher.requestDispatch( + workspaceId, + GOAL_CONTINUATION_IDLE_CONSUMER_NAME + ); + } catch (error: unknown) { + log.warn("Failed to re-request kickoff goal continuation dispatch", { + workspaceId, + error, }); + } return; } const sendOptions = this.goalContinuationBridge.getKickoffSendOptions?.(workspaceId); @@ -1946,13 +2159,17 @@ export class WorkspaceGoalService { goalId: goal.goalId, requestedAtMs: nowMs, streamEndedAtMs: nowMs, + source: "kickoff", sendOptions: continuationSendOptions(sendOptions), }); - this.goalContinuationDispatcher - .requestDispatch(workspaceId, GOAL_CONTINUATION_IDLE_CONSUMER_NAME) - .catch((error: unknown) => { - log.warn("Failed to request kickoff goal continuation dispatch", { workspaceId, error }); - }); + try { + await this.goalContinuationDispatcher.requestDispatch( + workspaceId, + GOAL_CONTINUATION_IDLE_CONSUMER_NAME + ); + } catch (error: unknown) { + log.warn("Failed to request kickoff goal continuation dispatch", { workspaceId, error }); + } } private armContinuationForPromotedGoal(workspaceId: string, goal: GoalRecordV1): void { @@ -1961,7 +2178,9 @@ export class WorkspaceGoalService { // promoted goal until the user pause/unpauses it. this.lastUserStopAtMsByWorkspace.delete(workspaceId); if (goal.status === "active") { - this.armKickoffContinuationIfIdle(workspaceId, goal); + this.armKickoffContinuationIfIdle(workspaceId, goal).catch((error: unknown) => { + log.warn("Failed to arm promoted goal continuation", { workspaceId, error }); + }); } else if (goal.status === "budget_limited") { this.armBudgetWrapupForBudgetLimitedGoal(workspaceId, goal); } @@ -1992,12 +2211,13 @@ export class WorkspaceGoalService { workspaceId.trim().length > 0, "recoverPendingDispatchAfterRestart requires workspaceId" ); - const goal = await this.normalizeGoalLimits(workspaceId); + const goal = await this.normalizeGoalLimits(workspaceId, { syncChatTail: true }); if (!goal) { return; } if (goal.status === "active") { - this.armKickoffContinuationIfIdle(workspaceId, goal); + await this.armKickoffContinuationIfIdle(workspaceId, goal); + await this.syncGoalStatusToChatTail(workspaceId); return; } if (goal.status === "budget_limited" && goal.budgetLimitInjectedForGoalId === null) { @@ -2050,6 +2270,7 @@ export class WorkspaceGoalService { goalId: goal.goalId, requestedAtMs: nowMs, streamEndedAtMs: nowMs, + source: "budget_wrapup", sendOptions: continuationSendOptions(sendOptions), }); this.goalContinuationDispatcher @@ -2326,7 +2547,12 @@ export class WorkspaceGoalService { if (causedLimit) { this.armBudgetWrapupForBudgetLimitedGoal(input.parentWorkspaceId, next); } else if (next.status === "active") { - this.armKickoffContinuationIfIdle(input.parentWorkspaceId, next); + this.armKickoffContinuationIfIdle(input.parentWorkspaceId, next).catch((error: unknown) => { + log.warn("Failed to arm parent goal continuation after child attribution", { + workspaceId: input.parentWorkspaceId, + error, + }); + }); } return { goalBefore: current, diff --git a/src/node/services/workspaceService.ts b/src/node/services/workspaceService.ts index f2b41bb9b7..31f748f64e 100644 --- a/src/node/services/workspaceService.ts +++ b/src/node/services/workspaceService.ts @@ -152,7 +152,11 @@ import { type HeartbeatContextMode, } from "@/constants/heartbeat"; import { WORKSPACE_DEFAULTS } from "@/constants/workspaceDefaults"; -import { GOAL_CONTINUATION_KIND, type GoalSyntheticMessageKind } from "@/constants/goals"; +import { + GOAL_BUDGET_LIMIT_KIND, + GOAL_CONTINUATION_KIND, + type GoalSyntheticMessageKind, +} from "@/constants/goals"; import type { StreamStartEvent, StreamEndEvent, @@ -5785,6 +5789,9 @@ export class WorkspaceService extends EventEmitter { goalKind?: GoalSyntheticMessageKind; /** Force Copilot billing classification to "agent" for internal sends. */ agentInitiated?: boolean; + onAcceptedPreStreamFailure?: (error: SendMessageError) => Promise | void; + /** Return once the user message is accepted; stream startup continues asynchronously. */ + startStreamInBackground?: boolean; /** When true, reject instead of queueing if the workspace is busy. */ requireIdle?: boolean; } @@ -5890,6 +5897,8 @@ export class WorkspaceService extends EventEmitter { synthetic: internal?.synthetic, agentInitiated: internal?.agentInitiated, goalKind: internal?.goalKind, + onAcceptedPreStreamFailure: internal?.onAcceptedPreStreamFailure, + startStreamInBackground: internal?.startStreamInBackground, goalContinuation: internal?.goalContinuation, }); } @@ -5967,23 +5976,23 @@ export class WorkspaceService extends EventEmitter { }); } - const restoreInterruptedTaskAfterAcceptedEditFailure = - resumedInterruptedTask && normalizedOptions?.editMessageId - ? async (error: SendMessageError) => { - try { - await this.taskService?.restoreInterruptedTaskAfterResumeFailure?.(workspaceId); - } catch (restoreError: unknown) { - log.error( - "Failed to restore interrupted task status after accepted edit startup failure", - { - workspaceId, - error, - restoreError, - } - ); + const onAcceptedPreStreamFailure = async (error: SendMessageError) => { + if (resumedInterruptedTask && normalizedOptions?.editMessageId) { + try { + await this.taskService?.restoreInterruptedTaskAfterResumeFailure?.(workspaceId); + } catch (restoreError: unknown) { + log.error( + "Failed to restore interrupted task status after accepted edit startup failure", + { + workspaceId, + error, + restoreError, } - } - : undefined; + ); + } + } + await internal?.onAcceptedPreStreamFailure?.(error); + }; const shouldRunPendingAutoTitle = internal?.synthetic !== true && @@ -6000,7 +6009,8 @@ export class WorkspaceService extends EventEmitter { agentInitiated: internal?.agentInitiated, goalKind: internal?.goalKind, goalContinuation: internal?.goalContinuation, - onAcceptedPreStreamFailure: restoreInterruptedTaskAfterAcceptedEditFailure, + startStreamInBackground: internal?.startStreamInBackground, + onAcceptedPreStreamFailure, }); if (!result.success) { log.error("sendMessage handler: session returned error", { @@ -7598,12 +7608,16 @@ export class WorkspaceService extends EventEmitter { async executeGoalContinuation(input: { workspaceId: string; message: string; + startStreamInBackground?: boolean; kind?: GoalSyntheticMessageKind; options: SendMessageOptions; }): Promise { assert(input.workspaceId.trim().length > 0, "executeGoalContinuation requires workspaceId"); assert(input.message.trim().length > 0, "executeGoalContinuation requires message"); + const goalKind = input.kind ?? GOAL_CONTINUATION_KIND; + const startStreamInBackground = + input.startStreamInBackground === true && goalKind !== GOAL_BUDGET_LIMIT_KIND; const sendResult = await this.sendMessage( input.workspaceId, input.message, @@ -7615,8 +7629,13 @@ export class WorkspaceService extends EventEmitter { skipAutoResumeReset: true, synthetic: true, agentInitiated: true, + startStreamInBackground, + onAcceptedPreStreamFailure: startStreamInBackground + ? () => + this.workspaceGoalService?.requestPendingGoalContinuationDispatch(input.workspaceId) + : undefined, requireIdle: true, - goalKind: input.kind ?? GOAL_CONTINUATION_KIND, + goalKind, goalContinuation: true, } ); diff --git a/tests/ui/chat/goalSlashCommand.test.ts b/tests/ui/chat/goalSlashCommand.test.ts index 5c30c525a0..68586d6522 100644 --- a/tests/ui/chat/goalSlashCommand.test.ts +++ b/tests/ui/chat/goalSlashCommand.test.ts @@ -21,16 +21,12 @@ describe("Goal slash command", () => { }); try { - const created = await app.env.orpc.workspace.setGoal({ - workspaceId: app.workspaceId, - objective: "old completed goal", - budgetCents: null, - }); - expect(created.success).toBe(true); const completed = await app.env.orpc.workspace.setGoal({ workspaceId: app.workspaceId, + objective: "old completed goal", status: "complete", completionSummary: "Done.", + budgetCents: null, }); expect(completed.success).toBe(true); diff --git a/tests/ui/chat/sendModeDropdown.test.ts b/tests/ui/chat/sendModeDropdown.test.ts index 21b05cc621..23fd0c824e 100644 --- a/tests/ui/chat/sendModeDropdown.test.ts +++ b/tests/ui/chat/sendModeDropdown.test.ts @@ -132,7 +132,7 @@ describe("Send dispatch modes (mock AI router)", () => { } }, 60_000); - test("running goals steer by default and expose an explicit pause send action", async () => { + test("running goals pause on manual sends and omit a redundant pause send action", async () => { const app = await createAppHarness({ branchPrefix: "send-mode-goal-policy" }); try { @@ -149,27 +149,19 @@ describe("Send dispatch modes (mock AI router)", () => { ); }); - await app.chat.typeWithoutSending("Steer without pausing"); - const sendButton = await waitForSendModeMenuTrigger(app.view.container); - fireEvent.click(sendButton); - await app.chat.expectStreamComplete(); - - await waitFor(async () => { - const { goal } = await app.env.orpc.workspace.getGoal({ workspaceId: app.workspaceId }); - expect(goal?.status).toBe("active"); - }); - - await app.chat.typeWithoutSending("Pause after this steering note"); + await app.chat.typeWithoutSending("Manual note pauses the goal"); await openSendModeMenu(app.view.container); - const pauseRow = await waitFor(() => { - const rows = Array.from(app.view.container.querySelectorAll("button")); - const row = rows.find((button) => button.textContent?.includes("Send and pause goal")); - if (!row) { - throw new Error("Send and pause goal row not found"); - } - return row; - }); - fireEvent.click(pauseRow); + const rows = Array.from(app.view.container.querySelectorAll("button")); + expect(rows.some((button) => button.textContent?.includes("Send and pause goal"))).toBe( + false + ); + const sendAfterTurnRow = rows.find((button) => + button.textContent?.includes("Send after turn") + ); + if (!sendAfterTurnRow) { + throw new Error("Send after turn row not found"); + } + fireEvent.click(sendAfterTurnRow); await app.chat.expectStreamComplete(); await waitFor(async () => {