diff --git a/Makefile b/Makefile index 951be36d5f..6f8541a5d5 100644 --- a/Makefile +++ b/Makefile @@ -438,7 +438,8 @@ storybook-build: node_modules/.installed src/version.ts ## Build static Storyboo test-storybook: node_modules/.installed ## Run Storybook interaction tests (requires Storybook to be running or built) $(check_node_version) - @bun x test-storybook + @# Storybook story transitions can exceed Jest's default 15s timeout on loaded CI runners. + @bun x test-storybook --testTimeout 30000 chromatic: node_modules/.installed ## Run Chromatic for visual regression testing $(check_node_version) diff --git a/src/browser/components/ChatPane.tsx b/src/browser/components/ChatPane.tsx index d174a172c3..b63bccea79 100644 --- a/src/browser/components/ChatPane.tsx +++ b/src/browser/components/ChatPane.tsx @@ -117,6 +117,21 @@ function PerfRenderMarker(props: { id: string; children: React.ReactNode }): Rea return <>{props.children}; } +function isChromaticStorybookEnvironment(): boolean { + if (typeof window === "undefined") { + return false; + } + + // Keep production behavior unchanged while suppressing story-only snapshot churn. + const isStorybookPreview = window.location.pathname.endsWith("iframe.html"); + if (!isStorybookPreview) { + return false; + } + + const chromaticRuntimeFlag = (window as Window & { chromatic?: boolean }).chromatic; + return /Chromatic/i.test(window.navigator.userAgent) || chromaticRuntimeFlag === true; +} + interface ChatPaneProps { workspaceId: string; workspaceState: WorkspaceState; @@ -203,7 +218,17 @@ export const ChatPane: React.FC = (props) => { useEffect(() => { workspaceStateRef.current = workspaceState; }, [workspaceState]); - const { messages, canInterrupt, isCompacting, isStreamStarting, loading } = workspaceState; + const { + messages, + canInterrupt, + isCompacting, + isStreamStarting, + loading, + hasOlderHistory, + loadingOlderHistory, + } = workspaceState; + const shouldRenderLoadOlderMessagesButton = hasOlderHistory && !isChromaticStorybookEnvironment(); + const loadOlderMessagesShortcutLabel = formatKeybind(KEYBINDS.LOAD_OLDER_MESSAGES); const { warning: contextSwitchWarning, @@ -598,6 +623,16 @@ export const ChatPane: React.FC = (props) => { lastActionableMessage.errorType === "context_exceeded"; const showRetryBarrierUI = showRetryBarrier && !suppressRetryBarrier; + const handleLoadOlderHistory = useCallback(() => { + if (!shouldRenderLoadOlderMessagesButton || loadingOlderHistory) { + return; + } + + storeRaw.loadOlderHistory(workspaceId).catch((error) => { + console.warn(`[ChatPane] Failed to load older history for ${workspaceId}:`, error); + }); + }, [loadingOlderHistory, shouldRenderLoadOlderMessagesButton, storeRaw, workspaceId]); + // Handle keyboard shortcuts (using optional refs that are safe even if not initialized) useAIViewKeybinds({ workspaceId, @@ -608,6 +643,7 @@ export const ChatPane: React.FC = (props) => { showRetryBarrier, chatInputAPI, jumpToBottom, + loadOlderHistory: shouldRenderLoadOlderMessagesButton ? handleLoadOlderHistory : null, handleOpenTerminal: onOpenTerminal, handleOpenInEditor, aggregator, @@ -730,6 +766,19 @@ export const ChatPane: React.FC = (props) => { ) : ( <> + {shouldRenderLoadOlderMessagesButton && ( +
+ +
+ )} {deferredMessages.map((msg, index) => { const bashOutputGroup = bashOutputGroupInfos[index]; diff --git a/src/browser/components/Settings/sections/KeybindsSection.tsx b/src/browser/components/Settings/sections/KeybindsSection.tsx index c990f3f2a7..9b8870bcbe 100644 --- a/src/browser/components/Settings/sections/KeybindsSection.tsx +++ b/src/browser/components/Settings/sections/KeybindsSection.tsx @@ -21,6 +21,7 @@ const KEYBIND_LABELS: Record = { GENERATE_WORKSPACE_TITLE: "Generate new title", ARCHIVE_WORKSPACE: "Archive workspace", JUMP_TO_BOTTOM: "Jump to bottom", + LOAD_OLDER_MESSAGES: "Load older messages", NEXT_WORKSPACE: "Next workspace", PREV_WORKSPACE: "Previous workspace", TOGGLE_SIDEBAR: "Toggle sidebar", @@ -111,6 +112,7 @@ const KEYBIND_GROUPS: Array<{ label: string; keys: Array "NAVIGATE_BACK", "NAVIGATE_FORWARD", "JUMP_TO_BOTTOM", + "LOAD_OLDER_MESSAGES", ], }, { diff --git a/src/browser/contexts/WorkspaceContext.test.tsx b/src/browser/contexts/WorkspaceContext.test.tsx index a7a74c2bfd..c607120f53 100644 --- a/src/browser/contexts/WorkspaceContext.test.tsx +++ b/src/browser/contexts/WorkspaceContext.test.tsx @@ -74,6 +74,13 @@ describe("WorkspaceContext", () => { const ctx = await setup(); await waitFor(() => expect(ctx().workspaceMetadata.size).toBe(1)); + + // Activate the workspace so onChat subscription starts (required after the + // refactor that scoped onChat to the active workspace only). + act(() => { + getWorkspaceStoreRaw().setActiveWorkspaceId("ws-sync-load"); + }); + await waitFor(() => expect( workspaceApi.onChat.mock.calls.some( diff --git a/src/browser/contexts/WorkspaceContext.tsx b/src/browser/contexts/WorkspaceContext.tsx index 65c051ecb6..9fa7e0acaa 100644 --- a/src/browser/contexts/WorkspaceContext.tsx +++ b/src/browser/contexts/WorkspaceContext.tsx @@ -3,6 +3,7 @@ import { useCallback, useContext, useEffect, + useLayoutEffect, useMemo, useRef, useState, @@ -532,6 +533,18 @@ export function WorkspaceProvider(props: WorkspaceProviderProps) { } = useRouter(); const workspaceStore = useWorkspaceStoreRaw(); + + useLayoutEffect(() => { + // When the user navigates to settings, currentWorkspaceId becomes null + // (URL is /settings/...). Preserve the active workspace subscription so + // chat messages aren't cleared. Only null it out when truly leaving a + // workspace context (e.g., navigating to Home). + if (currentWorkspaceId) { + workspaceStore.setActiveWorkspaceId(currentWorkspaceId); + } else if (!currentSettingsSection) { + workspaceStore.setActiveWorkspaceId(null); + } + }, [workspaceStore, currentWorkspaceId, currentSettingsSection]); const [workspaceMetadata, setWorkspaceMetadataState] = useState< Map >(new Map()); diff --git a/src/browser/hooks/useAIViewKeybinds.test.tsx b/src/browser/hooks/useAIViewKeybinds.test.tsx index c115e48f89..e90a67bccf 100644 --- a/src/browser/hooks/useAIViewKeybinds.test.tsx +++ b/src/browser/hooks/useAIViewKeybinds.test.tsx @@ -54,6 +54,7 @@ describe("useAIViewKeybinds", () => { showRetryBarrier: false, chatInputAPI, jumpToBottom: () => undefined, + loadOlderHistory: null, handleOpenTerminal: () => undefined, handleOpenInEditor: () => undefined, aggregator: undefined, @@ -92,6 +93,7 @@ describe("useAIViewKeybinds", () => { showRetryBarrier: false, chatInputAPI, jumpToBottom: () => undefined, + loadOlderHistory: null, handleOpenTerminal: () => undefined, handleOpenInEditor: () => undefined, aggregator: undefined, @@ -134,6 +136,7 @@ describe("useAIViewKeybinds", () => { showRetryBarrier: false, chatInputAPI, jumpToBottom: () => undefined, + loadOlderHistory: null, handleOpenTerminal: () => undefined, handleOpenInEditor: () => undefined, aggregator: undefined, @@ -177,6 +180,7 @@ describe("useAIViewKeybinds", () => { showRetryBarrier: false, chatInputAPI, jumpToBottom: () => undefined, + loadOlderHistory: null, handleOpenTerminal: () => undefined, handleOpenInEditor: () => undefined, aggregator: undefined, @@ -201,6 +205,38 @@ describe("useAIViewKeybinds", () => { expect(interruptStream.mock.calls.length).toBe(1); }); + test("Shift+H loads older history when callback is provided", () => { + const loadOlderHistory = mock(() => undefined); + const chatInputAPI: RefObject = { current: null }; + + renderHook(() => + useAIViewKeybinds({ + workspaceId: "ws", + canInterrupt: false, + showRetryBarrier: false, + chatInputAPI, + jumpToBottom: () => undefined, + loadOlderHistory, + handleOpenTerminal: () => undefined, + handleOpenInEditor: () => undefined, + aggregator: undefined, + setEditingMessage: () => undefined, + vimEnabled: false, + }) + ); + + document.body.dispatchEvent( + new window.KeyboardEvent("keydown", { + key: "H", + shiftKey: true, + bubbles: true, + cancelable: true, + }) + ); + + expect(loadOlderHistory.mock.calls.length).toBe(1); + }); + test("Escape does not interrupt when a modal stops propagation (e.g., Settings)", () => { const interruptStream = mock(() => Promise.resolve({ success: true as const, data: undefined }) @@ -220,6 +256,7 @@ describe("useAIViewKeybinds", () => { showRetryBarrier: false, chatInputAPI, jumpToBottom: () => undefined, + loadOlderHistory: null, handleOpenTerminal: () => undefined, handleOpenInEditor: () => undefined, aggregator: undefined, diff --git a/src/browser/hooks/useAIViewKeybinds.ts b/src/browser/hooks/useAIViewKeybinds.ts index e946e684ed..d0e87db499 100644 --- a/src/browser/hooks/useAIViewKeybinds.ts +++ b/src/browser/hooks/useAIViewKeybinds.ts @@ -20,6 +20,7 @@ interface UseAIViewKeybindsParams { showRetryBarrier: boolean; chatInputAPI: React.RefObject; jumpToBottom: () => void; + loadOlderHistory: (() => void) | null; handleOpenTerminal: () => void; handleOpenInEditor: () => void; aggregator: StreamingMessageAggregator | undefined; // For compaction detection @@ -31,7 +32,8 @@ interface UseAIViewKeybindsParams { * Manages keyboard shortcuts for AIView: * - Esc (non-vim) or Ctrl+C (vim): Interrupt stream (Escape skips text inputs by default) * - Ctrl+I: Focus chat input - * - Ctrl+G: Jump to bottom + * - Shift+H: Load older transcript messages (when available) + * - Shift+G: Jump to bottom * - Ctrl+T: Open terminal * - Ctrl+Shift+E: Open in editor * - Ctrl+C (during compaction in vim mode): Cancel compaction, restore command @@ -44,6 +46,7 @@ export function useAIViewKeybinds({ showRetryBarrier, chatInputAPI, jumpToBottom, + loadOlderHistory, handleOpenTerminal, handleOpenInEditor, aggregator, @@ -135,6 +138,12 @@ export function useAIViewKeybinds({ return; } + if (matchesKeybind(e, KEYBINDS.LOAD_OLDER_MESSAGES) && loadOlderHistory) { + e.preventDefault(); + loadOlderHistory(); + return; + } + if (matchesKeybind(e, KEYBINDS.JUMP_TO_BOTTOM)) { e.preventDefault(); jumpToBottom(); @@ -154,6 +163,7 @@ export function useAIViewKeybinds({ }; }, [ jumpToBottom, + loadOlderHistory, handleOpenTerminal, handleOpenInEditor, workspaceId, diff --git a/src/browser/hooks/useResumeManager.ts b/src/browser/hooks/useResumeManager.ts index c99e43b6e6..9199c94466 100644 --- a/src/browser/hooks/useResumeManager.ts +++ b/src/browser/hooks/useResumeManager.ts @@ -36,7 +36,8 @@ export interface RetryState { * Why this matters: * - Consistency: All retries use the same backoff, state management, eligibility checks * - Maintainability: One place to update retry logic - * - Background operation: Works for all workspaces, even non-visible ones + * - Safety-first operation: Auto-retry only runs for the workspace with an active + * onChat subscription (manual retry still works when the workspace is opened) * - Idempotency: Safe to emit events multiple times, hook silently ignores invalid requests * * autoRetry State Semantics (Explicit Transitions Only): @@ -54,7 +55,7 @@ export interface RetryState { * - Polling-based: Checks all workspaces every 1 second * - Event-driven: Also reacts to RESUME_CHECK_REQUESTED events for fast path * - Idempotent: Safe to call multiple times, silently ignores invalid requests - * - Background operation: Works for all workspaces, visible or not + * - Conservative eligibility: Background workspaces skip auto-retry until active * - Exponential backoff: 1s → 2s → 4s → 8s → ... → 60s (max) * * Checks happen on: @@ -102,6 +103,13 @@ export function useResumeManager() { return false; } + // Only the active onChat workspace receives authoritative stream-error/abort + // transcript updates. Inactive workspaces only get activity snapshots, which + // are insufficient to classify retryability safely. + if (!store.isOnChatSubscriptionActive(workspaceId)) { + return false; + } + // 1. Must have interrupted stream that's eligible for auto-retry (not currently streaming) if (state.canInterrupt) return false; // Currently streaming diff --git a/src/browser/stores/WorkspaceStore.test.ts b/src/browser/stores/WorkspaceStore.test.ts index 16695b99dd..1a0e5bfeee 100644 --- a/src/browser/stores/WorkspaceStore.test.ts +++ b/src/browser/stores/WorkspaceStore.test.ts @@ -1,23 +1,71 @@ import { describe, expect, it, beforeEach, afterEach, mock, type Mock } from "bun:test"; import type { FrontendWorkspaceMetadata } from "@/common/types/workspace"; import type { StreamStartEvent, ToolCallStartEvent } from "@/common/types/stream"; -import type { WorkspaceChatMessage } from "@/common/orpc/types"; +import type { WorkspaceActivitySnapshot, WorkspaceChatMessage } from "@/common/orpc/types"; import { DEFAULT_RUNTIME_CONFIG } from "@/common/constants/workspace"; import { WorkspaceStore } from "./WorkspaceStore"; +interface LoadMoreResponse { + messages: WorkspaceChatMessage[]; + nextCursor: { beforeHistorySequence: number; beforeMessageId?: string | null } | null; + hasOlder: boolean; +} + // Mock client // eslint-disable-next-line require-yield -const mockOnChat = mock(async function* (): AsyncGenerator { - // yield nothing by default - await Promise.resolve(); +const mockOnChat = mock(async function* ( + _input?: { workspaceId: string; mode?: unknown }, + options?: { signal?: AbortSignal } +): AsyncGenerator { + // Keep the iterator open until the store aborts it (prevents retry-loop noise in tests). + await new Promise((resolve) => { + if (!options?.signal) { + resolve(); + return; + } + options.signal.addEventListener("abort", () => resolve(), { once: true }); + }); }); const mockGetSessionUsage = mock(() => Promise.resolve(undefined)); +const mockHistoryLoadMore = mock( + (): Promise => + Promise.resolve({ + messages: [], + nextCursor: null, + hasOlder: false, + }) +); +const mockActivityList = mock(() => Promise.resolve>({})); +// eslint-disable-next-line require-yield +const mockActivitySubscribe = mock(async function* ( + _input?: void, + options?: { signal?: AbortSignal } +): AsyncGenerator< + { workspaceId: string; activity: WorkspaceActivitySnapshot | null }, + void, + unknown +> { + await new Promise((resolve) => { + if (!options?.signal) { + resolve(); + return; + } + options.signal.addEventListener("abort", () => resolve(), { once: true }); + }); +}); const mockClient = { workspace: { onChat: mockOnChat, getSessionUsage: mockGetSessionUsage, + history: { + loadMore: mockHistoryLoadMore, + }, + activity: { + list: mockActivityList, + subscribe: mockActivitySubscribe, + }, }, }; @@ -43,7 +91,8 @@ global.queueMicrotask = (fn) => fn(); function createAndAddWorkspace( store: WorkspaceStore, workspaceId: string, - options: Partial = {} + options: Partial = {}, + activate = true ): FrontendWorkspaceMetadata { const metadata: FrontendWorkspaceMetadata = { id: workspaceId, @@ -54,16 +103,49 @@ function createAndAddWorkspace( createdAt: options.createdAt ?? new Date().toISOString(), runtimeConfig: options.runtimeConfig ?? DEFAULT_RUNTIME_CONFIG, }; + if (activate) { + store.setActiveWorkspaceId(workspaceId); + } store.addWorkspace(metadata); return metadata; } +function createHistoryMessageEvent(id: string, historySequence: number): WorkspaceChatMessage { + return { + type: "message", + id, + role: "user", + parts: [{ type: "text", text: `message-${historySequence}` }], + metadata: { historySequence, timestamp: historySequence }, + }; +} + +async function waitForAbortSignal(signal?: AbortSignal): Promise { + await new Promise((resolve) => { + if (!signal) { + resolve(); + return; + } + signal.addEventListener("abort", () => resolve(), { once: true }); + }); +} + describe("WorkspaceStore", () => { let store: WorkspaceStore; let mockOnModelUsed: Mock<(model: string) => void>; beforeEach(() => { mockOnChat.mockClear(); + mockGetSessionUsage.mockClear(); + mockHistoryLoadMore.mockClear(); + mockActivityList.mockClear(); + mockActivitySubscribe.mockClear(); + mockHistoryLoadMore.mockResolvedValue({ + messages: [], + nextCursor: null, + hasOlder: false, + }); + mockActivityList.mockResolvedValue({}); mockOnModelUsed = mock(() => undefined); store = new WorkspaceStore(mockOnModelUsed); // eslint-disable-next-line @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-explicit-any @@ -129,6 +211,7 @@ describe("WorkspaceStore", () => { }); // Add workspace + store.setActiveWorkspaceId(workspaceId); store.addWorkspace(metadata); // Check initial recency @@ -175,6 +258,7 @@ describe("WorkspaceStore", () => { }); // Add workspace (should trigger IPC subscription) + store.setActiveWorkspaceId(metadata.id); store.addWorkspace(metadata); // Wait for async processing @@ -211,6 +295,7 @@ describe("WorkspaceStore", () => { // Unsubscribe before adding workspace (which triggers updates) unsubscribe(); + store.setActiveWorkspaceId(metadata.id); store.addWorkspace(metadata); // Wait for async processing @@ -220,6 +305,188 @@ describe("WorkspaceStore", () => { }); }); + describe("active workspace subscriptions", () => { + it("does not start onChat until workspace becomes active", async () => { + const workspaceId = "inactive-workspace"; + createAndAddWorkspace(store, workspaceId, {}, false); + + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(mockOnChat).not.toHaveBeenCalled(); + + store.setActiveWorkspaceId(workspaceId); + await new Promise((resolve) => setTimeout(resolve, 0)); + + expect(mockOnChat).toHaveBeenCalledWith( + expect.objectContaining({ workspaceId }), + expect.anything() + ); + }); + + it("switches onChat subscriptions when active workspace changes", async () => { + // eslint-disable-next-line require-yield + mockOnChat.mockImplementation(async function* ( + _input?: { workspaceId: string; mode?: unknown }, + options?: { signal?: AbortSignal } + ): AsyncGenerator { + await new Promise((resolve) => { + if (!options?.signal) { + resolve(); + return; + } + options.signal.addEventListener("abort", () => resolve(), { once: true }); + }); + }); + + createAndAddWorkspace(store, "workspace-1", {}, false); + createAndAddWorkspace(store, "workspace-2", {}, false); + + store.setActiveWorkspaceId("workspace-1"); + await new Promise((resolve) => setTimeout(resolve, 0)); + + store.setActiveWorkspaceId("workspace-2"); + await new Promise((resolve) => setTimeout(resolve, 0)); + + const subscribedWorkspaceIds = mockOnChat.mock.calls.map((call) => { + const input = call[0] as { workspaceId?: string }; + return input.workspaceId; + }); + + expect(subscribedWorkspaceIds).toEqual(["workspace-1", "workspace-2"]); + }); + + it("clears replay buffers before aborting the previous active workspace subscription", async () => { + // eslint-disable-next-line require-yield + mockOnChat.mockImplementation(async function* ( + _input?: { workspaceId: string; mode?: unknown }, + options?: { signal?: AbortSignal } + ): AsyncGenerator { + await waitForAbortSignal(options?.signal); + }); + + createAndAddWorkspace(store, "workspace-1", {}, false); + createAndAddWorkspace(store, "workspace-2", {}, false); + + store.setActiveWorkspaceId("workspace-1"); + await new Promise((resolve) => setTimeout(resolve, 0)); + + const transientState = ( + store as unknown as { + chatTransientState: Map< + string, + { + caughtUp: boolean; + replayingHistory: boolean; + historicalMessages: WorkspaceChatMessage[]; + pendingStreamEvents: WorkspaceChatMessage[]; + } + >; + } + ).chatTransientState.get("workspace-1"); + expect(transientState).toBeDefined(); + + transientState!.caughtUp = false; + transientState!.replayingHistory = true; + transientState!.historicalMessages.push( + createHistoryMessageEvent("stale-buffered-message", 9) + ); + transientState!.pendingStreamEvents.push({ + type: "stream-start", + workspaceId: "workspace-1", + messageId: "stale-buffered-stream", + model: "claude-sonnet-4", + historySequence: 10, + startTime: Date.now(), + }); + + // Switching active workspaces should clear replay buffers synchronously + // before aborting the previous subscription. + store.setActiveWorkspaceId("workspace-2"); + + expect(transientState!.caughtUp).toBe(false); + expect(transientState!.replayingHistory).toBe(false); + expect(transientState!.historicalMessages).toHaveLength(0); + expect(transientState!.pendingStreamEvents).toHaveLength(0); + }); + it("drops queued chat events from an aborted subscription attempt", async () => { + const queuedMicrotasks: Array<() => void> = []; + const originalQueueMicrotask = global.queueMicrotask; + let resolveQueuedEvent!: () => void; + const queuedEvent = new Promise((resolve) => { + resolveQueuedEvent = resolve; + }); + + global.queueMicrotask = (callback) => { + queuedMicrotasks.push(callback); + resolveQueuedEvent(); + }; + + try { + mockOnChat.mockImplementation(async function* ( + input?: { workspaceId: string; mode?: unknown }, + options?: { signal?: AbortSignal } + ): AsyncGenerator { + if (input?.workspaceId === "workspace-1") { + yield createHistoryMessageEvent("queued-after-switch", 11); + } + await waitForAbortSignal(options?.signal); + }); + + createAndAddWorkspace(store, "workspace-1", {}, false); + createAndAddWorkspace(store, "workspace-2", {}, false); + + store.setActiveWorkspaceId("workspace-1"); + await queuedEvent; + + const transientState = ( + store as unknown as { + chatTransientState: Map< + string, + { + historicalMessages: WorkspaceChatMessage[]; + pendingStreamEvents: WorkspaceChatMessage[]; + } + >; + } + ).chatTransientState.get("workspace-1"); + expect(transientState).toBeDefined(); + + // Abort workspace-1 attempt by moving focus; the queued callback should now no-op. + store.setActiveWorkspaceId("workspace-2"); + + for (const callback of queuedMicrotasks) { + callback(); + } + + expect(transientState!.historicalMessages).toHaveLength(0); + expect(transientState!.pendingStreamEvents).toHaveLength(0); + } finally { + global.queueMicrotask = originalQueueMicrotask; + } + }); + }); + + it("tracks which workspace currently has the active onChat subscription", async () => { + createAndAddWorkspace(store, "workspace-1", {}, false); + createAndAddWorkspace(store, "workspace-2", {}, false); + + expect(store.isOnChatSubscriptionActive("workspace-1")).toBe(false); + expect(store.isOnChatSubscriptionActive("workspace-2")).toBe(false); + + store.setActiveWorkspaceId("workspace-1"); + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(store.isOnChatSubscriptionActive("workspace-1")).toBe(true); + expect(store.isOnChatSubscriptionActive("workspace-2")).toBe(false); + + store.setActiveWorkspaceId("workspace-2"); + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(store.isOnChatSubscriptionActive("workspace-1")).toBe(false); + expect(store.isOnChatSubscriptionActive("workspace-2")).toBe(true); + + store.setActiveWorkspaceId(null); + expect(store.isOnChatSubscriptionActive("workspace-1")).toBe(false); + expect(store.isOnChatSubscriptionActive("workspace-2")).toBe(false); + }); + describe("syncWorkspaces", () => { it("should add new workspaces", () => { const metadata1: FrontendWorkspaceMetadata = { @@ -233,9 +500,13 @@ describe("WorkspaceStore", () => { }; const workspaceMap = new Map([[metadata1.id, metadata1]]); + store.setActiveWorkspaceId(metadata1.id); store.syncWorkspaces(workspaceMap); - expect(mockOnChat).toHaveBeenCalledWith({ workspaceId: "workspace-1" }, expect.anything()); + expect(mockOnChat).toHaveBeenCalledWith( + expect.objectContaining({ workspaceId: "workspace-1" }), + expect.anything() + ); }); it("should remove deleted workspaces", () => { @@ -293,6 +564,676 @@ describe("WorkspaceStore", () => { }); }); + describe("history pagination", () => { + it("initializes pagination from the oldest loaded history sequence on caught-up", async () => { + const workspaceId = "history-pagination-workspace-1"; + + mockOnChat.mockImplementation(async function* ( + _input?: { workspaceId: string; mode?: unknown }, + options?: { signal?: AbortSignal } + ): AsyncGenerator { + yield createHistoryMessageEvent("msg-newer", 5); + await Promise.resolve(); + yield { type: "caught-up", hasOlderHistory: true }; + await waitForAbortSignal(options?.signal); + }); + + createAndAddWorkspace(store, workspaceId); + await new Promise((resolve) => setTimeout(resolve, 10)); + + const state = store.getWorkspaceState(workspaceId); + expect(state.hasOlderHistory).toBe(true); + expect(state.loadingOlderHistory).toBe(false); + }); + + it("does not infer older history from non-boundary sequences without server metadata", async () => { + const workspaceId = "history-pagination-no-boundary"; + + mockOnChat.mockImplementation(async function* ( + _input?: { workspaceId: string; mode?: unknown }, + options?: { signal?: AbortSignal } + ): AsyncGenerator { + yield createHistoryMessageEvent("msg-non-boundary", 5); + await Promise.resolve(); + yield { type: "caught-up" }; + await waitForAbortSignal(options?.signal); + }); + + createAndAddWorkspace(store, workspaceId); + await new Promise((resolve) => setTimeout(resolve, 10)); + + const state = store.getWorkspaceState(workspaceId); + expect(state.hasOlderHistory).toBe(false); + expect(state.loadingOlderHistory).toBe(false); + }); + + it("loads older history and prepends it to the transcript", async () => { + const workspaceId = "history-pagination-workspace-2"; + + mockOnChat.mockImplementation(async function* ( + _input?: { workspaceId: string; mode?: unknown }, + options?: { signal?: AbortSignal } + ): AsyncGenerator { + yield createHistoryMessageEvent("msg-newer", 5); + await Promise.resolve(); + yield { type: "caught-up", hasOlderHistory: true }; + await waitForAbortSignal(options?.signal); + }); + + mockHistoryLoadMore.mockResolvedValueOnce({ + messages: [createHistoryMessageEvent("msg-older", 3)], + nextCursor: null, + hasOlder: false, + }); + + createAndAddWorkspace(store, workspaceId); + await new Promise((resolve) => setTimeout(resolve, 10)); + + expect(store.getWorkspaceState(workspaceId).hasOlderHistory).toBe(true); + + await store.loadOlderHistory(workspaceId); + + expect(mockHistoryLoadMore).toHaveBeenCalledWith({ + workspaceId, + cursor: { + beforeHistorySequence: 5, + beforeMessageId: "msg-newer", + }, + }); + + const state = store.getWorkspaceState(workspaceId); + expect(state.hasOlderHistory).toBe(false); + expect(state.loadingOlderHistory).toBe(false); + expect(state.muxMessages.map((message) => message.id)).toEqual(["msg-older", "msg-newer"]); + }); + + it("exposes loadingOlderHistory while requests are in flight and ignores concurrent loads", async () => { + const workspaceId = "history-pagination-workspace-3"; + + mockOnChat.mockImplementation(async function* ( + _input?: { workspaceId: string; mode?: unknown }, + options?: { signal?: AbortSignal } + ): AsyncGenerator { + yield createHistoryMessageEvent("msg-newer", 5); + await Promise.resolve(); + yield { type: "caught-up", hasOlderHistory: true }; + await waitForAbortSignal(options?.signal); + }); + + let resolveLoadMore: ((value: LoadMoreResponse) => void) | undefined; + + const loadMorePromise = new Promise((resolve) => { + resolveLoadMore = resolve; + }); + mockHistoryLoadMore.mockReturnValueOnce(loadMorePromise); + + createAndAddWorkspace(store, workspaceId); + await new Promise((resolve) => setTimeout(resolve, 10)); + + const firstLoad = store.loadOlderHistory(workspaceId); + expect(store.getWorkspaceState(workspaceId).loadingOlderHistory).toBe(true); + + const secondLoad = store.loadOlderHistory(workspaceId); + expect(mockHistoryLoadMore).toHaveBeenCalledTimes(1); + + resolveLoadMore?.({ + messages: [], + nextCursor: null, + hasOlder: false, + }); + + await firstLoad; + await secondLoad; + + const state = store.getWorkspaceState(workspaceId); + expect(state.loadingOlderHistory).toBe(false); + expect(state.hasOlderHistory).toBe(false); + }); + + it("ignores stale load-more responses after pagination state changes", async () => { + const workspaceId = "history-pagination-stale-response"; + + mockOnChat.mockImplementation(async function* ( + _input?: { workspaceId: string; mode?: unknown }, + options?: { signal?: AbortSignal } + ): AsyncGenerator { + yield createHistoryMessageEvent("msg-newer", 5); + await Promise.resolve(); + yield { type: "caught-up", hasOlderHistory: true }; + await waitForAbortSignal(options?.signal); + }); + + let resolveLoadMore: ((value: LoadMoreResponse) => void) | undefined; + const loadMorePromise = new Promise((resolve) => { + resolveLoadMore = resolve; + }); + mockHistoryLoadMore.mockReturnValueOnce(loadMorePromise); + + createAndAddWorkspace(store, workspaceId); + await new Promise((resolve) => setTimeout(resolve, 10)); + + const loadOlderPromise = store.loadOlderHistory(workspaceId); + expect(store.getWorkspaceState(workspaceId).loadingOlderHistory).toBe(true); + + const internalHistoryPagination = ( + store as unknown as { + historyPagination: Map< + string, + { + nextCursor: { beforeHistorySequence: number; beforeMessageId?: string | null } | null; + hasOlder: boolean; + loading: boolean; + } + >; + } + ).historyPagination; + // Simulate a concurrent pagination reset (e.g., live compaction boundary arriving). + internalHistoryPagination.set(workspaceId, { + nextCursor: null, + hasOlder: false, + loading: false, + }); + + resolveLoadMore?.({ + messages: [createHistoryMessageEvent("msg-stale-older", 3)], + nextCursor: { + beforeHistorySequence: 3, + beforeMessageId: "msg-stale-older", + }, + hasOlder: true, + }); + + await loadOlderPromise; + + const state = store.getWorkspaceState(workspaceId); + expect(state.muxMessages.map((message) => message.id)).toEqual(["msg-newer"]); + expect(state.hasOlderHistory).toBe(false); + expect(state.loadingOlderHistory).toBe(false); + }); + }); + + describe("activity fallbacks", () => { + it("uses activity snapshots for non-active workspace sidebar fields", async () => { + const workspaceId = "activity-fallback-workspace"; + const activityRecency = new Date("2024-01-03T12:00:00.000Z").getTime(); + const activitySnapshot: WorkspaceActivitySnapshot = { + recency: activityRecency, + streaming: true, + lastModel: "claude-sonnet-4", + lastThinkingLevel: "high", + }; + + // Recreate the store so the first activity.list call uses this test snapshot. + store.dispose(); + store = new WorkspaceStore(mockOnModelUsed); + mockActivityList.mockResolvedValue({ [workspaceId]: activitySnapshot }); + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-explicit-any + store.setClient(mockClient as any); + + // Let the initial activity.list call resolve and queue its state updates. + await new Promise((resolve) => setTimeout(resolve, 0)); + + createAndAddWorkspace( + store, + workspaceId, + { + createdAt: "2020-01-01T00:00:00.000Z", + }, + false + ); + + const state = store.getWorkspaceState(workspaceId); + expect(state.canInterrupt).toBe(true); + expect(state.currentModel).toBe(activitySnapshot.lastModel); + expect(state.currentThinkingLevel).toBe(activitySnapshot.lastThinkingLevel); + expect(state.recencyTimestamp).toBe(activitySnapshot.recency); + }); + + it("fires response-complete callback when a background workspace stops streaming", async () => { + const activeWorkspaceId = "active-workspace"; + const backgroundWorkspaceId = "background-workspace"; + const initialRecency = new Date("2024-01-05T00:00:00.000Z").getTime(); + + const backgroundStreamingSnapshot: WorkspaceActivitySnapshot = { + recency: initialRecency, + streaming: true, + lastModel: "claude-sonnet-4", + lastThinkingLevel: null, + }; + + let releaseBackgroundCompletion!: () => void; + const backgroundCompletionReady = new Promise((resolve) => { + releaseBackgroundCompletion = resolve; + }); + + mockActivityList.mockResolvedValue({ + [backgroundWorkspaceId]: backgroundStreamingSnapshot, + }); + + mockActivitySubscribe.mockImplementation(async function* ( + _input?: void, + options?: { signal?: AbortSignal } + ): AsyncGenerator< + { workspaceId: string; activity: WorkspaceActivitySnapshot | null }, + void, + unknown + > { + await backgroundCompletionReady; + if (options?.signal?.aborted) { + return; + } + + yield { + workspaceId: backgroundWorkspaceId, + activity: { + ...backgroundStreamingSnapshot, + recency: initialRecency + 1, + streaming: false, + }, + }; + + await waitForAbortSignal(options?.signal); + }); + + const onResponseComplete = mock( + ( + _workspaceId: string, + _messageId: string, + _isFinal: boolean, + _finalText: string, + _compaction?: { hasContinueMessage: boolean }, + _completedAt?: number | null + ) => undefined + ); + + // Recreate the store so the first activity.list call uses this test snapshot. + store.dispose(); + store = new WorkspaceStore(mockOnModelUsed); + store.setOnResponseComplete(onResponseComplete); + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-explicit-any + store.setClient(mockClient as any); + + createAndAddWorkspace(store, activeWorkspaceId); + createAndAddWorkspace(store, backgroundWorkspaceId, {}, false); + + releaseBackgroundCompletion(); + await new Promise((resolve) => setTimeout(resolve, 0)); + + expect(onResponseComplete).toHaveBeenCalledTimes(1); + expect(onResponseComplete).toHaveBeenCalledWith( + backgroundWorkspaceId, + "", + true, + "", + undefined, + initialRecency + 1 + ); + }); + + it("preserves compaction continue metadata for background completion callbacks", async () => { + const activeWorkspaceId = "active-workspace-continue"; + const backgroundWorkspaceId = "background-workspace-continue"; + const initialRecency = new Date("2024-01-08T00:00:00.000Z").getTime(); + + const backgroundStreamingSnapshot: WorkspaceActivitySnapshot = { + recency: initialRecency, + streaming: true, + lastModel: "claude-sonnet-4", + lastThinkingLevel: null, + }; + + let releaseBackgroundCompletion!: () => void; + const backgroundCompletionReady = new Promise((resolve) => { + releaseBackgroundCompletion = resolve; + }); + + mockActivityList.mockResolvedValue({ + [backgroundWorkspaceId]: backgroundStreamingSnapshot, + }); + + mockActivitySubscribe.mockImplementation(async function* ( + _input?: void, + options?: { signal?: AbortSignal } + ): AsyncGenerator< + { workspaceId: string; activity: WorkspaceActivitySnapshot | null }, + void, + unknown + > { + await backgroundCompletionReady; + if (options?.signal?.aborted) { + return; + } + + yield { + workspaceId: backgroundWorkspaceId, + activity: { + ...backgroundStreamingSnapshot, + recency: initialRecency + 1, + streaming: false, + }, + }; + + await waitForAbortSignal(options?.signal); + }); + + mockOnChat.mockImplementation(async function* ( + input?: { workspaceId: string; mode?: unknown }, + options?: { signal?: AbortSignal } + ): AsyncGenerator { + if (input?.workspaceId !== backgroundWorkspaceId) { + await waitForAbortSignal(options?.signal); + return; + } + + yield { + type: "message", + id: "compaction-request-msg", + role: "user", + parts: [{ type: "text", text: "/compact" }], + metadata: { + historySequence: 1, + timestamp: Date.now(), + muxMetadata: { + type: "compaction-request", + rawCommand: "/compact", + parsed: { + model: "claude-sonnet-4", + followUpContent: { + text: "continue after compaction", + model: "claude-sonnet-4", + agentId: "exec", + }, + }, + }, + }, + }; + + yield { + type: "stream-start", + workspaceId: backgroundWorkspaceId, + messageId: "compaction-stream", + historySequence: 2, + model: "claude-sonnet-4", + startTime: Date.now(), + mode: "exec", + }; + + yield { type: "caught-up", hasOlderHistory: false }; + + await waitForAbortSignal(options?.signal); + }); + + const onResponseComplete = mock( + ( + _workspaceId: string, + _messageId: string, + _isFinal: boolean, + _finalText: string, + _compaction?: { hasContinueMessage: boolean }, + _completedAt?: number | null + ) => undefined + ); + + // Recreate the store so the first activity.list call uses this test snapshot. + store.dispose(); + store = new WorkspaceStore(mockOnModelUsed); + store.setOnResponseComplete(onResponseComplete); + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-explicit-any + store.setClient(mockClient as any); + + createAndAddWorkspace(store, backgroundWorkspaceId); + + const waitUntil = async (condition: () => boolean, timeoutMs = 2000): Promise => { + const start = Date.now(); + while (Date.now() - start < timeoutMs) { + if (condition()) { + return true; + } + await new Promise((resolve) => setTimeout(resolve, 10)); + } + return false; + }; + + const sawCompactingStream = await waitUntil( + () => store.getWorkspaceState(backgroundWorkspaceId).isCompacting + ); + expect(sawCompactingStream).toBe(true); + + // Move focus to a different workspace so the compaction workspace is backgrounded. + createAndAddWorkspace(store, activeWorkspaceId); + + releaseBackgroundCompletion(); + await new Promise((resolve) => setTimeout(resolve, 0)); + + expect(onResponseComplete).toHaveBeenCalledTimes(1); + expect(onResponseComplete).toHaveBeenCalledWith( + backgroundWorkspaceId, + "", + true, + "", + { hasContinueMessage: true }, + initialRecency + 1 + ); + }); + + it("does not fire response-complete callback when background streaming stops without recency advance", async () => { + const activeWorkspaceId = "active-workspace-no-replay"; + const backgroundWorkspaceId = "background-workspace-no-replay"; + const initialRecency = new Date("2024-01-06T00:00:00.000Z").getTime(); + + const backgroundStreamingSnapshot: WorkspaceActivitySnapshot = { + recency: initialRecency, + streaming: true, + lastModel: "claude-sonnet-4", + lastThinkingLevel: null, + }; + + let releaseBackgroundTransition!: () => void; + const backgroundTransitionReady = new Promise((resolve) => { + releaseBackgroundTransition = resolve; + }); + + mockActivityList.mockResolvedValue({ + [backgroundWorkspaceId]: backgroundStreamingSnapshot, + }); + + mockActivitySubscribe.mockImplementation(async function* ( + _input?: void, + options?: { signal?: AbortSignal } + ): AsyncGenerator< + { workspaceId: string; activity: WorkspaceActivitySnapshot | null }, + void, + unknown + > { + await backgroundTransitionReady; + if (options?.signal?.aborted) { + return; + } + + yield { + workspaceId: backgroundWorkspaceId, + activity: { + ...backgroundStreamingSnapshot, + // Abort/error transitions can stop streaming without advancing recency. + recency: initialRecency, + streaming: false, + }, + }; + + await waitForAbortSignal(options?.signal); + }); + + const onResponseComplete = mock( + ( + _workspaceId: string, + _messageId: string, + _isFinal: boolean, + _finalText: string, + _compaction?: { hasContinueMessage: boolean }, + _completedAt?: number | null + ) => undefined + ); + + // Recreate the store so the first activity.list call uses this test snapshot. + store.dispose(); + store = new WorkspaceStore(mockOnModelUsed); + store.setOnResponseComplete(onResponseComplete); + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-explicit-any + store.setClient(mockClient as any); + + createAndAddWorkspace(store, activeWorkspaceId); + createAndAddWorkspace(store, backgroundWorkspaceId, {}, false); + + releaseBackgroundTransition(); + await new Promise((resolve) => setTimeout(resolve, 0)); + + expect(onResponseComplete).not.toHaveBeenCalled(); + }); + it("clears activity stream-start recency cache on dispose", () => { + const workspaceId = "dispose-clears-activity-recency"; + const internalStore = store as unknown as { + activityStreamingStartRecency: Map; + }; + + internalStore.activityStreamingStartRecency.set(workspaceId, Date.now()); + expect(internalStore.activityStreamingStartRecency.has(workspaceId)).toBe(true); + + store.dispose(); + + expect(internalStore.activityStreamingStartRecency.size).toBe(0); + }); + + it("opens activity subscription before listing snapshots", async () => { + store.dispose(); + store = new WorkspaceStore(mockOnModelUsed); + + const callOrder: string[] = []; + + mockActivitySubscribe.mockImplementation( + ( + _input?: void, + options?: { signal?: AbortSignal } + ): AsyncGenerator< + { workspaceId: string; activity: WorkspaceActivitySnapshot | null }, + void, + unknown + > => { + callOrder.push("subscribe"); + + // eslint-disable-next-line require-yield + return (async function* (): AsyncGenerator< + { workspaceId: string; activity: WorkspaceActivitySnapshot | null }, + void, + unknown + > { + await waitForAbortSignal(options?.signal); + })(); + } + ); + + mockActivityList.mockImplementation(() => { + callOrder.push("list"); + return Promise.resolve({}); + }); + + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-explicit-any + store.setClient({ workspace: mockClient.workspace } as any); + + const waitUntil = async (condition: () => boolean, timeoutMs = 2000): Promise => { + const start = Date.now(); + while (Date.now() - start < timeoutMs) { + if (condition()) { + return true; + } + await new Promise((resolve) => setTimeout(resolve, 10)); + } + return false; + }; + + const sawBothCalls = await waitUntil(() => callOrder.length >= 2); + expect(sawBothCalls).toBe(true); + expect(callOrder.slice(0, 2)).toEqual(["subscribe", "list"]); + }); + + it("preserves cached activity snapshots when list returns an empty payload", async () => { + const workspaceId = "activity-list-empty-payload"; + const initialRecency = new Date("2024-01-07T00:00:00.000Z").getTime(); + const snapshot: WorkspaceActivitySnapshot = { + recency: initialRecency, + streaming: true, + lastModel: "claude-sonnet-4", + lastThinkingLevel: "high", + }; + + store.dispose(); + store = new WorkspaceStore(mockOnModelUsed); + + let listCallCount = 0; + mockActivityList.mockImplementation( + (): Promise> => { + listCallCount += 1; + if (listCallCount === 1) { + return Promise.resolve({ [workspaceId]: snapshot }); + } + return Promise.resolve({}); + } + ); + + // eslint-disable-next-line require-yield + mockActivitySubscribe.mockImplementation(async function* ( + _input?: void, + options?: { signal?: AbortSignal } + ): AsyncGenerator< + { workspaceId: string; activity: WorkspaceActivitySnapshot | null }, + void, + unknown + > { + await waitForAbortSignal(options?.signal); + }); + + const waitUntil = async (condition: () => boolean, timeoutMs = 2000): Promise => { + const start = Date.now(); + while (Date.now() - start < timeoutMs) { + if (condition()) { + return true; + } + await new Promise((resolve) => setTimeout(resolve, 10)); + } + return false; + }; + + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-explicit-any + store.setClient({ workspace: mockClient.workspace } as any); + createAndAddWorkspace( + store, + workspaceId, + { + createdAt: "2020-01-01T00:00:00.000Z", + }, + false + ); + + const seededSnapshot = await waitUntil(() => { + const state = store.getWorkspaceState(workspaceId); + return state.recencyTimestamp === initialRecency && state.canInterrupt === true; + }); + expect(seededSnapshot).toBe(true); + + // Swap to a new client object to force activity subscription restart and a fresh list() call. + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-explicit-any + store.setClient({ workspace: mockClient.workspace } as any); + + const sawRetryListCall = await waitUntil(() => listCallCount >= 2); + expect(sawRetryListCall).toBe(true); + + const stateAfterEmptyList = store.getWorkspaceState(workspaceId); + expect(stateAfterEmptyList.recencyTimestamp).toBe(initialRecency); + expect(stateAfterEmptyList.canInterrupt).toBe(true); + expect(stateAfterEmptyList.currentModel).toBe(snapshot.lastModel); + expect(stateAfterEmptyList.currentThinkingLevel).toBe(snapshot.lastThinkingLevel); + }); + }); + describe("getWorkspaceRecency", () => { it("should return stable reference when values unchanged", () => { const recency1 = store.getWorkspaceRecency(); @@ -336,6 +1277,7 @@ describe("WorkspaceStore", () => { }); }); + store.setActiveWorkspaceId(metadata.id); store.addWorkspace(metadata); // Wait for async processing @@ -448,6 +1390,9 @@ describe("WorkspaceStore", () => { // Add workspace first createAndAddWorkspace(store, "test-workspace"); + // Ignore setup emissions so this test only validates getAggregator() side effects. + emitCount = 0; + // Simulate what happens during render - component calls getAggregator const aggregator1 = store.getAggregator("test-workspace"); expect(aggregator1).toBeDefined(); @@ -483,7 +1428,7 @@ describe("WorkspaceStore", () => { unknown > { yield { type: "caught-up" }; - await new Promise((resolve) => setTimeout(resolve, 0)); + await new Promise((resolve) => setTimeout(resolve, 30)); yield { type: "stream-start", historySequence: 1, @@ -497,12 +1442,13 @@ describe("WorkspaceStore", () => { }); }); + store.setActiveWorkspaceId(metadata.id); store.addWorkspace(metadata); const state1 = store.getWorkspaceState("test-workspace"); // Wait for async processing - await new Promise((resolve) => setTimeout(resolve, 20)); + await new Promise((resolve) => setTimeout(resolve, 70)); const state2 = store.getWorkspaceState("test-workspace"); expect(state1).not.toBe(state2); // Cache should be invalidated @@ -541,6 +1487,7 @@ describe("WorkspaceStore", () => { }); }); + store.setActiveWorkspaceId(metadata.id); store.addWorkspace(metadata); const states1 = store.getAllStates(); @@ -862,6 +1809,79 @@ describe("WorkspaceStore", () => { expect(store.getTaskToolLiveTaskId(workspaceId, "call-task-2")).toBeNull(); }); + it("preserves pagination state across since reconnect retries", async () => { + const workspaceId = "pagination-since-retry"; + let subscriptionCount = 0; + let releaseFirstSubscription: (() => void) | undefined; + const holdFirstSubscription = new Promise((resolve) => { + releaseFirstSubscription = resolve; + }); + + const waitUntil = async (condition: () => boolean, timeoutMs = 2000): Promise => { + const start = Date.now(); + while (Date.now() - start < timeoutMs) { + if (condition()) { + return true; + } + await new Promise((resolve) => setTimeout(resolve, 10)); + } + return false; + }; + + mockOnChat.mockImplementation(async function* (): AsyncGenerator< + WorkspaceChatMessage, + void, + unknown + > { + subscriptionCount += 1; + + if (subscriptionCount === 1) { + yield createHistoryMessageEvent("history-5", 5); + yield { + type: "caught-up", + replay: "full", + hasOlderHistory: true, + cursor: { + history: { + messageId: "history-5", + historySequence: 5, + }, + }, + }; + + await holdFirstSubscription; + return; + } + + yield { + type: "caught-up", + replay: "since", + cursor: { + history: { + messageId: "history-5", + historySequence: 5, + }, + }, + }; + }); + + createAndAddWorkspace(store, workspaceId); + + const seededPagination = await waitUntil( + () => store.getWorkspaceState(workspaceId).hasOlderHistory === true + ); + expect(seededPagination).toBe(true); + + releaseFirstSubscription?.(); + + const preservedPagination = await waitUntil(() => { + return ( + subscriptionCount >= 2 && store.getWorkspaceState(workspaceId).hasOlderHistory === true + ); + }); + expect(preservedPagination).toBe(true); + }); + it("clears stale live tool state when since replay reports no active stream", async () => { const workspaceId = "task-created-workspace-4"; let subscriptionCount = 0; diff --git a/src/browser/stores/WorkspaceStore.ts b/src/browser/stores/WorkspaceStore.ts index b7b8356845..f3ac11eac9 100644 --- a/src/browser/stores/WorkspaceStore.ts +++ b/src/browser/stores/WorkspaceStore.ts @@ -1,7 +1,12 @@ import assert from "@/common/utils/assert"; import type { MuxMessage, DisplayedMessage, QueuedMessage } from "@/common/types/message"; import type { FrontendWorkspaceMetadata } from "@/common/types/workspace"; -import type { WorkspaceChatMessage, WorkspaceStatsSnapshot, OnChatMode } from "@/common/orpc/types"; +import type { + WorkspaceActivitySnapshot, + WorkspaceChatMessage, + WorkspaceStatsSnapshot, + OnChatMode, +} from "@/common/orpc/types"; import type { RouterClient } from "@orpc/server"; import type { AppRouter } from "@/node/orpc/router"; import type { TodoItem } from "@/common/types/tools"; @@ -61,6 +66,8 @@ export interface WorkspaceState { isStreamStarting: boolean; awaitingUserQuestion: boolean; loading: boolean; + hasOlderHistory: boolean; + loadingOlderHistory: boolean; muxMessages: MuxMessage[]; currentModel: string | null; currentThinkingLevel: string | null; @@ -228,6 +235,43 @@ interface WorkspaceChatTransientState { liveTaskIds: Map; } +interface HistoryPaginationCursor { + beforeHistorySequence: number; + beforeMessageId?: string | null; +} + +interface WorkspaceHistoryPaginationState { + nextCursor: HistoryPaginationCursor | null; + hasOlder: boolean; + loading: boolean; +} + +function areHistoryPaginationCursorsEqual( + a: HistoryPaginationCursor | null, + b: HistoryPaginationCursor | null +): boolean { + if (a === b) { + return true; + } + + if (!a || !b) { + return false; + } + + return ( + a.beforeHistorySequence === b.beforeHistorySequence && + (a.beforeMessageId ?? null) === (b.beforeMessageId ?? null) + ); +} + +function createInitialHistoryPaginationState(): WorkspaceHistoryPaginationState { + return { + nextCursor: null, + hasOlder: false, + loading: false, + }; +} + function createInitialChatTransientState(): WorkspaceChatTransientState { return { caughtUp: false, @@ -352,11 +396,29 @@ export class WorkspaceStore { // Supporting data structures private aggregators = new Map(); + // Active onChat subscription cleanup handlers (must stay size <= 1). private ipcUnsubscribers = new Map void>(); + // Workspace selected in the UI (set from WorkspaceContext routing state). + private activeWorkspaceId: string | null = null; + + // Workspace currently owning the live onChat subscription. + private activeOnChatWorkspaceId: string | null = null; + + // Lightweight activity snapshots from workspace.activity.list/subscribe. + private workspaceActivity = new Map(); + // Recency timestamp observed when a workspace transitions into streaming=true. + // Used to distinguish true stream completion (recency bumps on stream-end) from + // abort/error transitions (streaming=false without recency advance). + private activityStreamingStartRecency = new Map(); + private activityAbortController: AbortController | null = null; + // Per-workspace ephemeral chat state (buffering, queued message, live bash output, etc.) private chatTransientState = new Map(); + // Per-workspace transcript pagination state for loading prior compaction epochs. + private historyPagination = new Map(); + private workspaceMetadata = new Map(); // Store metadata for name lookup // Workspace timing stats snapshots (from workspace.stats.subscribe) @@ -705,10 +767,14 @@ export class WorkspaceStore { this.clientChangeController.abort(); this.clientChangeController = new AbortController(); - for (const workspaceId of this.ipcUnsubscribers.keys()) { + for (const workspaceId of this.workspaceMetadata.keys()) { this.pendingReplayReset.add(workspaceId); } + if (client) { + this.ensureActivitySubscription(); + } + if (!client) { return; } @@ -719,6 +785,123 @@ export class WorkspaceStore { this.subscribeToStats(workspaceId); } } + + this.ensureActiveOnChatSubscription(); + } + + setActiveWorkspaceId(workspaceId: string | null): void { + assert( + workspaceId === null || (typeof workspaceId === "string" && workspaceId.length > 0), + "setActiveWorkspaceId requires a non-empty workspaceId or null" + ); + + if (this.activeWorkspaceId === workspaceId) { + return; + } + + const previousActiveId = this.activeWorkspaceId; + this.activeWorkspaceId = workspaceId; + this.ensureActiveOnChatSubscription(); + + // Invalidate cached workspace state for both the old and new active + // workspaces. getWorkspaceState() uses activeOnChatWorkspaceId to decide + // whether to trust aggregator data or activity snapshots, so a switch + // requires recomputation even if no new events arrived. + if (previousActiveId) { + this.states.bump(previousActiveId); + } + if (workspaceId) { + this.states.bump(workspaceId); + } + } + + isOnChatSubscriptionActive(workspaceId: string): boolean { + assert( + typeof workspaceId === "string" && workspaceId.length > 0, + "isOnChatSubscriptionActive requires a non-empty workspaceId" + ); + + return this.activeOnChatWorkspaceId === workspaceId; + } + + private ensureActivitySubscription(): void { + if (this.activityAbortController) { + return; + } + + const controller = new AbortController(); + this.activityAbortController = controller; + void this.runActivitySubscription(controller.signal); + } + + private assertSingleActiveOnChatSubscription(): void { + assert( + this.ipcUnsubscribers.size <= 1, + `[WorkspaceStore] Expected at most one active onChat subscription, found ${this.ipcUnsubscribers.size}` + ); + + if (this.activeOnChatWorkspaceId === null) { + assert( + this.ipcUnsubscribers.size === 0, + "[WorkspaceStore] onChat unsubscribe map must be empty when no active workspace is subscribed" + ); + return; + } + + assert( + this.ipcUnsubscribers.has(this.activeOnChatWorkspaceId), + `[WorkspaceStore] Missing onChat unsubscribe handler for ${this.activeOnChatWorkspaceId}` + ); + } + + private clearReplayBuffers(workspaceId: string): void { + const transient = this.chatTransientState.get(workspaceId); + if (!transient) { + return; + } + + // Replay buffers are only valid for the in-flight subscription attempt that + // populated them. Clear eagerly when deactivating/retrying so stale buffered + // events cannot leak into a later caught-up cycle. + transient.caughtUp = false; + transient.replayingHistory = false; + transient.historicalMessages.length = 0; + transient.pendingStreamEvents.length = 0; + } + + private ensureActiveOnChatSubscription(): void { + const targetWorkspaceId = + this.activeWorkspaceId && this.isWorkspaceRegistered(this.activeWorkspaceId) + ? this.activeWorkspaceId + : null; + + if (this.activeOnChatWorkspaceId === targetWorkspaceId) { + this.assertSingleActiveOnChatSubscription(); + return; + } + + if (this.activeOnChatWorkspaceId) { + const previousActiveWorkspaceId = this.activeOnChatWorkspaceId; + // Clear replay buffers before aborting so a fast workspace switch/reopen + // cannot replay stale buffered rows from the previous subscription attempt. + this.clearReplayBuffers(previousActiveWorkspaceId); + + const unsubscribe = this.ipcUnsubscribers.get(previousActiveWorkspaceId); + if (unsubscribe) { + unsubscribe(); + } + this.ipcUnsubscribers.delete(previousActiveWorkspaceId); + this.activeOnChatWorkspaceId = null; + } + + if (targetWorkspaceId) { + const controller = new AbortController(); + this.ipcUnsubscribers.set(targetWorkspaceId, () => controller.abort()); + this.activeOnChatWorkspaceId = targetWorkspaceId; + void this.runOnChatSubscription(targetWorkspaceId, controller.signal); + } + + this.assertSingleActiveOnChatSubscription(); } /** @@ -842,8 +1025,8 @@ export class WorkspaceStore { return; } - // Only subscribe when we have at least one UI consumer. - if (!this.ipcUnsubscribers.has(workspaceId)) { + // Only subscribe for registered workspaces when we have at least one UI consumer. + if (!this.isWorkspaceRegistered(workspaceId)) { return; } if ((this.statsListenerCounts.get(workspaceId) ?? 0) <= 0) { @@ -1020,6 +1203,50 @@ export class WorkspaceStore { return state; } + private deriveHistoryPaginationState( + aggregator: StreamingMessageAggregator, + hasOlderOverride?: boolean + ): WorkspaceHistoryPaginationState { + for (const message of aggregator.getAllMessages()) { + const historySequence = message.metadata?.historySequence; + if ( + typeof historySequence !== "number" || + !Number.isInteger(historySequence) || + historySequence < 0 + ) { + continue; + } + + // The server's caught-up payload is authoritative for full replays because + // display-only messages can skip early historySequence rows. When legacy + // payloads omit hasOlderHistory, only infer older pages when the oldest + // loaded message is a durable compaction boundary marker (a concrete signal + // that this replay started mid-history), not merely historySequence > 0. + const hasOlder = + hasOlderOverride ?? (historySequence > 0 && isDurableCompactionBoundaryMarker(message)); + return { + nextCursor: hasOlder + ? { + beforeHistorySequence: historySequence, + beforeMessageId: message.id, + } + : null, + hasOlder, + loading: false, + }; + } + + if (hasOlderOverride !== undefined) { + return { + nextCursor: null, + hasOlder: hasOlderOverride, + loading: false, + }; + } + + return createInitialHistoryPaginationState(); + } + /** * Get state for a specific workspace. * Lazy computation - only runs when version changes. @@ -1032,11 +1259,39 @@ export class WorkspaceStore { const hasMessages = aggregator.hasMessages(); const transient = this.assertChatTransientState(workspaceId); + const historyPagination = + this.historyPagination.get(workspaceId) ?? createInitialHistoryPaginationState(); const activeStreams = aggregator.getActiveStreams(); + const activity = this.workspaceActivity.get(workspaceId); + const isActiveWorkspace = this.activeOnChatWorkspaceId === workspaceId; const messages = aggregator.getAllMessages(); const metadata = this.workspaceMetadata.get(workspaceId); const pendingStreamStartTime = aggregator.getPendingStreamStartTime(); - const canInterrupt = activeStreams.length > 0; + // Trust the live aggregator only when it is both active AND has finished + // replaying historical events (caughtUp). During the replay window after a + // workspace switch, the aggregator is cleared and re-hydrating; fall back to + // the activity snapshot so the UI continues to reflect the last known state + // (e.g., canInterrupt stays true for a workspace that is still streaming). + // + // For non-active workspaces, the aggregator's activeStreams may be stale since + // they don't receive stream-end events when unsubscribed from onChat. Prefer the + // activity snapshot's streaming state, which is updated via the lightweight activity + // subscription for all workspaces. + const useAggregatorState = isActiveWorkspace && transient.caughtUp; + const canInterrupt = useAggregatorState + ? activeStreams.length > 0 + : (activity?.streaming ?? activeStreams.length > 0); + const currentModel = useAggregatorState + ? (aggregator.getCurrentModel() ?? null) + : (activity?.lastModel ?? aggregator.getCurrentModel() ?? null); + const currentThinkingLevel = useAggregatorState + ? (aggregator.getCurrentThinkingLevel() ?? null) + : (activity?.lastThinkingLevel ?? aggregator.getCurrentThinkingLevel() ?? null); + const aggregatorRecency = aggregator.getRecencyTimestamp(); + const recencyTimestamp = + aggregatorRecency === null + ? (activity?.recency ?? null) + : Math.max(aggregatorRecency, activity?.recency ?? aggregatorRecency); const isStreamStarting = pendingStreamStartTime !== null && !canInterrupt; // Live streaming stats @@ -1057,10 +1312,12 @@ export class WorkspaceStore { isStreamStarting, awaitingUserQuestion: aggregator.hasAwaitingUserQuestion(), loading: !hasMessages && !transient.caughtUp, + hasOlderHistory: historyPagination.hasOlder, + loadingOlderHistory: historyPagination.loading, muxMessages: messages, - currentModel: aggregator.getCurrentModel() ?? null, - currentThinkingLevel: aggregator.getCurrentThinkingLevel() ?? null, - recencyTimestamp: aggregator.getRecencyTimestamp(), + currentModel, + currentThinkingLevel, + recencyTimestamp, todos: aggregator.getCurrentTodos(), loadedSkills: aggregator.getLoadedSkills(), skillLoadErrors: aggregator.getSkillLoadErrors(), @@ -1214,6 +1471,113 @@ export class WorkspaceStore { this.states.bump(workspaceId); } + async loadOlderHistory(workspaceId: string): Promise { + assert( + typeof workspaceId === "string" && workspaceId.length > 0, + "loadOlderHistory requires a non-empty workspaceId" + ); + + const client = this.client; + if (!client) { + console.warn(`[WorkspaceStore] Cannot load older history for ${workspaceId}: no ORPC client`); + return; + } + + const paginationState = this.historyPagination.get(workspaceId); + if (!paginationState) { + console.warn( + `[WorkspaceStore] Cannot load older history for ${workspaceId}: pagination state is not initialized` + ); + return; + } + + if (!paginationState.hasOlder || paginationState.loading) { + return; + } + + if (!this.aggregators.has(workspaceId)) { + console.warn( + `[WorkspaceStore] Cannot load older history for ${workspaceId}: workspace is not registered` + ); + return; + } + + const requestedCursor = paginationState.nextCursor + ? { + beforeHistorySequence: paginationState.nextCursor.beforeHistorySequence, + beforeMessageId: paginationState.nextCursor.beforeMessageId, + } + : null; + + this.historyPagination.set(workspaceId, { + nextCursor: requestedCursor, + hasOlder: paginationState.hasOlder, + loading: true, + }); + this.states.bump(workspaceId); + + try { + const result = await client.workspace.history.loadMore({ + workspaceId, + cursor: requestedCursor, + }); + + const aggregator = this.aggregators.get(workspaceId); + const latestPagination = this.historyPagination.get(workspaceId); + if ( + !aggregator || + !latestPagination || + !latestPagination.loading || + !areHistoryPaginationCursorsEqual(latestPagination.nextCursor, requestedCursor) + ) { + return; + } + + if (result.hasOlder) { + assert( + result.nextCursor, + `[WorkspaceStore] loadMore for ${workspaceId} returned hasOlder=true without nextCursor` + ); + } + + const historicalMessages = result.messages.filter(isMuxMessage); + const ignoredCount = result.messages.length - historicalMessages.length; + if (ignoredCount > 0) { + console.warn( + `[WorkspaceStore] Ignoring ${ignoredCount} non-message history rows for ${workspaceId}` + ); + } + + if (historicalMessages.length > 0) { + aggregator.loadHistoricalMessages(historicalMessages, false, { + mode: "append", + skipDerivedState: true, + }); + this.consumerManager.scheduleCalculation(workspaceId, aggregator); + } + + this.historyPagination.set(workspaceId, { + nextCursor: result.nextCursor, + hasOlder: result.hasOlder, + loading: false, + }); + } catch (error) { + console.error(`[WorkspaceStore] Failed to load older history for ${workspaceId}:`, error); + + const latestPagination = this.historyPagination.get(workspaceId); + if (latestPagination) { + this.historyPagination.set(workspaceId, { + ...latestPagination, + loading: false, + }); + } + } finally { + if (this.isWorkspaceRegistered(workspaceId)) { + this.states.bump(workspaceId); + } + } + } + /** * Mark the current active stream as "interrupting" (transient state). * Call this before invoking interruptStream so the UI shows "interrupting..." @@ -1558,8 +1922,228 @@ export class WorkspaceStore { }); } - private isWorkspaceSubscribed(workspaceId: string): boolean { - return this.ipcUnsubscribers.has(workspaceId); + private isWorkspaceRegistered(workspaceId: string): boolean { + return this.workspaceMetadata.has(workspaceId); + } + + private getBackgroundCompletionCompaction( + workspaceId: string + ): { hasContinueMessage: boolean } | undefined { + const aggregator = this.aggregators.get(workspaceId); + if (!aggregator) { + return undefined; + } + + const compactingStreams = aggregator + .getActiveStreams() + .filter((stream) => stream.isCompacting === true); + + if (compactingStreams.length === 0) { + return undefined; + } + + return { + hasContinueMessage: compactingStreams.some((stream) => stream.hasCompactionContinue === true), + }; + } + + private applyWorkspaceActivitySnapshot( + workspaceId: string, + snapshot: WorkspaceActivitySnapshot | null + ): void { + const previous = this.workspaceActivity.get(workspaceId) ?? null; + + if (snapshot) { + this.workspaceActivity.set(workspaceId, snapshot); + } else { + this.workspaceActivity.delete(workspaceId); + } + + const changed = + previous?.streaming !== snapshot?.streaming || + previous?.lastModel !== snapshot?.lastModel || + previous?.lastThinkingLevel !== snapshot?.lastThinkingLevel || + previous?.recency !== snapshot?.recency; + + if (!changed) { + return; + } + + if (this.aggregators.has(workspaceId)) { + this.states.bump(workspaceId); + } + + const startedStreamingSnapshot = + previous?.streaming !== true && snapshot?.streaming === true ? snapshot : null; + if (startedStreamingSnapshot) { + this.activityStreamingStartRecency.set(workspaceId, startedStreamingSnapshot.recency); + } + + const stoppedStreamingSnapshot = + previous?.streaming === true && snapshot?.streaming === false ? snapshot : null; + const isBackgroundStreamingStop = + stoppedStreamingSnapshot !== null && workspaceId !== this.activeWorkspaceId; + const streamStartRecency = this.activityStreamingStartRecency.get(workspaceId); + const recencyAdvancedSinceStreamStart = + stoppedStreamingSnapshot !== null && + streamStartRecency !== undefined && + stoppedStreamingSnapshot.recency > streamStartRecency; + const backgroundCompaction = isBackgroundStreamingStop + ? this.getBackgroundCompletionCompaction(workspaceId) + : undefined; + + // Trigger response completion notifications for background workspaces only when + // activity indicates a true completion (streaming true -> false WITH recency advance). + // stream-abort/error transitions also flip streaming to false, but recency stays + // unchanged there, so suppress completion notifications in those cases. + if (stoppedStreamingSnapshot && recencyAdvancedSinceStreamStart && isBackgroundStreamingStop) { + if (this.responseCompleteCallback) { + // Activity snapshots don't include message/content metadata. Reuse any + // still-active stream context captured before this workspace was backgrounded + // so compaction continue turns remain suppressible in App notifications. + this.responseCompleteCallback( + workspaceId, + "", + true, + "", + backgroundCompaction, + stoppedStreamingSnapshot.recency + ); + } + } + + if (isBackgroundStreamingStop) { + // Inactive workspaces do not receive stream-end events via onChat. Once + // activity confirms streaming stopped, clear stale stream contexts so they + // cannot leak compaction metadata into future completion callbacks. + this.aggregators.get(workspaceId)?.clearActiveStreams(); + } + + if (snapshot?.streaming !== true) { + this.activityStreamingStartRecency.delete(workspaceId); + } + + if (previous?.recency !== snapshot?.recency && this.aggregators.has(workspaceId)) { + this.derived.bump("recency"); + } + } + + private applyWorkspaceActivityList(snapshots: Record): void { + const snapshotEntries = Object.entries(snapshots); + + // Defensive fallback: workspace.activity.list returns {} on backend read failures. + // Preserve last-known snapshots instead of wiping sidebar activity state for all + // workspaces during a transient metadata read error. + if (snapshotEntries.length === 0) { + return; + } + + const seenWorkspaceIds = new Set(); + + for (const [workspaceId, snapshot] of snapshotEntries) { + seenWorkspaceIds.add(workspaceId); + this.applyWorkspaceActivitySnapshot(workspaceId, snapshot); + } + + for (const workspaceId of Array.from(this.workspaceActivity.keys())) { + if (seenWorkspaceIds.has(workspaceId)) { + continue; + } + this.applyWorkspaceActivitySnapshot(workspaceId, null); + } + } + + private async runActivitySubscription(signal: AbortSignal): Promise { + let attempt = 0; + + while (!signal.aborted) { + const client = this.client ?? (await this.waitForClient(signal)); + if (!client || signal.aborted) { + return; + } + + const attemptController = new AbortController(); + const onAbort = () => attemptController.abort(); + signal.addEventListener("abort", onAbort); + + const clientChangeSignal = this.clientChangeController.signal; + const onClientChange = () => attemptController.abort(); + clientChangeSignal.addEventListener("abort", onClientChange, { once: true }); + + try { + // Open the live delta stream first so no state transition can be lost + // between the list snapshot fetch and subscribe registration. + const iterator = await client.workspace.activity.subscribe(undefined, { + signal: attemptController.signal, + }); + + const snapshots = await client.workspace.activity.list(); + if (signal.aborted) { + return; + } + // Client changed while list() was in flight — retry with the new client + // instead of exiting permanently. The outer while loop will pick up the + // replacement client on the next iteration. + if (attemptController.signal.aborted) { + continue; + } + + queueMicrotask(() => { + if (signal.aborted || attemptController.signal.aborted) { + return; + } + this.applyWorkspaceActivityList(snapshots); + }); + + for await (const event of iterator) { + if (signal.aborted) { + return; + } + + // Connection is alive again - don't carry old backoff into the next failure. + attempt = 0; + + queueMicrotask(() => { + if (signal.aborted || attemptController.signal.aborted) { + return; + } + this.applyWorkspaceActivitySnapshot(event.workspaceId, event.activity); + }); + } + + if (signal.aborted) { + return; + } + + if (!attemptController.signal.aborted) { + console.warn("[WorkspaceStore] activity subscription ended unexpectedly; retrying..."); + } + } catch (error) { + if (signal.aborted) { + return; + } + + const abortError = isAbortError(error); + if (attemptController.signal.aborted) { + if (!abortError) { + console.warn("[WorkspaceStore] activity subscription aborted; retrying..."); + } + } else if (!abortError) { + console.warn("[WorkspaceStore] Error in activity subscription:", error); + } + } finally { + signal.removeEventListener("abort", onAbort); + clientChangeSignal.removeEventListener("abort", onClientChange); + } + + const delayMs = calculateOnChatBackoffMs(attempt); + attempt++; + + await this.sleepWithAbort(delayMs, signal); + if (signal.aborted) { + return; + } + } } private async waitForClient(signal: AbortSignal): Promise | null> { @@ -1621,6 +2205,8 @@ export class WorkspaceStore { // Reset per-workspace transient state so the next replay rebuilds from the backend source of truth. this.chatTransientState.set(workspaceId, createInitialChatTransientState()); + this.historyPagination.set(workspaceId, createInitialHistoryPaginationState()); + this.states.bump(workspaceId); this.checkAndBumpRecencyIfChanged(); } @@ -1651,6 +2237,13 @@ export class WorkspaceStore { let lastChatEventAt = Date.now(); try { + // Always reset caughtUp at subscription start so historical events are + // buffered until the caught-up marker arrives, regardless of replay mode. + const transient = this.chatTransientState.get(workspaceId); + if (transient) { + transient.caughtUp = false; + } + // Reconnect incrementally whenever we can build a valid cursor. // Do not gate on transient.caughtUp here: retry paths may optimistically // set caughtUp=false to re-enable buffering, but the cursor can still @@ -1707,7 +2300,14 @@ export class WorkspaceStore { // Connection is alive again - don't carry old backoff into the next failure. attempt = 0; + const attemptSignal = attemptController.signal; queueMicrotask(() => { + // Workspace switches abort the previous attempt before starting a new one. + // Drop any already-queued chat events from that aborted attempt so stale + // replay buffers cannot be repopulated after we synchronously cleared them. + if (signal.aborted || attemptSignal.aborted) { + return; + } this.handleChatMessage(workspaceId, data); }); } @@ -1748,7 +2348,7 @@ export class WorkspaceStore { // 3. Connection dropped (WebSocket/MessagePort error) // Only suppress if workspace no longer exists (was removed during the race) - if (!this.isWorkspaceSubscribed(workspaceId)) { + if (!this.isWorkspaceRegistered(workspaceId)) { return; } // Log with detailed validation info for debugging schema mismatches @@ -1766,17 +2366,21 @@ export class WorkspaceStore { } } - if (this.isWorkspaceSubscribed(workspaceId)) { - const transient = this.chatTransientState.get(workspaceId); - if (transient) { - // Failed reconnect attempts may have buffered partial replay data. - // Clear replay buffers before the next attempt so we don't append a - // second replay copy and duplicate deltas/tool events on caught-up. - transient.caughtUp = false; - transient.replayingHistory = false; - transient.historicalMessages.length = 0; - transient.pendingStreamEvents.length = 0; - } + if (this.isWorkspaceRegistered(workspaceId)) { + // Failed reconnect attempts may have buffered partial replay data. + // Clear replay buffers before the next attempt so we don't append a + // second replay copy and duplicate deltas/tool events on caught-up. + this.clearReplayBuffers(workspaceId); + + // Preserve pagination across transient reconnect retries. Incremental + // caught-up payloads intentionally omit hasOlderHistory, so resetting + // here would permanently hide "Load older messages" until a full replay. + const existingPagination = + this.historyPagination.get(workspaceId) ?? createInitialHistoryPaginationState(); + this.historyPagination.set(workspaceId, { + ...existingPagination, + loading: false, + }); } const delayMs = calculateOnChatBackoffMs(attempt); @@ -1790,7 +2394,7 @@ export class WorkspaceStore { } /** - * Add a workspace and subscribe to its IPC events. + * Register a workspace and initialize local state. */ /** @@ -1804,8 +2408,8 @@ export class WorkspaceStore { addWorkspace(metadata: FrontendWorkspaceMetadata): void { const workspaceId = metadata.id; - // Skip if already subscribed - if (this.ipcUnsubscribers.has(workspaceId)) { + // Skip if already registered + if (this.workspaceMetadata.has(workspaceId)) { return; } @@ -1837,19 +2441,13 @@ export class WorkspaceStore { this.chatTransientState.set(workspaceId, createInitialChatTransientState()); } + if (!this.historyPagination.has(workspaceId)) { + this.historyPagination.set(workspaceId, createInitialHistoryPaginationState()); + } + // Clear stale streaming state aggregator.clearActiveStreams(); - // Subscribe to IPC events - // Wrap in queueMicrotask to ensure IPC events don't update during React render - const controller = new AbortController(); - const { signal } = controller; - - this.ipcUnsubscribers.set(workspaceId, () => controller.abort()); - - // Fire and forget the subscription loop (retries on errors) - void this.runOnChatSubscription(workspaceId, signal); - // Fetch persisted session usage (fire-and-forget) this.client?.workspace .getSessionUsage({ workspaceId }) @@ -1868,6 +2466,8 @@ export class WorkspaceStore { this.subscribeToStats(workspaceId); } + this.ensureActiveOnChatSubscription(); + if (!this.client) { console.warn(`[WorkspaceStore] No ORPC client available for workspace ${workspaceId}`); } @@ -1883,17 +2483,24 @@ export class WorkspaceStore { // Clean up idle callback to prevent stale callbacks this.cancelPendingIdleBump(workspaceId); + if (this.activeWorkspaceId === workspaceId) { + this.activeWorkspaceId = null; + } + const statsUnsubscribe = this.statsUnsubscribers.get(workspaceId); if (statsUnsubscribe) { statsUnsubscribe(); this.statsUnsubscribers.delete(workspaceId); } - // Unsubscribe from IPC + const unsubscribe = this.ipcUnsubscribers.get(workspaceId); if (unsubscribe) { unsubscribe(); this.ipcUnsubscribers.delete(workspaceId); } + if (this.activeOnChatWorkspaceId === workspaceId) { + this.activeOnChatWorkspaceId = null; + } this.pendingReplayReset.delete(workspaceId); @@ -1903,6 +2510,9 @@ export class WorkspaceStore { this.consumersStore.delete(workspaceId); this.aggregators.delete(workspaceId); this.chatTransientState.delete(workspaceId); + this.workspaceMetadata.delete(workspaceId); + this.workspaceActivity.delete(workspaceId); + this.activityStreamingStartRecency.delete(workspaceId); this.recencyCache.delete(workspaceId); this.previousSidebarValues.delete(workspaceId); this.sidebarStateCache.delete(workspaceId); @@ -1910,7 +2520,12 @@ export class WorkspaceStore { this.workspaceCreatedAt.delete(workspaceId); this.workspaceStats.delete(workspaceId); this.statsStore.delete(workspaceId); + this.statsListenerCounts.delete(workspaceId); + this.historyPagination.delete(workspaceId); this.sessionUsage.delete(workspaceId); + + this.ensureActiveOnChatSubscription(); + this.derived.bump("recency"); } /** @@ -1918,7 +2533,7 @@ export class WorkspaceStore { */ syncWorkspaces(workspaceMetadata: Map): void { const metadataIds = new Set(Array.from(workspaceMetadata.values()).map((m) => m.id)); - const currentIds = new Set(this.ipcUnsubscribers.keys()); + const currentIds = new Set(this.workspaceMetadata.keys()); // Add new workspaces for (const metadata of workspaceMetadata.values()) { @@ -1933,6 +2548,14 @@ export class WorkspaceStore { this.removeWorkspace(workspaceId); } } + + // Re-evaluate the active subscription after additions/removals. + // removeWorkspace can null activeWorkspaceId when the removed workspace + // was active (e.g., stale singleton state between integration tests), + // leaving addWorkspace's ensureActiveOnChatSubscription targeting the + // old workspace. This final call reconciles the subscription with the + // current activeWorkspaceId + registration state. + this.ensureActiveOnChatSubscription(); } /** @@ -1946,10 +2569,19 @@ export class WorkspaceStore { unsubscribe(); } this.statsUnsubscribers.clear(); + for (const unsubscribe of this.ipcUnsubscribers.values()) { unsubscribe(); } this.ipcUnsubscribers.clear(); + + if (this.activityAbortController) { + this.activityAbortController.abort(); + this.activityAbortController = null; + } + + this.activeWorkspaceId = null; + this.activeOnChatWorkspaceId = null; this.pendingReplayReset.clear(); this.states.clear(); this.derived.clear(); @@ -1957,9 +2589,13 @@ export class WorkspaceStore { this.consumersStore.clear(); this.aggregators.clear(); this.chatTransientState.clear(); + this.workspaceMetadata.clear(); + this.workspaceActivity.clear(); + this.activityStreamingStartRecency.clear(); this.workspaceStats.clear(); this.statsStore.clear(); this.statsListenerCounts.clear(); + this.historyPagination.clear(); this.sessionUsage.clear(); this.recencyCache.clear(); this.previousSidebarValues.clear(); @@ -2091,7 +2727,7 @@ export class WorkspaceStore { } private handleChatMessage(workspaceId: string, data: WorkspaceChatMessage): void { - // Aggregator must exist - IPC subscription happens in addWorkspace() + // Aggregator must exist - workspaces are initialized in addWorkspace() before subscriptions run. const aggregator = this.assertGet(workspaceId); const transient = this.assertChatTransientState(workspaceId); @@ -2111,6 +2747,13 @@ export class WorkspaceStore { serverActiveStreamMessageId !== undefined && serverActiveStreamMessageId !== localActiveStreamMessageId; + // Track the server's replay window start for accurate reconnect cursors. + // This prevents loadOlderHistory-prepended pages from polluting the cursor. + const serverOldestSeq = data.cursor?.history?.oldestHistorySequence; + if (typeof serverOldestSeq === "number") { + aggregator.setEstablishedOldestHistorySequence(serverOldestSeq); + } + // Defensive cleanup: // - full replay means backend rebuilt state from scratch, so stale local stream contexts // must be cleared even if a stream cursor is present in caught-up metadata. @@ -2168,6 +2811,18 @@ export class WorkspaceStore { // Done replaying buffered events transient.replayingHistory = false; + if (replay === "since" && data.hasOlderHistory === undefined) { + // Since reconnects keep the pre-disconnect pagination state. The server + // omits hasOlderHistory for this mode because the client already knows it. + if (!this.historyPagination.has(workspaceId)) { + this.historyPagination.set(workspaceId, createInitialHistoryPaginationState()); + } + } else { + this.historyPagination.set( + workspaceId, + this.deriveHistoryPaginationState(aggregator, data.hasOlderHistory) + ); + } // Mark as caught up transient.caughtUp = true; this.states.bump(workspaceId); @@ -2321,6 +2976,18 @@ export class WorkspaceStore { } else { // Process live events immediately (after history loaded) applyWorkspaceChatEventToAggregator(aggregator, data); + + const muxMeta = data.metadata?.muxMetadata as { type?: string } | undefined; + const isCompactionBoundarySummary = + data.role === "assistant" && + (data.metadata?.compactionBoundary === true || muxMeta?.type === "compaction-summary"); + + if (isCompactionBoundarySummary) { + // Live compaction prunes older messages inside the aggregator; refresh the + // pagination cursor so "Load more" starts from the new oldest visible sequence. + this.historyPagination.set(workspaceId, this.deriveHistoryPaginationState(aggregator)); + } + this.states.bump(workspaceId); this.usageStore.bump(workspaceId); this.checkAndBumpRecencyIfChanged(); @@ -2384,6 +3051,18 @@ export const workspaceStore = { */ getWorkspaceSidebarState: (workspaceId: string) => getStoreInstance().getWorkspaceSidebarState(workspaceId), + /** + * Register a workspace in the store (idempotent). + * Exposed for test helpers that need to ensure workspace registration + * before setting it as active. + */ + addWorkspace: (metadata: FrontendWorkspaceMetadata) => getStoreInstance().addWorkspace(metadata), + /** + * Set the active workspace for onChat subscription management. + * Exposed for test helpers that bypass React routing effects. + */ + setActiveWorkspaceId: (workspaceId: string | null) => + getStoreInstance().setActiveWorkspaceId(workspaceId), }; /** diff --git a/src/browser/stories/App.chat.stories.tsx b/src/browser/stories/App.chat.stories.tsx index b30a872dbe..d036685a14 100644 --- a/src/browser/stories/App.chat.stories.tsx +++ b/src/browser/stories/App.chat.stories.tsx @@ -30,6 +30,16 @@ import { setupSimpleChatStory, setupStreamingChatStory, setWorkspaceInput } from import { within, userEvent, waitFor } from "@storybook/test"; import { warmHashCache, setShareData } from "@/browser/utils/sharedUrlCache"; +import { MODEL_ABBREVIATION_EXAMPLES } from "@/common/constants/knownModels"; +import { formatKeybind, KEYBINDS } from "@/browser/utils/ui/keybinds"; +import { + HelpIndicator, + Tooltip, + TooltipContent, + TooltipProvider, + TooltipTrigger, +} from "@/browser/components/ui/tooltip"; + export default { ...appMeta, title: "App/Chat", @@ -890,45 +900,61 @@ export const BackgroundProcesses: AppStory = { */ export const ModeHelpTooltip: AppStory = { render: () => ( - - setupSimpleChatStory({ - messages: [], - }) - } - /> + +
+ + + ? + + + Click to edit +
+ {formatKeybind(KEYBINDS.CYCLE_MODEL)} to cycle models +
+
+ Abbreviations: + {MODEL_ABBREVIATION_EXAMPLES.map((ex) => ( + +
/model {ex.abbrev} - {ex.displayName} +
+ ))} +
+
+ Full format: +
+ /model provider:model-name +
+ (e.g., /model anthropic:claude-sonnet-4-5) +
+
+
+
), play: async ({ canvasElement }) => { - const storyRoot = document.getElementById("storybook-root") ?? canvasElement; - const canvas = within(storyRoot); - - // Wait for app to fully load - the chat input with mode selector should be present - await canvas.findAllByText("Exec", {}, { timeout: 10000 }); - - // Find the help indicator "?" - should be a span with cursor-help styling - const helpIndicators = canvas.getAllByText("?"); - const helpIndicator = helpIndicators.find( - (el) => el.tagName === "SPAN" && el.className.includes("cursor-help") - ); - if (!helpIndicator) throw new Error("HelpIndicator not found"); + const canvas = within(canvasElement); + const helpIndicator = await canvas.findByTestId("mode-help-indicator"); - // Hover to open the tooltip and leave it visible for the visual snapshot await userEvent.hover(helpIndicator); - // Wait for tooltip to fully appear (Radix has 200ms delay) await waitFor( () => { const tooltip = document.querySelector('[role="tooltip"]'); - if (!tooltip) throw new Error("Tooltip not visible"); + if (!(tooltip instanceof HTMLElement)) { + throw new Error("Tooltip not visible"); + } + if (!tooltip.textContent?.includes("Click to edit")) { + throw new Error("Expected model help tooltip content to be visible"); + } }, - { interval: 50 } + { interval: 50, timeout: 5000 } ); }, + parameters: { docs: { description: { story: - "Verifies the HelpIndicator tooltip works by focusing the ? icon. The tooltip should appear with Exec/Plan mode explanations.", + "Verifies the model help tooltip trigger works and renders the shortcut/abbreviation guidance content.", }, }, }, @@ -973,12 +999,17 @@ export const ModelSelectorPrettyWithGateway: AppStory = { const canvas = within(canvasElement); // Wait for chat input to mount. - await canvas.findAllByText("Exec", {}, { timeout: 10000 }); + await canvas.findAllByText("Exec", {}, { timeout: 15000 }); // With gateway enabled, we should still display the *pretty* model name. - await waitFor(() => { - canvas.getByText("GPT-4o"); - }); + // CI can take longer than the default waitFor timeout while workspace/model + // state hydrates, so wait explicitly instead of triggering a flaky retry. + await waitFor( + () => { + canvas.getByText("GPT-4o"); + }, + { interval: 50, timeout: 10000 } + ); // The buggy rendering (mux-gateway:openai/gpt-4o) shows up as "Openai/gpt 4o". const ugly = canvas.queryByText("Openai/gpt 4o"); @@ -993,7 +1024,7 @@ export const ModelSelectorPrettyWithGateway: AppStory = { if (!el) throw new Error("Gateway indicator not found"); return el; }, - { interval: 50 } + { interval: 50, timeout: 15000 } ); // Hover to prove the gateway tooltip is wired up (and keep it visible for snapshot). @@ -1006,7 +1037,7 @@ export const ModelSelectorPrettyWithGateway: AppStory = { throw new Error("Gateway tooltip not visible"); } }, - { interval: 50 } + { interval: 50, timeout: 5000 } ); }, parameters: { @@ -1053,7 +1084,7 @@ export const ModelSelectorDropdownOpen: AppStory = { const canvas = within(canvasElement); // Wait for chat input to mount - await canvas.findAllByText("Exec", {}, { timeout: 10000 }); + await canvas.findAllByText("Exec", {}, { timeout: 15000 }); // Wait for model selector to be clickable (shows pretty name "GPT-4o") const modelSelector = await waitFor(() => { diff --git a/src/browser/stories/App.codeExecution.stories.tsx b/src/browser/stories/App.codeExecution.stories.tsx index 5fe9ff0cbd..3537051e3a 100644 --- a/src/browser/stories/App.codeExecution.stories.tsx +++ b/src/browser/stories/App.codeExecution.stories.tsx @@ -12,8 +12,6 @@ import { } from "./mockFactory"; import { setupSimpleChatStory } from "./storyHelpers"; -import { waitForChatMessagesLoaded } from "./storyPlayHelpers"; -import { userEvent, waitFor } from "@storybook/test"; export default { ...appMeta, @@ -444,7 +442,12 @@ export const Interrupted: AppStory = { ), }; -/** Code execution showing the code view (monospace font test) */ +/** + * Code execution showing the code view (monospace font test). + * + * No play step is needed here: when execution completes without nested tool calls, + * CodeExecutionToolCall auto-switches from "tools" to "code" view. + */ export const ShowCodeView: AppStory = { render: () => ( ), - play: async ({ canvasElement }: { canvasElement: HTMLElement }) => { - await waitForChatMessagesLoaded(canvasElement); - - // Find and click the "Show Code" button (CodeIcon) - await waitFor(() => { - const buttons = canvasElement.querySelectorAll('button[type="button"]'); - const showCodeBtn = Array.from(buttons).find((btn) => { - const svg = btn.querySelector("svg"); - return svg?.classList.contains("lucide-code"); - }); - if (!showCodeBtn) throw new Error("Show Code button not found"); - return showCodeBtn; - }); - - const buttons = canvasElement.querySelectorAll('button[type="button"]'); - const showCodeBtn = Array.from(buttons).find((btn) => { - const svg = btn.querySelector("svg"); - return svg?.classList.contains("lucide-code"); - }) as HTMLElement; - - await userEvent.click(showCodeBtn); - - // Wait for code view to be displayed (font-mono class should be present) - await waitFor(() => { - const codeContainer = canvasElement.querySelector(".font-mono"); - if (!codeContainer) throw new Error("Code view not displayed"); - }); - }, }; diff --git a/src/browser/stories/mockFactory.ts b/src/browser/stories/mockFactory.ts index c344e5e65c..0fc0c86e41 100644 --- a/src/browser/stories/mockFactory.ts +++ b/src/browser/stories/mockFactory.ts @@ -766,7 +766,7 @@ export function createStaticChatHandler(messages: ChatMuxMessage[]): ChatHandler for (const msg of messages) { callback(msg); } - callback({ type: "caught-up" }); + callback({ type: "caught-up", hasOlderHistory: false }); }, 50); // eslint-disable-next-line @typescript-eslint/no-empty-function return () => {}; @@ -788,7 +788,7 @@ export function createStreamingChatHandler(opts: { for (const msg of opts.messages) { callback(msg); } - callback({ type: "caught-up" }); + callback({ type: "caught-up", hasOlderHistory: false }); // Start streaming callback({ diff --git a/src/browser/stories/mocks/orpc.ts b/src/browser/stories/mocks/orpc.ts index fe4d02e118..bc751f26e7 100644 --- a/src/browser/stories/mocks/orpc.ts +++ b/src/browser/stories/mocks/orpc.ts @@ -1139,7 +1139,7 @@ export function createMockORPCClient(options: MockORPCClientOptions = {}): APICl if (!onChat) { // Default mock behavior: subscriptions should remain open. // If this ends, WorkspaceStore will retry and reset state, which flakes stories. - const caughtUp: WorkspaceChatMessage = { type: "caught-up" }; + const caughtUp: WorkspaceChatMessage = { type: "caught-up", hasOlderHistory: false }; yield caughtUp; await new Promise((resolve) => { diff --git a/src/browser/stories/storyHelpers.ts b/src/browser/stories/storyHelpers.ts index 32ac45d40f..986694e9d8 100644 --- a/src/browser/stories/storyHelpers.ts +++ b/src/browser/stories/storyHelpers.ts @@ -322,8 +322,9 @@ export function createOnChatAdapter(chatHandlers: Map) { if (handler) { return handler(emit); } - // Default: emit caught-up immediately - queueMicrotask(() => emit({ type: "caught-up" })); + // Default: emit caught-up immediately. Modern backends include hasOlderHistory + // on full replays; default to false in stories to avoid phantom pagination UI. + queueMicrotask(() => emit({ type: "caught-up", hasOlderHistory: false })); return undefined; }; } diff --git a/src/browser/stories/storyPlayHelpers.ts b/src/browser/stories/storyPlayHelpers.ts index 645090ce17..352520f4a9 100644 --- a/src/browser/stories/storyPlayHelpers.ts +++ b/src/browser/stories/storyPlayHelpers.ts @@ -7,8 +7,8 @@ import { waitFor } from "@storybook/test"; * to let any pending coalesced scroll from useAutoScroll complete. */ export async function waitForChatMessagesLoaded(canvasElement: HTMLElement): Promise { - // Use 15s timeout to handle CI cold-start scenarios where large dependencies - // (Shiki, Mermaid) are still being loaded/initialized + // Use 25s timeout to handle CI cold-start scenarios where large dependencies + // (Shiki, Mermaid) are still being loaded/initialized on busy runners await waitFor( () => { const messageWindow = canvasElement.querySelector('[data-testid="message-window"]'); @@ -16,7 +16,7 @@ export async function waitForChatMessagesLoaded(canvasElement: HTMLElement): Pro throw new Error("Messages not loaded yet"); } }, - { timeout: 15000 } + { timeout: 25000 } ); // One RAF to let any pending coalesced scroll complete diff --git a/src/browser/utils/messages/StreamingMessageAggregator.test.ts b/src/browser/utils/messages/StreamingMessageAggregator.test.ts index 35a40ffe5a..554443996e 100644 --- a/src/browser/utils/messages/StreamingMessageAggregator.test.ts +++ b/src/browser/utils/messages/StreamingMessageAggregator.test.ts @@ -860,7 +860,7 @@ describe("StreamingMessageAggregator", () => { type: "message" as const, }); - test("does not prune on first compaction (no penultimate boundary exists)", () => { + test("prunes older messages on first compaction and keeps the new boundary", () => { const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT); // Simulate messages accumulated during a live session (no prior compaction) @@ -880,7 +880,6 @@ describe("StreamingMessageAggregator", () => { aggregator.handleMessage(msg1); aggregator.handleMessage(msg2); - // First compaction boundary arrives — no penultimate boundary to prune to const summary = asChatMessage( createMuxMessage("summary-1", "assistant", "Compacted summary", { historySequence: 2, @@ -892,12 +891,14 @@ describe("StreamingMessageAggregator", () => { ); aggregator.handleMessage(summary); - // All messages retained (first compaction, no penultimate boundary) + // Existing messages with sequence < incoming boundary (2) are pruned. + // The incoming boundary itself is appended after pruning and remains visible. const remaining = aggregator.getAllMessages(); - expect(remaining).toHaveLength(3); + expect(remaining).toHaveLength(1); + expect(remaining.map((m) => m.id)).toEqual(["summary-1"]); }); - test("prunes pre-penultimate messages on second compaction", () => { + test("keeps only the latest boundary epoch start on subsequent compactions", () => { const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT); // Epoch 0 messages (before any compaction) @@ -930,10 +931,11 @@ describe("StreamingMessageAggregator", () => { ); aggregator.handleMessage(epoch1Msg); - // All 3 messages visible before second compaction - expect(aggregator.getAllMessages()).toHaveLength(3); + // First boundary already pruned epoch 0; boundary-1 + epoch1-user remain. + expect(aggregator.getAllMessages()).toHaveLength(2); - // Second compaction boundary (epoch 2) — should prune everything before boundary-1 + // Second compaction boundary (epoch 2): existing messages with sequence < 3 + // are pruned, then boundary-2 is appended. const boundary2 = asChatMessage( createMuxMessage("boundary-2", "assistant", "Summary epoch 2", { historySequence: 3, @@ -945,11 +947,46 @@ describe("StreamingMessageAggregator", () => { ); aggregator.handleMessage(boundary2); - // epoch0-user (seq 0) is before penultimate boundary (seq 1), so it's pruned. - // Remaining: boundary-1 (seq 1), epoch1-user (seq 2), boundary-2 (seq 3) const remaining = aggregator.getAllMessages(); - expect(remaining).toHaveLength(3); - expect(remaining.map((m) => m.id)).toEqual(["boundary-1", "epoch1-user", "boundary-2"]); + expect(remaining).toHaveLength(1); + expect(remaining.map((m) => m.id)).toEqual(["boundary-2"]); + }); + + test("updates reconnect cursor floor when a live compaction boundary arrives", () => { + const aggregator = new StreamingMessageAggregator(TEST_CREATED_AT); + + // Simulate initial replay window starting at historySequence 40. + aggregator.loadHistoricalMessages( + [ + createMuxMessage("history-40", "user", "Historical user", { + historySequence: 40, + timestamp: 40, + }), + createMuxMessage("history-41", "assistant", "Historical assistant", { + historySequence: 41, + timestamp: 41, + }), + ], + false, + { mode: "replace" } + ); + + const beforeCompactionCursor = aggregator.getOnChatCursor(); + expect(beforeCompactionCursor?.history?.oldestHistorySequence).toBe(40); + + const boundary = asChatMessage( + createMuxMessage("boundary-60", "assistant", "Summary epoch 60", { + historySequence: 60, + compacted: "user", + compactionBoundary: true, + compactionEpoch: 60, + muxMetadata: { type: "compaction-summary" }, + }) + ); + aggregator.handleMessage(boundary); + + const afterCompactionCursor = aggregator.getOnChatCursor(); + expect(afterCompactionCursor?.history?.oldestHistorySequence).toBe(60); }); test("does not prune messages when a non-boundary message arrives", () => { diff --git a/src/browser/utils/messages/StreamingMessageAggregator.ts b/src/browser/utils/messages/StreamingMessageAggregator.ts index 4995c121d1..139dbedfad 100644 --- a/src/browser/utils/messages/StreamingMessageAggregator.ts +++ b/src/browser/utils/messages/StreamingMessageAggregator.ts @@ -267,6 +267,11 @@ export class StreamingMessageAggregator { private recencyTimestamp: number | null = null; private lastResponseCompletedAt: number | null = null; + /** Oldest historySequence from the server's last replay window. + * Used for reconnect cursors instead of the absolute minimum (which + * includes user-loaded older pages via loadOlderHistory). */ + private establishedOldestHistorySequence: number | null = null; + // Delta history for token counting and TPS calculation private deltaHistory = new Map(); @@ -840,11 +845,12 @@ export class StreamingMessageAggregator { * @param messages - Historical messages to load * @param hasActiveStream - Whether there's an active stream in buffered events (for reconnection scenario) * @param opts.mode - "replace" clears existing state first, "append" merges into existing state + * @param opts.skipDerivedState - Skip replaying messages into derived state when appending older history */ loadHistoricalMessages( messages: MuxMessage[], hasActiveStream = false, - opts?: { mode?: "replace" | "append" } + opts?: { mode?: "replace" | "append"; skipDerivedState?: boolean } ): void { const mode = opts?.mode ?? "replace"; @@ -860,6 +866,16 @@ export class StreamingMessageAggregator { this.skillLoadErrors.clear(); this.skillLoadErrorsCache = []; this.lastResponseCompletedAt = null; + + // Track the replay window's oldest sequence for reconnect cursors. + let minSeq: number | null = null; + for (const msg of messages) { + const seq = msg.metadata?.historySequence; + if (typeof seq === "number" && (minSeq === null || seq < minSeq)) { + minSeq = seq; + } + } + this.establishedOldestHistorySequence = minSeq; } const overwrittenMessageIds: string[] = []; @@ -904,22 +920,24 @@ export class StreamingMessageAggregator { (a, b) => (a.metadata?.historySequence ?? 0) - (b.metadata?.historySequence ?? 0) ); - // Replay historical messages in order to reconstruct derived state - for (const message of chronologicalMessages) { - this.maybeTrackLoadedSkillFromAgentSkillSnapshot(message.metadata?.agentSkillSnapshot); + if (!opts?.skipDerivedState) { + // Replay historical messages in order to reconstruct derived state + for (const message of chronologicalMessages) { + this.maybeTrackLoadedSkillFromAgentSkillSnapshot(message.metadata?.agentSkillSnapshot); - if (message.role === "user") { - // Mirror live behavior: clear stream-scoped state on new user turn - // but keep persisted status for fallback on reload. - this.currentTodos = []; - this.agentStatus = undefined; - continue; - } + if (message.role === "user") { + // Mirror live behavior: clear stream-scoped state on new user turn + // but keep persisted status for fallback on reload. + this.currentTodos = []; + this.agentStatus = undefined; + continue; + } - if (message.role === "assistant") { - for (const part of message.parts) { - if (isDynamicToolPart(part) && part.state === "output-available") { - this.processToolResult(part.toolName, part.input, part.output, context); + if (message.role === "assistant") { + for (const part of message.parts) { + if (isDynamicToolPart(part) && part.state === "output-available") { + this.processToolResult(part.toolName, part.input, part.output, context); + } } } } @@ -937,6 +955,10 @@ export class StreamingMessageAggregator { this.invalidateCache(); } + setEstablishedOldestHistorySequence(sequence: number | null): void { + this.establishedOldestHistorySequence = sequence; + } + getAllMessages(): MuxMessage[] { this.cache.allMessages ??= Array.from(this.messages.values()).sort( (a, b) => (a.metadata?.historySequence ?? 0) - (b.metadata?.historySequence ?? 0) @@ -979,16 +1001,32 @@ export class StreamingMessageAggregator { return undefined; } + const allMessages = this.getAllMessages(); + const establishedOldestHistorySequence = this.establishedOldestHistorySequence; + const fingerprintMessages = + establishedOldestHistorySequence != null + ? allMessages.filter( + (message) => + (message.metadata?.historySequence ?? Number.POSITIVE_INFINITY) >= + establishedOldestHistorySequence + ) + : allMessages; + + // Scope fingerprint input to the established replay window. The server computes + // priorHistoryFingerprint from getHistoryFromLatestBoundary(skip=0), so client- + // paginated rows from older compaction epochs must be excluded to avoid false + // mismatches that force unnecessary full replay on reconnect. const priorHistoryFingerprint = computePriorHistoryFingerprint( - this.getAllMessages(), + fingerprintMessages, maxHistorySequence ); + const oldestHistorySequence = establishedOldestHistorySequence ?? minHistorySequence; const cursor: OnChatCursor = { history: { messageId: maxHistoryMessageId, historySequence: maxHistorySequence, - oldestHistorySequence: minHistorySequence, + oldestHistorySequence, ...(priorHistoryFingerprint !== undefined ? { priorHistoryFingerprint } : {}), }, }; @@ -1366,6 +1404,7 @@ export class StreamingMessageAggregator { this.interruptingMessageId = null; this.lastAbortReason = null; this.lastResponseCompletedAt = null; + this.establishedOldestHistorySequence = null; this.invalidateCache(); } @@ -2145,13 +2184,12 @@ export class StreamingMessageAggregator { } // When a compaction boundary arrives during a live session, prune messages - // older than the penultimate boundary so the UI matches what a fresh load - // would show (emitHistoricalEvents reads from skip=1, the penultimate boundary). - // The user sees the previous epoch + current epoch; older epochs are pruned. - // Without this, all pre-boundary messages persist until the next page refresh. - // TODO: support paginated history loading so users can view older epochs on demand. + // older than the incoming boundary sequence so the UI matches a fresh load + // (emitHistoricalEvents now reads from skip=0, the latest boundary only). + // This keeps only the current epoch visible in-session; older epochs remain + // available via Load More history pagination. if (this.isCompactionBoundarySummaryMessage(incomingMessage)) { - this.pruneBeforePenultimateBoundary(incomingMessage); + this.pruneBeforeLatestBoundary(incomingMessage); } // Now add the new message @@ -2203,37 +2241,33 @@ export class StreamingMessageAggregator { } /** - * Keep the previous epoch visible: when the new (Nth) boundary arrives, - * find the penultimate (N-1) boundary among existing messages and prune - * everything before it. This matches the backend's getHistoryFromLatestBoundary - * which reads from the n-1 boundary. + * Keep only the latest epoch visible during a live session. * - * If only one boundary exists (the incoming one), nothing is pruned — the - * user sees their full first-epoch history. + * When a new boundary arrives, existing messages still represent older epochs. + * Prune every existing message with a lower sequence than the incoming boundary + * so once the incoming boundary is appended, the transcript matches fresh loads + * from getHistoryFromLatestBoundary(skip=0). Older epochs remain accessible via + * Load More. */ - private pruneBeforePenultimateBoundary(_incomingBoundary: MuxMessage): void { - // Find the penultimate boundary among the *existing* messages (before adding - // the incoming one). With the incoming boundary about to become the latest, - // the existing latest boundary becomes the penultimate one. - let penultimateBoundarySeq: number | undefined; - for (const [, msg] of this.messages.entries()) { - if (!this.isCompactionBoundarySummaryMessage(msg)) continue; - const seq = msg.metadata?.historySequence; - if (seq === undefined) continue; - // The highest-sequence boundary in existing messages is the one that - // will become the penultimate once the incoming boundary is added. - if (penultimateBoundarySeq === undefined || seq > penultimateBoundarySeq) { - penultimateBoundarySeq = seq; - } + private pruneBeforeLatestBoundary(incomingBoundary: MuxMessage): void { + const incomingBoundarySequence = incomingBoundary.metadata?.historySequence; + // Self-healing guard: malformed boundary metadata should not crash live sessions. + if (incomingBoundarySequence === undefined) return; + + // Live compaction advances the replay window floor to the incoming boundary. + // Keep reconnect cursors aligned with the server's latest-boundary replay window + // so incremental reconnects remain eligible after compaction. + if ( + this.establishedOldestHistorySequence === null || + incomingBoundarySequence > this.establishedOldestHistorySequence + ) { + this.establishedOldestHistorySequence = incomingBoundarySequence; } - // No existing boundary → this is the first compaction, nothing to prune - if (penultimateBoundarySeq === undefined) return; - const toRemove: string[] = []; for (const [id, msg] of this.messages.entries()) { const seq = msg.metadata?.historySequence; - if (seq !== undefined && seq < penultimateBoundarySeq) { + if (seq !== undefined && seq < incomingBoundarySequence) { toRemove.push(id); } } diff --git a/src/browser/utils/ui/keybinds.ts b/src/browser/utils/ui/keybinds.ts index 83c3bd94b1..4a73c7ce7f 100644 --- a/src/browser/utils/ui/keybinds.ts +++ b/src/browser/utils/ui/keybinds.ts @@ -299,6 +299,9 @@ export const KEYBINDS = { /** Jump to bottom of chat */ JUMP_TO_BOTTOM: { key: "G", shift: true }, + /** Load older transcript messages when pagination is available */ + LOAD_OLDER_MESSAGES: { key: "h", shift: true }, + /** Navigate to next workspace in current project */ NEXT_WORKSPACE: { key: "j", ctrl: true }, diff --git a/src/common/orpc/schemas/api.ts b/src/common/orpc/schemas/api.ts index af15df02c4..dfa1df0e25 100644 --- a/src/common/orpc/schemas/api.ts +++ b/src/common/orpc/schemas/api.ts @@ -977,6 +977,29 @@ export const workspace = { input: z.object({ workspaceId: z.string() }), output: z.array(WorkspaceChatMessageSchema), }, + history: { + loadMore: { + input: z.object({ + workspaceId: z.string(), + cursor: z + .object({ + beforeHistorySequence: z.number(), + beforeMessageId: z.string().nullish(), + }) + .nullish(), + }), + output: z.object({ + messages: z.array(WorkspaceChatMessageSchema), + nextCursor: z + .object({ + beforeHistorySequence: z.number(), + beforeMessageId: z.string().nullish(), + }) + .nullable(), + hasOlder: z.boolean(), + }), + }, + }, /** * Load an archived subagent transcript (chat.jsonl + optional partial.json) from this workspace's * session dir. diff --git a/src/common/orpc/schemas/stream.ts b/src/common/orpc/schemas/stream.ts index 0de2b2b9d4..51f498444f 100644 --- a/src/common/orpc/schemas/stream.ts +++ b/src/common/orpc/schemas/stream.ts @@ -65,6 +65,11 @@ export const CaughtUpMessageSchema = z.object({ type: z.literal("caught-up"), /** Which replay strategy the server actually used. */ replay: z.enum(["full", "since", "live"]).optional(), + /** + * Authoritative pagination signal for full replays. + * Omitted for since/live replays so the client can preserve existing pagination state. + */ + hasOlderHistory: z.boolean().optional(), /** Server's cursor at end of replay (client should use this for next reconnect). */ cursor: OnChatCursorSchema.optional(), }); diff --git a/src/node/orpc/router.ts b/src/node/orpc/router.ts index 7e08276c6e..a5db105b43 100644 --- a/src/node/orpc/router.ts +++ b/src/node/orpc/router.ts @@ -3070,6 +3070,14 @@ export const router = (authToken?: string) => { } }), }, + history: { + loadMore: t + .input(schemas.workspace.history.loadMore.input) + .output(schemas.workspace.history.loadMore.output) + .handler(async ({ context, input }) => { + return context.workspaceService.getHistoryLoadMore(input.workspaceId, input.cursor); + }), + }, getPlanContent: t .input(schemas.workspace.getPlanContent.input) .output(schemas.workspace.getPlanContent.output) diff --git a/src/node/services/agentSession.ts b/src/node/services/agentSession.ts index ec3f2d283f..abe93129c1 100644 --- a/src/node/services/agentSession.ts +++ b/src/node/services/agentSession.ts @@ -398,6 +398,7 @@ export class AgentSession { mode?: OnChatMode ): Promise { let replayMode: "full" | "since" | "live" = "full"; + let hasOlderHistory: boolean | undefined; let serverCursor: OnChatCursor | undefined; let emittedReplayMessages = false; @@ -467,13 +468,11 @@ export class AgentSession { const partial = await this.historyService.readPartial(this.workspaceId); const partialHistorySequence = partial?.metadata?.historySequence; - // Load chat history from the penultimate compaction boundary onward - // (skip=1) so the user sees the previous epoch plus the current epoch. - // This provides context for what was summarized in the latest compaction. - // TODO: support paginated history loading so users can view older epochs on demand. + // Load chat history from the latest compaction boundary onward (skip=0). + // Older compaction epochs are fetched on demand through workspace.history.loadMore. const historyResult = await this.historyService.getHistoryFromLatestBoundary( this.workspaceId, - 1 + 0 ); let sinceHistorySequence: number | undefined; @@ -558,6 +557,18 @@ export class AgentSession { afterTimestamp = undefined; } + if (replayMode === "full") { + if (oldestHistorySequence === undefined) { + // Empty full replay means there is no older page to request. + hasOlderHistory = false; + } else { + hasOlderHistory = await this.historyService.hasHistoryBeforeSequence( + this.workspaceId, + oldestHistorySequence + ); + } + } + for (const message of history) { // Skip the placeholder message if we have a partial with the same historySequence. // The placeholder has empty parts; the partial has the actual content. @@ -676,6 +687,7 @@ export class AgentSession { message: { type: "caught-up", replay: replayMode, + ...(hasOlderHistory !== undefined ? { hasOlderHistory } : {}), cursor: serverCursor, }, }); diff --git a/src/node/services/historyService.test.ts b/src/node/services/historyService.test.ts index 9774bf43e4..2c4930e5c5 100644 --- a/src/node/services/historyService.test.ts +++ b/src/node/services/historyService.test.ts @@ -950,6 +950,77 @@ describe("HistoryService", () => { }); }); + describe("getHistoryBoundaryWindow", () => { + it("returns one older boundary window at a time and reports hasOlder", async () => { + const workspaceId = "ws-boundary-window"; + const workspaceDir = config.getSessionDir(workspaceId); + await fs.mkdir(workspaceDir, { recursive: true }); + + const lines: string[] = []; + let seq = 0; + + lines.push( + JSON.stringify({ + ...createMuxMessage("e1-user", "user", "epoch 1 user", { historySequence: seq++ }), + workspaceId, + }) + ); + lines.push( + JSON.stringify({ + ...createMuxMessage("e1-boundary", "assistant", "summary 1", { + historySequence: seq++, + compactionBoundary: true, + compacted: "user", + compactionEpoch: 1, + }), + workspaceId, + }) + ); + lines.push( + JSON.stringify({ + ...createMuxMessage("e2-user", "user", "epoch 2 user", { historySequence: seq++ }), + workspaceId, + }) + ); + lines.push( + JSON.stringify({ + ...createMuxMessage("e2-boundary", "assistant", "summary 2", { + historySequence: seq++, + compactionBoundary: true, + compacted: "idle", + compactionEpoch: 2, + }), + workspaceId, + }) + ); + lines.push( + JSON.stringify({ + ...createMuxMessage("post-e2", "user", "latest message", { historySequence: seq++ }), + workspaceId, + }) + ); + + await fs.writeFile(path.join(workspaceDir, "chat.jsonl"), lines.join("\n") + "\n"); + + const firstWindow = await service.getHistoryBoundaryWindow(workspaceId, 3); + expect(firstWindow.success).toBe(true); + if (firstWindow.success) { + expect(firstWindow.data.messages.map((message) => message.id)).toEqual([ + "e1-boundary", + "e2-user", + ]); + expect(firstWindow.data.hasOlder).toBe(true); + } + + const secondWindow = await service.getHistoryBoundaryWindow(workspaceId, 1); + expect(secondWindow.success).toBe(true); + if (secondWindow.success) { + expect(secondWindow.data.messages.map((message) => message.id)).toEqual(["e1-user"]); + expect(secondWindow.data.hasOlder).toBe(false); + } + }); + }); + describe("getLastMessages", () => { it("should return empty array when no history exists", async () => { const result = await service.getLastMessages("nonexistent", 5); diff --git a/src/node/services/historyService.ts b/src/node/services/historyService.ts index a10d13c637..4257029fbe 100644 --- a/src/node/services/historyService.ts +++ b/src/node/services/historyService.ts @@ -649,6 +649,126 @@ export class HistoryService { } } + private getOldestHistorySequence(messages: readonly MuxMessage[]): number | undefined { + let oldest: number | undefined; + + for (const message of messages) { + const sequence = message.metadata?.historySequence; + if (!isNonNegativeInteger(sequence)) { + continue; + } + + if (oldest === undefined || sequence < oldest) { + oldest = sequence; + } + } + + return oldest; + } + + async hasHistoryBeforeSequence( + workspaceId: string, + beforeHistorySequence: number + ): Promise { + assert( + typeof workspaceId === "string" && workspaceId.trim().length > 0, + "workspaceId is required" + ); + assert( + isNonNegativeInteger(beforeHistorySequence), + "hasHistoryBeforeSequence requires a non-negative integer" + ); + + let hasOlder = false; + await this.iterateBackward(workspaceId, (messages) => { + for (const message of messages) { + const sequence = message.metadata?.historySequence; + if (!isNonNegativeInteger(sequence)) { + continue; + } + + if (sequence < beforeHistorySequence) { + hasOlder = true; + return false; + } + } + }); + + return hasOlder; + } + + /** + * Read one compaction-epoch history window older than `beforeHistorySequence`. + * + * Returns messages whose historySequence is strictly less than `beforeHistorySequence` + * and belong to the nearest-older boundary window. + */ + async getHistoryBoundaryWindow( + workspaceId: string, + beforeHistorySequence: number + ): Promise> { + assert( + typeof workspaceId === "string" && workspaceId.trim().length > 0, + "workspaceId is required" + ); + assert( + isNonNegativeInteger(beforeHistorySequence), + "getHistoryBoundaryWindow requires beforeHistorySequence to be a non-negative integer" + ); + + try { + // Scan boundaries newest→oldest and pick the first window that has rows older than the cursor. + for (let skip = 0; ; skip++) { + const boundaryOffset = await this.findLastBoundaryByteOffset(workspaceId, skip); + if (boundaryOffset === null) { + break; + } + + const tailMessages = await this.readHistoryFromOffset(workspaceId, boundaryOffset); + const windowMessages = tailMessages.filter((message) => { + const sequence = message.metadata?.historySequence; + return isNonNegativeInteger(sequence) && sequence < beforeHistorySequence; + }); + + if (windowMessages.length === 0) { + continue; + } + + const oldestWindowSequence = this.getOldestHistorySequence(windowMessages); + assert( + oldestWindowSequence !== undefined, + "window messages filtered by historySequence must include a sequence" + ); + + const hasOlder = await this.hasHistoryBeforeSequence(workspaceId, oldestWindowSequence); + return Ok({ messages: windowMessages, hasOlder }); + } + + // No older boundary window found. Fall back to pre-boundary rows (or empty on uncompacted history). + const allMessages = await this.readChatHistory(workspaceId); + const preBoundaryMessages = allMessages.filter((message) => { + const sequence = message.metadata?.historySequence; + return isNonNegativeInteger(sequence) && sequence < beforeHistorySequence; + }); + + if (preBoundaryMessages.length === 0) { + return Ok({ messages: [], hasOlder: false }); + } + + const oldestWindowSequence = this.getOldestHistorySequence(preBoundaryMessages); + assert( + oldestWindowSequence !== undefined, + "pre-boundary messages filtered by historySequence must include a sequence" + ); + + const hasOlder = await this.hasHistoryBeforeSequence(workspaceId, oldestWindowSequence); + return Ok({ messages: preBoundaryMessages, hasOlder }); + } catch (error) { + const message = getErrorMessage(error); + return Err(`Failed to read history boundary window: ${message}`); + } + } + /** * Read messages from a compaction boundary onward. * Falls back to full history if no boundary exists (new/uncompacted workspace). diff --git a/src/node/services/workspaceService.ts b/src/node/services/workspaceService.ts index 0050283cb9..3e6ef55fb9 100644 --- a/src/node/services/workspaceService.ts +++ b/src/node/services/workspaceService.ts @@ -333,6 +333,42 @@ function isPositiveInteger(value: unknown): value is number { ); } +function isNonNegativeInteger(value: unknown): value is number { + return ( + typeof value === "number" && Number.isFinite(value) && Number.isInteger(value) && value >= 0 + ); +} + +function getOldestSequencedMessage( + messages: readonly MuxMessage[] +): { message: MuxMessage; historySequence: number } | null { + let oldest: { message: MuxMessage; historySequence: number } | null = null; + + for (const message of messages) { + const historySequence = message.metadata?.historySequence; + if (!isNonNegativeInteger(historySequence)) { + continue; + } + + if (oldest === null || historySequence < oldest.historySequence) { + oldest = { message, historySequence }; + } + } + + return oldest; +} + +interface WorkspaceHistoryLoadMoreCursor { + beforeHistorySequence: number; + beforeMessageId?: string | null; +} + +interface WorkspaceHistoryLoadMoreResult { + messages: WorkspaceChatMessage[]; + nextCursor: WorkspaceHistoryLoadMoreCursor | null; + hasOlder: boolean; +} + function hasDurableCompactedMarker(value: unknown): value is true | "user" | "idle" { return value === true || value === "user" || value === "idle"; } @@ -3807,6 +3843,127 @@ export class WorkspaceService extends EventEmitter { } } + async getHistoryLoadMore( + workspaceId: string, + cursor: WorkspaceHistoryLoadMoreCursor | null | undefined + ): Promise { + assert( + typeof workspaceId === "string" && workspaceId.trim().length > 0, + "workspaceId is required" + ); + + if (cursor !== null && cursor !== undefined) { + assert( + isNonNegativeInteger(cursor.beforeHistorySequence), + "cursor.beforeHistorySequence must be a non-negative integer" + ); + assert( + cursor.beforeMessageId === null || + cursor.beforeMessageId === undefined || + typeof cursor.beforeMessageId === "string", + "cursor.beforeMessageId must be a string, null, or undefined" + ); + if (typeof cursor.beforeMessageId === "string") { + assert( + cursor.beforeMessageId.trim().length > 0, + "cursor.beforeMessageId must be non-empty when provided" + ); + } + } + + const emptyResult: WorkspaceHistoryLoadMoreResult = { + messages: [], + nextCursor: null, + hasOlder: false, + }; + + try { + let beforeHistorySequence: number | undefined = cursor?.beforeHistorySequence; + + if (beforeHistorySequence === undefined) { + // Initial load-more request (no cursor) should page one epoch older than startup replay. + const latestBoundaryResult = await this.historyService.getHistoryFromLatestBoundary( + workspaceId, + 0 + ); + if (!latestBoundaryResult.success) { + log.warn("workspace.history.loadMore: failed to read latest boundary", { + workspaceId, + error: latestBoundaryResult.error, + }); + return emptyResult; + } + + const oldestFromLatestBoundary = getOldestSequencedMessage(latestBoundaryResult.data); + if (!oldestFromLatestBoundary) { + return emptyResult; + } + + beforeHistorySequence = oldestFromLatestBoundary.historySequence; + } + + assert( + isNonNegativeInteger(beforeHistorySequence), + "resolved beforeHistorySequence must be a non-negative integer" + ); + + const historyWindowResult = await this.historyService.getHistoryBoundaryWindow( + workspaceId, + beforeHistorySequence + ); + if (!historyWindowResult.success) { + log.warn("workspace.history.loadMore: failed to read boundary window", { + workspaceId, + beforeHistorySequence, + error: historyWindowResult.error, + }); + return emptyResult; + } + + const messages: WorkspaceChatMessage[] = historyWindowResult.data.messages.map((message) => ({ + ...message, + type: "message", + })); + + if (!historyWindowResult.data.hasOlder) { + return { + messages, + nextCursor: null, + hasOlder: false, + }; + } + + const oldestInWindow = getOldestSequencedMessage(historyWindowResult.data.messages); + if (!oldestInWindow) { + // Defensive fallback: if we cannot build a stable cursor, stop paging instead of looping. + log.warn("workspace.history.loadMore: cannot compute next cursor despite hasOlder=true", { + workspaceId, + beforeHistorySequence, + }); + return { + messages, + nextCursor: null, + hasOlder: false, + }; + } + + return { + messages, + nextCursor: { + beforeHistorySequence: oldestInWindow.historySequence, + beforeMessageId: oldestInWindow.message.id, + }, + hasOlder: true, + }; + } catch (error) { + log.error("Failed to load more workspace history:", { + workspaceId, + error: getErrorMessage(error), + }); + return emptyResult; + } + } + async getFileCompletions( workspaceId: string, query: string, diff --git a/tests/e2e/scenarios/slashCommands.spec.ts b/tests/e2e/scenarios/slashCommands.spec.ts index 1eadd5c8fd..b19a30483d 100644 --- a/tests/e2e/scenarios/slashCommands.spec.ts +++ b/tests/e2e/scenarios/slashCommands.spec.ts @@ -96,10 +96,11 @@ test.describe("slash command flows", () => { await ui.chat.expectTranscriptContains(MOCK_COMPACTION_SUMMARY_PREFIX); await expect(transcript).toContainText(MOCK_COMPACTION_SUMMARY_PREFIX); await expect(transcript).toContainText("Compaction boundary"); - // Regression check: transcript now keeps only the top compaction boundary row. + // With skip=0 (latest boundary only) replay, compaction prunes pre-boundary + // messages from the live view. They are accessible via "Load older messages". await expect(transcript).not.toContainText("Resume after compaction"); - await expect(transcript).toContainText("Mock README content"); - await expect(transcript).toContainText("Directory listing:"); + await expect(transcript).not.toContainText("Mock README content"); + await expect(transcript).not.toContainText("Directory listing:"); }); test("slash command /model sonnet switches models for subsequent turns", async ({ ui, page }) => { diff --git a/tests/ui/compaction/compaction.test.ts b/tests/ui/compaction/compaction.test.ts index 80df88bc9e..4cf8da3d16 100644 --- a/tests/ui/compaction/compaction.test.ts +++ b/tests/ui/compaction/compaction.test.ts @@ -76,8 +76,9 @@ describe("Compaction UI (mock AI router)", () => { // Compaction transcript now renders a single top boundary row. await app.chat.expectTranscriptContains("Compaction boundary"); - // Compaction is append-only: pre-compaction transcript remains visible. - await app.chat.expectTranscriptContains(seedMessage); + // Live compaction now prunes to the latest boundary, so pre-compaction + // transcript is no longer visible in the current view. + await app.chat.expectTranscriptNotContains(seedMessage); } finally { await app.dispose(); } @@ -113,22 +114,40 @@ describe("Compaction UI (mock AI router)", () => { const seedMessage = "Seed conversation for compaction"; const triggerMessage = "[force] Trigger force compaction"; - await app.chat.send(seedMessage); + const seedResult = await app.env.orpc.workspace.sendMessage({ + workspaceId: app.workspaceId, + message: seedMessage, + options: { model: WORKSPACE_DEFAULTS.model, agentId: WORKSPACE_DEFAULTS.agentId }, + }); + expect(seedResult.success).toBe(true); await app.chat.expectTranscriptContains(`Mock response: ${seedMessage}`); - await app.chat.send(triggerMessage); + const triggerResult = await app.env.orpc.workspace.sendMessage({ + workspaceId: app.workspaceId, + message: triggerMessage, + options: { model: WORKSPACE_DEFAULTS.model, agentId: WORKSPACE_DEFAULTS.agentId }, + }); + expect(triggerResult.success).toBe(true); - await app.chat.expectTranscriptContains("Mock compaction summary:", 60_000); - await app.chat.expectTranscriptContains("Mock response: Continue", 60_000); + const compactionAssertionTimeoutMs = 120_000; + await app.chat.expectTranscriptContains( + "Mock compaction summary:", + compactionAssertionTimeoutMs + ); + await app.chat.expectTranscriptContains( + "Mock response: Continue", + compactionAssertionTimeoutMs + ); // Compaction transcript now renders a single top boundary row. - await app.chat.expectTranscriptContains("Compaction boundary", 60_000); + await app.chat.expectTranscriptContains("Compaction boundary", compactionAssertionTimeoutMs); - // Force compaction is append-only: keep the triggering history around the boundary rows. - await app.chat.expectTranscriptContains(triggerMessage, 60_000); + // Force compaction now prunes to the latest boundary window, so the + // pre-compaction triggering turn is no longer shown. + await app.chat.expectTranscriptNotContains(triggerMessage, compactionAssertionTimeoutMs); } finally { await app.dispose(); } - }, 60_000); + }, 120_000); test("/compact command sends any foreground bash to background", async () => { const app = await createAppHarness({ branchPrefix: "compaction-ui" }); @@ -241,7 +260,7 @@ describe("Compaction UI (mock AI router)", () => { unregister?.(); await app.dispose(); } - }, 60_000); + }, 120_000); }); describe("Compaction notification behavior (mock AI router)", () => { diff --git a/tests/ui/helpers.ts b/tests/ui/helpers.ts index 8b9009cddc..aede99b3f8 100644 --- a/tests/ui/helpers.ts +++ b/tests/ui/helpers.ts @@ -154,6 +154,14 @@ export async function setupWorkspaceView( { timeout: 10_000 } ); fireEvent.click(workspaceElement); + + // Ensure the workspace is registered and activated in the store so that + // runOnChatSubscription starts. In the real app, WorkspaceContext handles + // registration via syncWorkspaces and activation via useLayoutEffect, but + // in happy-dom tests these may not have completed by the time the test + // asserts on transcript content. Both calls are idempotent. + workspaceStore.addWorkspace(metadata); + workspaceStore.setActiveWorkspaceId(workspaceId); } /**