diff --git a/apps/webapp/test/replay-after-crash.test.ts b/apps/webapp/test/replay-after-crash.test.ts index 576ced2ab2..747a27eac9 100644 --- a/apps/webapp/test/replay-after-crash.test.ts +++ b/apps/webapp/test/replay-after-crash.test.ts @@ -55,8 +55,11 @@ function textTurn(id: string, text: string): UIMessageChunk[] { * via the webapp's real `generatePresignedUrl` (so snapshot reads * hit a real S3-compatible backend). * - `readSessionStreamRecords` returns the canonical - * `{ records: [{ data, id, seqNum }] }` shape — `data` is the - * JSON-encoded chunk body, mirroring the webapp's S2 record shape. + * `{ records: [{ data, id, seqNum }] }` shape. `data` is the parsed + * chunk OBJECT — the SDK writer puts the chunk object directly into + * the record envelope and the webapp route forwards it as-is, so + * the schema now declares `data: z.unknown()` and consumers use it + * without an extra `JSON.parse` step. */ function stubApiClient(opts: { projectRef: string; @@ -64,7 +67,7 @@ function stubApiClient(opts: { sessionOutChunks: unknown[]; }) { const records = opts.sessionOutChunks.map((chunk, i) => ({ - data: typeof chunk === "string" ? chunk : JSON.stringify(chunk), + data: chunk, id: `evt-${i + 1}`, seqNum: i + 1, })); diff --git a/packages/core/src/v3/schemas/api.ts b/packages/core/src/v3/schemas/api.ts index 6cb746762c..e86e503de4 100644 --- a/packages/core/src/v3/schemas/api.ts +++ b/packages/core/src/v3/schemas/api.ts @@ -1995,14 +1995,21 @@ export type SendInputStreamResponseBody = z.infer( * part of the public API. * @internal */ +type ReplaySessionOutTailResult = { + /** Messages whose `finish` chunk landed before the run died. Safe to seed the chain. */ + settled: TUIMessage[]; + /** + * The trailing assistant message whose `finish` chunk never arrived — + * an orphan from a cancel / crash / OOM. `cleanupAbortedParts` has + * already stripped streaming-in-progress fragments. `undefined` if + * the tail ended cleanly (every segment closed). + */ + partial: TUIMessage | undefined; + /** + * The trailing assistant message BEFORE `cleanupAbortedParts` ran. Same + * `undefined` semantics as `partial`. Use this when you need to inspect + * tool parts the cleanup would strip (e.g. `input-available` / + * `input-streaming` orphans surfaced via `pendingToolCalls`). + */ + partialRaw: TUIMessage | undefined; +}; + type ReplaySessionOutTailImpl = ( sessionId: string, options?: { lastEventId?: string } -) => Promise; +) => Promise>; let replaySessionOutTailImpl: ReplaySessionOutTailImpl | undefined; export function __setReplaySessionOutTailImplForTests( @@ -544,7 +563,7 @@ export function __setReplaySessionOutTailImplForTests( async function replaySessionOutTail( sessionId: string, options?: { lastEventId?: string } -): Promise { +): Promise> { if (replaySessionOutTailImpl) { return await replaySessionOutTailImpl(sessionId, options); } @@ -554,16 +573,12 @@ async function replaySessionOutTail( }); const collected: UIMessageChunk[] = []; for (const record of response.records) { - // Each record's `data` is the JSON-encoded chunk body the agent - // wrote at append time. The records endpoint returns it as an - // opaque string so the parsing cost is paid here, not on the - // server's hot path. - let chunk: unknown; - try { - chunk = JSON.parse(record.data); - } catch { - continue; - } + // `data` is the chunk object as written by the SDK's session-out + // writer (an AI SDK `UIMessageChunk` or a Trigger control object). + // The route forwards it as-is — no JSON envelope to unwrap here. + // Defensive shape checks below tolerate malformed records by + // skipping them instead of throwing. + const chunk: unknown = record.data; if (!chunk || typeof chunk !== "object") continue; const type = (chunk as { type?: unknown }).type; if (typeof type !== "string") continue; @@ -573,7 +588,7 @@ async function replaySessionOutTail( if (type.startsWith("trigger:")) continue; collected.push(chunk as UIMessageChunk); } - if (collected.length === 0) return []; + if (collected.length === 0) return { settled: [], partial: undefined, partialRaw: undefined }; // Split chunks into per-message segments. A `start` chunk demarcates the // beginning of an assistant message; chunks before any `start` (rare — @@ -602,7 +617,9 @@ async function replaySessionOutTail( } } - const messages: TUIMessage[] = []; + const settled: TUIMessage[] = []; + let partial: TUIMessage | undefined; + let partialRaw: TUIMessage | undefined; for (let i = 0; i < segments.length; i++) { const seg = segments[i]!; const isTrailing = i === segments.length - 1 && !seg.closed; @@ -631,12 +648,17 @@ async function replaySessionOutTail( if (isTrailing) { const cleaned = cleanupAbortedParts(last as TUIMessage); if (cleaned.parts.length === 0) continue; - messages.push(cleaned); + partial = cleaned; + // Keep the raw pre-cleanup message too — recovery boot extracts + // `pendingToolCalls` from it, since `cleanupAbortedParts` strips + // exactly the input-streaming / input-available tool parts that + // we want to surface. + partialRaw = last as TUIMessage; } else { - messages.push(last as TUIMessage); + settled.push(last as TUIMessage); } } - return messages; + return { settled, partial, partialRaw }; } /** @@ -663,12 +685,125 @@ export async function __replaySessionOutTailProductionPathForTests< const saved = replaySessionOutTailImpl; replaySessionOutTailImpl = undefined; try { - return await replaySessionOutTail(sessionId, options); + const { settled, partial } = await replaySessionOutTail(sessionId, options); + return partial !== undefined ? [...settled, partial] : settled; } finally { replaySessionOutTailImpl = saved; } } +/** + * Test-only override hook for `replaySessionInTail`. Mirrors + * `__setReplaySessionOutTailImplForTests` so unit tests can drive the boot + * loop's chain-reconstruction logic without an HTTP round-trip. + * @internal + */ +type ReplaySessionInTailImpl = ( + sessionId: string, + options?: { lastEventId?: string } +) => Promise<{ message: TUIMessage; metadata: unknown; seqNum: number }[]>; +let replaySessionInTailImpl: ReplaySessionInTailImpl | undefined; + +export function __setReplaySessionInTailImplForTests( + impl: ReplaySessionInTailImpl | undefined +): void { + replaySessionInTailImpl = impl; +} + +/** + * Drain `session.in` from `lastEventId` (or the start) and surface the user + * messages that landed past the cursor. Mirror of `replaySessionOutTail` — + * both reads run at continuation boot so the SDK can reconstruct + * conversational order across a dead run that never wrote `onTurnComplete`. + * + * `session.in` carries the {@link ChatInputChunk} tagged union: + * - `kind: "message"` — a `ChatTaskWirePayload` envelope for a new user + * message (`trigger: "submit-message"`) or a regeneration. Only the + * submit-message records carry a `payload.message`; regenerations, + * preload / close / action / handover-prepare have no message. + * - `kind: "stop"` — mid-turn cancellation signal. Not a message. + * - `kind: "handover"` / `kind: "handover-skip"` — head-start signals. + * Not user messages. + * + * This function filters to the first variant and returns the embedded + * `UIMessage`s in seq_num order, paired with their seq_num so the caller + * can advance the session.in cursor past them. + * + * Errors are propagated to the caller (the boot loop wraps in try/catch + * and `logger.warn`s); we don't swallow here so test code can observe + * failures directly. + * @internal + */ +async function replaySessionInTail( + sessionId: string, + options?: { lastEventId?: string } +): Promise<{ message: TUIMessage; metadata: unknown; seqNum: number }[]> { + if (replaySessionInTailImpl) { + return await replaySessionInTailImpl(sessionId, options); + } + const apiClient = apiClientManager.clientOrThrow(); + const response = await apiClient.readSessionStreamRecords(sessionId, "in", { + afterEventId: options?.lastEventId, + }); + const out: { message: TUIMessage; metadata: unknown; seqNum: number }[] = []; + for (const record of response.records) { + // session.in writers POST `JSON.stringify(chunk)` directly; the + // webapp wraps that in `{ data: , id }` and stores it on + // S2. The records endpoint hands `data` back as the original + // string — unlike session.out (where the writer puts a chunk + // OBJECT into the envelope and the route forwards it as an + // object). Defensive: handle both shapes so future writer changes + // on either side don't silently lose records. + let chunk: unknown = record.data; + if (typeof chunk === "string") { + try { + chunk = JSON.parse(chunk); + } catch { + continue; + } + } + if (!chunk || typeof chunk !== "object") continue; + const kind = (chunk as { kind?: unknown }).kind; + if (kind !== "message") continue; + const payload = ( + chunk as { + payload?: { trigger?: unknown; message?: unknown; metadata?: unknown }; + } + ).payload; + if (!payload || payload.trigger !== "submit-message") continue; + const message = payload.message; + if (!message || typeof message !== "object") continue; + out.push({ + message: message as TUIMessage, + metadata: payload.metadata, + seqNum: record.seqNum, + }); + } + return out; +} + +/** + * Test-only entry point that bypasses + * `__setReplaySessionInTailImplForTests` and reaches the real + * `apiClient.readSessionStreamRecords` + filter pipeline. Mirrors + * `__replaySessionOutTailProductionPathForTests`. + * @internal + */ +export async function __replaySessionInTailProductionPathForTests< + TUIMessage extends UIMessage, +>( + sessionId: string, + options?: { lastEventId?: string } +): Promise<{ message: TUIMessage; metadata: unknown; seqNum: number }[]> { + const saved = replaySessionInTailImpl; + replaySessionInTailImpl = undefined; + try { + return await replaySessionInTail(sessionId, options); + } finally { + replaySessionInTailImpl = saved; + } +} + /** * Resolve the Session handle for the current chat.agent run. * @@ -1900,6 +2035,40 @@ function* iterateToolParts( } } +/** + * Walk a partial assistant message and surface the tool calls the model + * had started but never received a result for. Used at recovery boot to + * populate `RecoveryBootEvent.pendingToolCalls`. + * + * The partial assistant in question is the orphan from a dead run — its + * `turn-complete` never fired, so any `input-available` tool part is + * truly orphan (NOT a stable HITL pause; HITL parts live on settled + * messages, not on the partial). + * + * @internal + */ +function extractPendingToolCallsFromPartial( + partial: UIMessage | undefined +): RecoveryPendingToolCall[] { + if (!partial) return []; + const out: RecoveryPendingToolCall[] = []; + const parts = (partial.parts ?? []) as any[]; + for (let i = 0; i < parts.length; i++) { + const part = parts[i]; + if (!isToolUIPart(part)) continue; + if (!isPendingToolState(part.state)) continue; + const toolCallId = part.toolCallId; + if (typeof toolCallId !== "string" || toolCallId.length === 0) continue; + out.push({ + toolCallId, + toolName: getToolName(part), + input: part.input, + partIndex: i, + }); + } + return out; +} + /** * Tool parts on the *leaf* assistant message that are still waiting on * an answer (`input-available` state). Used to gate fresh user turns @@ -3531,6 +3700,126 @@ export type BootEvent = { preloaded: boolean; }; +/** + * A tool call extracted from the partial assistant message of a dead run. + * Surfaced on `RecoveryBootEvent.pendingToolCalls` so the customer can + * decide how to repair the chain (synthesize a result, drop the partial, + * etc.). + */ +export type RecoveryPendingToolCall = { + /** The AI SDK tool call id. */ + toolCallId: string; + /** The tool name (the `tool-${name}` suffix). */ + toolName: string; + /** The input the model produced for the tool call. */ + input: unknown; + /** The part index inside `partialAssistant.parts` for in-place edits. */ + partIndex: number; +}; + +/** + * Event passed to the `onRecoveryBoot` callback. + * + * Fires once at boot when a continuation run inherits in-flight state from + * a dead predecessor (cancel / crash / OOM / deploy eviction / graceful + * `chat.requestUpgrade`). The runtime reads both `session.in` and + * `session.out` past the last `turn-complete` cursor and surfaces the + * recovered pieces here so the customer can shape the conversational + * chain before the first turn fires. + * + * Does NOT fire when there's nothing to recover (clean continuation after + * `chat.endRun()` with no buffered user messages, fresh chat, OOM retry + * after a successful turn-complete with no in-flight tail). + * + * Does NOT fire when `hydrateMessages` is registered (the customer owns + * persistence; recovery decisions live in their own DB query). + */ +export type RecoveryBootEvent = { + /** Task run context — same as `task({ run })` second-argument `ctx`. */ + ctx: TaskRunContext; + /** The unique identifier for the chat session. */ + chatId: string; + /** The Trigger.dev run ID for this run boot. */ + runId: string; + /** Public id of the prior run that died. */ + previousRunId: string; + /** + * Best-effort cause of the predecessor's death. Currently always + * `"unknown"` — the run engine doesn't yet plumb the real reason + * into the continuation payload. Future SDK versions will narrow + * this. Don't branch behavior on it yet. + */ + cause: "cancelled" | "crashed" | "unknown"; + /** + * The conversation chain that was successfully persisted by the + * predecessor's last `onTurnComplete`. Empty if the predecessor died + * before turn 1 ever completed. + */ + settledMessages: TUIM[]; + /** + * User messages that arrived on `session.in` past the cursor — i.e. + * the message(s) the predecessor was processing or had queued when + * it died. The runtime's default is to re-dispatch each as a fresh + * turn after the chain is restored. Return a different list via + * `recoveredTurns` to skip / reorder / collapse them. + */ + inFlightUsers: TUIM[]; + /** + * The trailing assistant message the predecessor was streaming when + * it died — the orphan whose `turn-complete` never fired. Undefined + * if the predecessor died before any assistant output reached + * `session.out` (cancel-before-first-token, snapshot-only path). + */ + partialAssistant: TUIM | undefined; + /** + * Tool calls extracted from `partialAssistant.parts` that the model + * had started but the tool runtime never resolved. Empty when + * `partialAssistant` is undefined or carries no `input-available` + * tool parts. + */ + pendingToolCalls: RecoveryPendingToolCall[]; + /** + * Lazy session.out writer — identical to the `writer` passed to + * `onTurnStart` / `onTurnComplete` / `onChatStart`. Use this to emit + * a recovery signal (e.g. a `data-chat-recovery` UIMessage chunk) + * BEFORE the first recovered turn fires so the bridge can render a + * "recovering..." banner. Lazy: no overhead if unused. + */ + writer: ChatWriter; +}; + +/** + * Return shape for the `onRecoveryBoot` callback. Every field is optional — + * omit one to accept the default. + */ +export type RecoveryBootResult = { + /** + * The chain the new run boots with. Replaces the default + * (`settledMessages`). Use this to keep the partial assistant in + * context, mutate its tool parts to inject synthesized results, + * collapse history, etc. + * + * Ignored when `hydrateMessages` is registered (the hydrate hook + * runs per-turn and overwrites the chain). + */ + chain?: TUIM[]; + /** + * The user messages to re-dispatch as fresh turns after the chain is + * restored. Default: `inFlightUsers` (re-process every in-flight + * user). Return `[]` to suppress all of them; return a filtered / + * reordered subset to skip specific ones. + */ + recoveredTurns?: TUIM[]; + /** + * Awaitable run AFTER the writer flushes and BEFORE the first + * recovered turn fires. Use for blocking persistence (e.g. write the + * partial assistant to your DB so a follow-up turn can reference + * it). Errors bubble — wrap your own try/catch if you want to soft- + * fail. + */ + beforeBoot?: () => Promise; +}; + /** * Event passed to the `onChatStart` callback. * @@ -4016,6 +4305,43 @@ export type ChatAgentOptions< */ onBoot?: (event: BootEvent>) => Promise | void; + /** + * Recovery boot hook — fires once on a continuation run that inherited + * in-flight state from a dead predecessor (cancel / crash / OOM / + * deploy eviction / `chat.requestUpgrade()`). The runtime reads both + * stream tails past the last `turn-complete` cursor and hands the + * customer the recovered pieces (settled chain, in-flight users, + * partial assistant, pending tool calls) so the chain can be shaped + * before the first recovered turn fires. + * + * Does NOT fire when there's nothing to recover — e.g. a clean + * continuation after `chat.endRun()` with no buffered user, a fresh + * chat, or an OOM retry on top of a complete snapshot. + * + * Does NOT fire when `hydrateMessages` is registered — that hook owns + * the per-turn chain and overlapping recovery decisions belong in the + * customer's DB. + * + * Defaults (returned when the hook is omitted or returns no field): + * - `chain` = `settledMessages` (drop the orphan partial) + * - `recoveredTurns` = `inFlightUsers` (re-dispatch every user) + * + * @example + * ```ts + * onRecoveryBoot: async ({ partialAssistant, inFlightUsers, writer, cause }) => { + * writer.write({ + * type: "data-chat-recovery", + * id: generateId(), + * data: { cause, partial: partialAssistant?.id }, + * }); + * return {}; // accept defaults: drop partial, re-dispatch users + * } + * ``` + */ + onRecoveryBoot?: ( + event: RecoveryBootEvent + ) => Promise | void> | RecoveryBootResult | void; + /** * Called when a preloaded run starts, before the first message arrives. * @@ -4526,6 +4852,7 @@ function chatAgent< run: userRun, clientDataSchema, onBoot, + onRecoveryBoot, onPreload, onChatStart, onValidateMessages, @@ -4656,51 +4983,136 @@ function chatAgent< // swallow errors internally; the agent stays available either way. const sessionIdForSnapshot = payload.sessionId ?? payload.chatId; let bootSnapshot: ChatSnapshotV1 | undefined; - let replayed: TUIMessage[] = []; + let replayedSettled: TUIMessage[] = []; + let replayedPartial: TUIMessage | undefined; + let replayedPartialRaw: TUIMessage | undefined; + let replayedInTail: { message: TUIMessage; metadata: unknown; seqNum: number }[] = []; + // Wire payloads to dispatch as turns before the regular session.in + // pump kicks in. Populated by `onRecoveryBoot.recoveredTurns` (or its + // default, `inFlightUsers`). The turn-loop checks this queue ahead of + // `messagesInput.waitWithIdleTimeout` so recovered turns fire first. + const bootInjectedQueue: ChatTaskWirePayload< + TUIMessage, + inferSchemaIn + >[] = []; const couldHavePriorState = payload.continuation === true || ctx.attempt.number > 1; if (!hydrateMessages && couldHavePriorState) { - try { - bootSnapshot = await tracer.startActiveSpan( - "chat.boot.snapshot.read", - async () => readChatSnapshot(sessionIdForSnapshot) - ); - } catch (error) { - // `readChatSnapshot` already swallows + warns internally; this catch - // is just belt-and-suspenders against tracer/span errors. - logger.warn("chat.agent: snapshot read failed; continuing without snapshot", { - error: error instanceof Error ? error.message : String(error), - sessionId: sessionIdForSnapshot, - }); - } + // Single parent span for the whole boot read phase — snapshot + // read, session.out replay, session.in replay. Per-phase timing + // + result counts are attributes on the span. + await tracer.startActiveSpan( + "chat.boot", + async (bootSpan) => { + // snapshot read + const snapStart = Date.now(); + try { + bootSnapshot = await readChatSnapshot(sessionIdForSnapshot); + } catch (error) { + // `readChatSnapshot` already swallows + warns internally; this catch + // is just belt-and-suspenders against tracer/span errors. + logger.warn( + "chat.agent: snapshot read failed; continuing without snapshot", + { + error: error instanceof Error ? error.message : String(error), + sessionId: sessionIdForSnapshot, + } + ); + } + bootSpan.setAttribute("chat.boot.snapshot.durationMs", Date.now() - snapStart); + bootSpan.setAttribute("chat.boot.snapshot.present", !!bootSnapshot); + bootSpan.setAttribute( + "chat.boot.snapshot.messageCount", + bootSnapshot?.messages?.length ?? 0 + ); - // Seed the trim chain from the snapshot's `lastOutEventId` (the SSE - // id of the previous turn's `turn-complete` control record). The - // first turn-complete this worker writes will then trim back to it. - // Without seeding, the new worker would emit no trim on its first - // turn (chain self-bootstraps from turn 2), so this is purely an - // optimization to keep continuation runs bounded from the first turn. - if (bootSnapshot?.lastOutEventId !== undefined) { - const seeded = Number.parseInt(bootSnapshot.lastOutEventId, 10); - if (Number.isFinite(seeded)) { - const slot = locals.get(lastTurnCompleteSeqNumKey); - if (slot) slot.value = seeded; - } - } + // Seed the trim chain from the snapshot's `lastOutEventId` (the SSE + // id of the previous turn's `turn-complete` control record). The + // first turn-complete this worker writes will then trim back to it. + // Without seeding, the new worker would emit no trim on its first + // turn (chain self-bootstraps from turn 2), so this is purely an + // optimization to keep continuation runs bounded from the first turn. + if (bootSnapshot?.lastOutEventId !== undefined) { + const seeded = Number.parseInt(bootSnapshot.lastOutEventId, 10); + if (Number.isFinite(seeded)) { + const slot = locals.get(lastTurnCompleteSeqNumKey); + if (slot) slot.value = seeded; + } + } - try { - replayed = await tracer.startActiveSpan("chat.boot.replay", async () => - replaySessionOutTail(sessionIdForSnapshot, { - lastEventId: bootSnapshot?.lastOutEventId, - }) - ); - } catch (error) { - logger.warn("chat.agent: session.out replay failed; using snapshot only", { - error: error instanceof Error ? error.message : String(error), - sessionId: sessionIdForSnapshot, - }); - } + // session.out replay + const replayOutStart = Date.now(); + try { + const replayResult = await replaySessionOutTail( + sessionIdForSnapshot, + { lastEventId: bootSnapshot?.lastOutEventId } + ); + replayedSettled = replayResult.settled; + replayedPartial = replayResult.partial; + replayedPartialRaw = replayResult.partialRaw; + } catch (error) { + logger.warn( + "chat.agent: session.out replay failed; using snapshot only", + { + error: error instanceof Error ? error.message : String(error), + sessionId: sessionIdForSnapshot, + } + ); + } + bootSpan.setAttribute( + "chat.boot.replay.out.durationMs", + Date.now() - replayOutStart + ); + bootSpan.setAttribute("chat.boot.replay.out.settledCount", replayedSettled.length); + bootSpan.setAttribute( + "chat.boot.replay.out.partialPresent", + replayedPartial !== undefined + ); + + // session.in tail read + // + // session.in carries the user-side of the conversation + // (ChatInputChunk records). On a continuation boot we read past + // the last turn-complete's `session-in-event-id` header so any + // user message the dead predecessor hadn't acknowledged surfaces + // here. Without this read, in-flight user messages would only be + // visible via the live SSE subscription — by which point they + // would arrive AFTER the partial-assistant orphan and look like + // brand-new turns to the model, producing inverted chains. + const replayInStart = Date.now(); + const lastInEventId = await findLatestSessionInCursor(payload.chatId) + .then((cursor) => (cursor !== undefined ? String(cursor) : undefined)) + .catch(() => undefined); + try { + replayedInTail = await replaySessionInTail(payload.chatId, { + lastEventId: lastInEventId, + }); + } catch (error) { + logger.warn( + "chat.agent: session.in replay failed; in-flight users may not be recovered", + { error: error instanceof Error ? error.message : String(error) } + ); + } + bootSpan.setAttribute( + "chat.boot.replay.in.durationMs", + Date.now() - replayInStart + ); + bootSpan.setAttribute( + "chat.boot.replay.in.userCount", + replayedInTail.length + ); + }, + { + attributes: { + [SemanticInternalAttributes.STYLE_ICON]: "tabler-rotate-clockwise", + [SemanticInternalAttributes.COLLAPSED]: true, + "chat.id": payload.chatId, + "chat.continuation": payload.continuation ?? false, + "chat.attempt": ctx.attempt.number, + }, + } + ); } // ── session.in resume cursor ─────────────────────────────────── @@ -4748,12 +5160,158 @@ function chatAgent< } } - // ── Merge + head-start bootstrap ──────────────────────────────── + // ── Recovery boot + chain reconstruction ──────────────────────── if (!hydrateMessages) { - accumulatedUIMessages = mergeByIdReplaceWins( + const settledMessages = mergeByIdReplaceWins( (bootSnapshot?.messages as TUIMessage[]) ?? [], - replayed + replayedSettled ); + const inFlightUsers = replayedInTail.map((r) => r.message); + const partialAssistant = replayedPartial; + // Fire the hook only when there's a partial assistant — the + // mid-stream-died signal. In-flight users alone (no partial) + // cover graceful exits like `chat.requestUpgrade()` where the + // predecessor chose to end before processing the message; + // those route through the normal continuation-wait path. + const hasRecoveredState = partialAssistant !== undefined; + + let hookChain: TUIMessage[] | undefined; + let hookRecoveredTurns: TUIMessage[] | undefined; + let hookBeforeBoot: (() => Promise) | undefined; + if (couldHavePriorState && hasRecoveredState && onRecoveryBoot) { + // Extract from the RAW partial (pre-cleanup). `cleanupAbortedParts` + // strips exactly the input-streaming / input-available tool parts + // we want to surface here, so the cleaned `partialAssistant` would + // always report zero pending tool calls. + const pendingToolCalls = extractPendingToolCallsFromPartial(replayedPartialRaw); + const previousRunIdForHook = previousRunId ?? ""; + let hookResult: RecoveryBootResult | void = undefined; + const { writer: hookWriter, flush: hookFlush } = createLazyChatWriter(); + try { + hookResult = await tracer.startActiveSpan( + "onRecoveryBoot()", + async () => + onRecoveryBoot({ + ctx, + chatId: payload.chatId, + runId: ctx.run.id, + previousRunId: previousRunIdForHook, + cause: "unknown", + settledMessages, + inFlightUsers, + partialAssistant, + pendingToolCalls, + writer: hookWriter, + }), + { + attributes: { + [SemanticInternalAttributes.STYLE_ICON]: "task-hook-onStart", + [SemanticInternalAttributes.COLLAPSED]: true, + "chat.id": payload.chatId, + }, + } + ); + } catch (error) { + logger.warn("chat.agent: onRecoveryBoot threw; using defaults", { + error: error instanceof Error ? error.message : String(error), + chatId: payload.chatId, + }); + } + // Flush any chunks the hook wrote so they land on session.out + // BEFORE the first recovered turn fires. + try { + await hookFlush(); + } catch (error) { + logger.warn("chat.agent: onRecoveryBoot writer flush failed", { + error: error instanceof Error ? error.message : String(error), + }); + } + if (hookResult && typeof hookResult === "object") { + if (Array.isArray(hookResult.chain)) hookChain = hookResult.chain; + if (Array.isArray(hookResult.recoveredTurns)) + hookRecoveredTurns = hookResult.recoveredTurns; + if (typeof hookResult.beforeBoot === "function") + hookBeforeBoot = hookResult.beforeBoot; + } + } + + // Default: splice partial + the user it was answering into + // the chain so follow-ups like "keep going" still have context. + let seedChain: TUIMessage[]; + let recoveredTurns: TUIMessage[]; + if (hookChain !== undefined) { + seedChain = hookChain; + } else if (partialAssistant !== undefined && inFlightUsers.length > 0) { + seedChain = [...settledMessages, inFlightUsers[0]!, partialAssistant]; + } else { + seedChain = settledMessages; + } + if (hookRecoveredTurns !== undefined) { + recoveredTurns = hookRecoveredTurns; + } else if (partialAssistant !== undefined && inFlightUsers.length > 0) { + recoveredTurns = inFlightUsers.slice(1); + } else { + recoveredTurns = inFlightUsers; + } + // `beforeBoot` errors bubble — the customer opted into blocking + // persistence and a failure there should fail the run rather than + // dispatch recovered turns against half-persisted state. + if (hookBeforeBoot) { + await hookBeforeBoot(); + } + + // Advance the session.in cursor past every recovered user so + // the live subscription doesn't re-deliver them. + if (replayedInTail.length > 0) { + const lastRecoveredSeq = replayedInTail[replayedInTail.length - 1]!.seqNum; + const currentCursor = sessionStreams.lastSeqNum(payload.chatId, "in"); + if (currentCursor === undefined || lastRecoveredSeq > currentCursor) { + sessionStreams.setLastSeqNum(payload.chatId, "in", lastRecoveredSeq); + sessionStreams.setLastDispatchedSeqNum(payload.chatId, "in", lastRecoveredSeq); + } + } + + // Synthesize wire payloads for each recoveredTurn. The turn-loop + // pops these ahead of `messagesInput.waitWithIdleTimeout` so they + // dispatch as normal turns with the existing hook stack. + // + // Per-record metadata preservation: each session.in record + // carries its own `payload.metadata` (the transport sets it at + // send time). Look up the original by message id so a recovered + // turn dispatches with the metadata its writer actually sent. + // Fall back to the boot payload's metadata for hook-synthesized + // messages (customer returned a recoveredTurn with no matching + // session.in record). + // + // OOM-retry dedup: if `payload.message` is the same user message + // the queue is about to redispatch (the wire payload survives + // across attempts, but session.in records it once), the wire + // payload already runs turn 0 — drop the duplicate from the queue + // so we don't fire the same turn twice. + const wireMessageId = + (payload.message as { id?: string } | undefined)?.id; + const metadataById = new Map(); + for (const entry of replayedInTail) { + metadataById.set(entry.message.id, entry.metadata); + } + for (const msg of recoveredTurns) { + if (wireMessageId && msg.id === wireMessageId) continue; + const recoveredMetadata = metadataById.has(msg.id) + ? metadataById.get(msg.id) + : payload.metadata; + bootInjectedQueue.push({ + chatId: payload.chatId, + sessionId: payload.sessionId, + metadata: recoveredMetadata, + trigger: "submit-message", + message: msg, + messageId: msg.id, + continuation: payload.continuation, + previousRunId: payload.previousRunId, + } as ChatTaskWirePayload>); + } + + accumulatedUIMessages = seedChain; // ── Head-start bootstrap ───────────────────────────────────── // @@ -5125,6 +5683,15 @@ function chatAgent< parseClientData ? await parseClientData(payload.metadata) : payload.metadata ) as inferSchemaOut; + // Recovery-boot injection: if `onRecoveryBoot` (or its default + // `inFlightUsers`) populated `bootInjectedQueue`, dispatch the + // first synthesized payload as the very first turn instead of + // waiting on the live session.in. Subsequent recovered turns + // get drained by the end-of-turn picker below. + if (bootInjectedQueue.length > 0) { + currentWirePayload = bootInjectedQueue.shift()!; + } else { + const effectiveIdleTimeout = idleTimeoutInSeconds ?? payload.idleTimeoutInSeconds; const effectiveTurnTimeout = @@ -5197,6 +5764,7 @@ function chatAgent< if (currentWirePayload.trigger === "close") { return; } + } // end else (no boot-injected first turn) } for (let turn = 0; turn < maxTurns; turn++) { @@ -6478,6 +7046,15 @@ function chatAgent< // before the next message, their injected context is picked up in prepareStep. // The pre-onBeforeTurnComplete drain handles promises from onTurnStart/run(). + // Recovery-boot injection: drain remaining recovered turns + // before any other source. `onRecoveryBoot` (or its default) + // produced these from in-flight user messages on session.in + // that the dead predecessor never acknowledged. + if (bootInjectedQueue.length > 0) { + currentWirePayload = bootInjectedQueue.shift()!; + return "continue"; + } + // If messages arrived during streaming (without pendingMessages config), // use the first one immediately as the next turn. if (pendingMessages.length > 0) { @@ -6627,6 +7204,14 @@ function chatAgent< return; } + // Drain remaining recovered turns before idling — a thrown + // recovered turn shouldn't strand the rest of the boot queue + // until an unrelated live message arrives. + if (bootInjectedQueue.length > 0) { + currentWirePayload = bootInjectedQueue.shift()!; + continue; + } + // Wait for the next message — same as after a successful turn const effectiveIdleTimeout = (metadata.get(IDLE_TIMEOUT_METADATA_KEY) as number | undefined) ?? @@ -6946,6 +7531,7 @@ function createChatBuilder< ...(config.clientDataSchema ? { clientDataSchema: config.clientDataSchema } : {}), uiMessageStreamOptions: mergedUiStream, onBoot: composeHooks(config.hooks.onBoot, options.onBoot), + onRecoveryBoot: options.onRecoveryBoot, onPreload: composeHooks(config.hooks.onPreload, options.onPreload), onChatStart: composeHooks(config.hooks.onChatStart, options.onChatStart), onTurnStart: composeHooks(config.hooks.onTurnStart, options.onTurnStart), diff --git a/packages/trigger-sdk/src/v3/test/mock-chat-agent.ts b/packages/trigger-sdk/src/v3/test/mock-chat-agent.ts index 32fdba57cd..fbcc166d14 100644 --- a/packages/trigger-sdk/src/v3/test/mock-chat-agent.ts +++ b/packages/trigger-sdk/src/v3/test/mock-chat-agent.ts @@ -11,6 +11,7 @@ import { } from "../sessions.js"; import { __setReadChatSnapshotImplForTests, + __setReplaySessionInTailImplForTests, __setReplaySessionOutTailImplForTests, __setWriteChatSnapshotImplForTests, type ChatSnapshotV1, @@ -238,6 +239,27 @@ export type MockChatAgentHarness = { */ seedSessionOutTail(chunks?: UIMessageChunk[]): void; + /** + * Pre-seed a trailing partial assistant message for the next boot's + * replay. The runtime's `replaySessionOutTail` returns this as the + * `partial` field (alongside whatever `seedSessionOutTail` reduces + * to). Use to simulate cancel-mid-stream: an assistant message whose + * `finish` chunk never arrived. Pass `undefined` to clear. + * + * Effective on the next run boot only. + */ + seedSessionOutPartial(partial: UIMessage | undefined): void; + + /** + * Pre-seed user messages on the `session.in` tail for the next boot's + * replay. Each message is paired with a synthetic seq_num (`i + 1`). + * Used to simulate in-flight users the dead predecessor was supposed + * to process. Pass `[]` to clear. + * + * Effective on the next run boot only. + */ + seedSessionInTail(messages: UIMessage[]): void; + /** * The most recently written snapshot, or `undefined` if no snapshot * has been written yet. Updated each time `writeChatSnapshot` is @@ -373,6 +395,8 @@ export function mockChatAgent( let seededSnapshot: ChatSnapshotV1 | undefined = options.snapshot; let lastWrittenSnapshot: ChatSnapshotV1 | undefined; let seededReplayChunks: UIMessageChunk[] = []; + let seededReplayPartial: UIMessage | undefined; + let seededSessionInMessages: UIMessage[] = []; __setReadChatSnapshotImplForTests((_id: string) => { return seededSnapshot as ChatSnapshotV1 | undefined; @@ -382,11 +406,37 @@ export function mockChatAgent( }); // Replay override: install a default that returns whatever - // `seededReplayChunks` reduces to. Cleared in the same `finally` block - // as the other test overrides. + // `seededReplayChunks` reduces to. `mockChatAgent` doesn't model the + // settled-vs-partial split — seeded chunks always reduce to the + // `settled` array with `partial: undefined`. Recovery-specific + // tests can install their own override to seed a partial. + // Cleared in the same `finally` block as the other test overrides. __setReplaySessionOutTailImplForTests(async () => { - if (seededReplayChunks.length === 0) return []; - return (await reduceChunksToMessages(seededReplayChunks)) as never; + const settled = + seededReplayChunks.length === 0 + ? [] + : ((await reduceChunksToMessages(seededReplayChunks)) as unknown[]); + // For the mock harness, `partialRaw` is the same as `partial` — we + // don't model cleanupAbortedParts separately. Recovery tests that + // need a partialRaw distinct from partial install their own stub. + return { + settled, + partial: seededReplayPartial, + partialRaw: seededReplayPartial, + } as never; + }); + + // session.in tail override: each seeded UIMessage becomes a + // { message, metadata: undefined, seqNum: i+1 } entry. Mirrors the + // seq-num pattern from the out-tail stub so cursor-advance logic is + // exercised correctly. `metadata` is `undefined` for seeded users — + // the boot path falls back to `payload.metadata` for those. + __setReplaySessionInTailImplForTests(async () => { + return seededSessionInMessages.map((message, i) => ({ + message, + metadata: undefined, + seqNum: i + 1, + })) as never; }); // Install the session open override so `sessions.open(id)` returns a @@ -521,6 +571,7 @@ export function mockChatAgent( __setReadChatSnapshotImplForTests(undefined); __setWriteChatSnapshotImplForTests(undefined); __setReplaySessionOutTailImplForTests(undefined); + __setReplaySessionInTailImplForTests(undefined); }); const sendPayloadAndWait = async ( @@ -616,6 +667,14 @@ export function mockChatAgent( seededReplayChunks = chunks ?? []; }, + seedSessionOutPartial(partial) { + seededReplayPartial = partial; + }, + + seedSessionInTail(messages) { + seededSessionInMessages = messages; + }, + getSnapshot() { return lastWrittenSnapshot; }, diff --git a/packages/trigger-sdk/test/recovery-boot.test.ts b/packages/trigger-sdk/test/recovery-boot.test.ts new file mode 100644 index 0000000000..5d7b4cd221 --- /dev/null +++ b/packages/trigger-sdk/test/recovery-boot.test.ts @@ -0,0 +1,483 @@ +// Import the test harness FIRST — installs the resource catalog so +// `chat.agent()` calls register their task functions correctly. +import { mockChatAgent } from "../src/v3/test/index.js"; + +import { describe, expect, it, vi } from "vitest"; +import { chat } from "../src/v3/ai.js"; +import type { RecoveryBootEvent, RecoveryBootResult } from "../src/v3/ai.js"; +import { __setReplaySessionOutTailImplForTests } from "../src/v3/ai.js"; +import { simulateReadableStream, streamText } from "ai"; +import { MockLanguageModelV3 } from "ai/test"; +import type { LanguageModelV3StreamPart } from "@ai-sdk/provider"; + +// ── Helpers ──────────────────────────────────────────────────────────── + +function userMessage(text: string, id = "u-" + Math.random().toString(36).slice(2)) { + return { + id, + role: "user" as const, + parts: [{ type: "text" as const, text }], + }; +} + +function assistantMessage(text: string, id = "a-" + Math.random().toString(36).slice(2)) { + return { + id, + role: "assistant" as const, + parts: [{ type: "text" as const, text }], + }; +} + +function partialAssistantWithToolCall(id: string, toolCallId: string, toolName: string) { + return { + id, + role: "assistant" as const, + parts: [ + { + type: `tool-${toolName}` as const, + toolCallId, + state: "input-available" as const, + input: { q: "search" }, + }, + ], + } as unknown as ReturnType; +} + +function textStream(text: string) { + const chunks: LanguageModelV3StreamPart[] = [ + { type: "text-start", id: "t1" }, + { type: "text-delta", id: "t1", delta: text }, + { type: "text-end", id: "t1" }, + { + type: "finish", + finishReason: { unified: "stop", raw: "stop" }, + usage: { + inputTokens: { total: 10, noCache: 10, cacheRead: undefined, cacheWrite: undefined }, + outputTokens: { total: 10, text: 10, reasoning: undefined }, + }, + }, + ]; + return simulateReadableStream({ chunks }); +} + +// ── Tests ────────────────────────────────────────────────────────────── + +describe("onRecoveryBoot — chat.agent recovery hook", () => { + it("does NOT fire on a clean continuation with no recovered state", async () => { + const onRecoveryBoot = vi.fn(); + const model = new MockLanguageModelV3({ + doStream: async () => ({ stream: textStream("ok") }), + }); + const agent = chat.agent({ + id: "recovery-boot.no-state", + onRecoveryBoot, + run: async ({ messages, signal }) => + streamText({ model, messages, abortSignal: signal }), + }); + const harness = mockChatAgent(agent, { + chatId: "no-state", + continuation: true, + previousRunId: "run_prior", + }); + try { + // Snapshot is empty, no in-flight users, no partial — guard + // (partialAssistant !== undefined || inFlightUsers.length > 0) is false. + await harness.sendMessage(userMessage("fresh message")); + await new Promise((r) => setTimeout(r, 20)); + expect(onRecoveryBoot).not.toHaveBeenCalled(); + } finally { + await harness.close(); + } + }); + + it("fires when there's a partial assistant and surfaces it on the ctx", async () => { + const captured: { event?: RecoveryBootEvent> } = {}; + const model = new MockLanguageModelV3({ + doStream: async () => ({ stream: textStream("recovered") }), + }); + const partial = partialAssistantWithToolCall("a-orphan", "tc-1", "search"); + const agent = chat.agent({ + id: "recovery-boot.partial-fires-hook", + onRecoveryBoot: async (event) => { + captured.event = event as never; + return {}; + }, + run: async ({ messages, signal }) => + streamText({ model, messages, abortSignal: signal }), + }); + const harness = mockChatAgent(agent, { + chatId: "partial-fires-hook", + continuation: true, + previousRunId: "run_prior", + }); + harness.seedSessionOutPartial(partial as never); + try { + await harness.sendMessage(userMessage("next user message")); + await new Promise((r) => setTimeout(r, 20)); + expect(captured.event).toBeDefined(); + expect(captured.event!.partialAssistant?.id).toBe("a-orphan"); + expect(captured.event!.pendingToolCalls).toHaveLength(1); + expect(captured.event!.pendingToolCalls[0]!.toolCallId).toBe("tc-1"); + expect(captured.event!.pendingToolCalls[0]!.toolName).toBe("search"); + expect(captured.event!.previousRunId).toBe("run_prior"); + expect(captured.event!.cause).toBe("unknown"); + } finally { + await harness.close(); + } + }); + + it("pendingToolCalls is extracted from the RAW partial (pre-cleanupAbortedParts)", async () => { + // Real-world scenario: cancel-mid-tool-call. Session.out has tool-call + // chunks but the tool never returned. cleanupAbortedParts strips the + // input-available tool part from the partial used for the chain (you + // don't want orphan tool calls poisoning the model context), but + // `pendingToolCalls` should still surface what was happening. + const cleanedPartial = { + id: "a-orphan", + role: "assistant" as const, + parts: [{ type: "text" as const, text: "Let me look that up" }], + }; + const rawPartial = { + id: "a-orphan", + role: "assistant" as const, + parts: [ + { type: "text" as const, text: "Let me look that up" }, + { + type: "tool-search" as const, + toolCallId: "tc-pending", + state: "input-available" as const, + input: { q: "vietnamese pho" }, + }, + ], + } as unknown as typeof cleanedPartial; + + const captured: { event?: RecoveryBootEvent } = {}; + const model = new MockLanguageModelV3({ + doStream: async () => ({ stream: textStream("ok") }), + }); + const u1 = userMessage("buffered", "u-1"); + const agent = chat.agent({ + id: "recovery-boot.pending-tool-from-raw", + onRecoveryBoot: async (event) => { + captured.event = event; + return {}; + }, + run: async ({ messages, signal }) => + streamText({ model, messages, abortSignal: signal }), + }); + const harness = mockChatAgent(agent, { + chatId: "pending-tool-from-raw", + continuation: true, + previousRunId: "run_prior", + }); + harness.seedSessionInTail([u1 as never]); + // Install AFTER mockChatAgent — its constructor sets its own default + // override that we want to replace for this test. + __setReplaySessionOutTailImplForTests(async () => + ({ + settled: [], + partial: cleanedPartial, + partialRaw: rawPartial, + }) as never + ); + try { + await new Promise((r) => setTimeout(r, 50)); + expect(captured.event).toBeDefined(); + // Cleaned partial → chain (no input-available tool part) + expect(captured.event!.partialAssistant?.parts).toHaveLength(1); + // pendingToolCalls → from raw (input-available tool part visible) + expect(captured.event!.pendingToolCalls).toHaveLength(1); + expect(captured.event!.pendingToolCalls[0]!.toolCallId).toBe("tc-pending"); + expect(captured.event!.pendingToolCalls[0]!.toolName).toBe("search"); + } finally { + await harness.close(); + } + }); + + it("does NOT fire when there are in-flight users but no partial (graceful exit path)", async () => { + // chat.requestUpgrade(), chat.endRun() before processing, and similar + // graceful exits leave an unacknowledged user on session.in but no + // partial assistant on session.out. That's not recovery — the next + // run just dispatches the message normally. + const onRecoveryBoot = vi.fn(); + const model = new MockLanguageModelV3({ + doStream: async () => ({ stream: textStream("ok") }), + }); + const u1 = userMessage("buffered while dead", "u-buffered"); + const agent = chat.agent({ + id: "recovery-boot.inflight-users-no-partial", + onRecoveryBoot, + run: async ({ messages, signal }) => + streamText({ model, messages, abortSignal: signal }), + }); + const harness = mockChatAgent(agent, { + chatId: "inflight-users-no-partial", + continuation: true, + previousRunId: "run_prior", + }); + harness.seedSessionInTail([u1 as never]); + try { + await new Promise((r) => setTimeout(r, 50)); + expect(onRecoveryBoot).not.toHaveBeenCalled(); + } finally { + await harness.close(); + } + }); + + it("default behavior re-dispatches each in-flight user as a turn", async () => { + let turnCount = 0; + const model = new MockLanguageModelV3({ + doStream: async () => { + turnCount++; + return { stream: textStream(`reply ${turnCount}`) }; + }, + }); + const u1 = userMessage("first buffered", "u-1"); + const u2 = userMessage("second buffered", "u-2"); + const agent = chat.agent({ + id: "recovery-boot.default-dispatch", + // NO onRecoveryBoot — exercise the default path + run: async ({ messages, signal }) => + streamText({ model, messages, abortSignal: signal }), + }); + const harness = mockChatAgent(agent, { + chatId: "default-dispatch", + continuation: true, + previousRunId: "run_prior", + }); + harness.seedSessionInTail([u1 as never, u2 as never]); + try { + await new Promise((r) => setTimeout(r, 100)); + expect(turnCount).toBe(2); + } finally { + await harness.close(); + } + }); + + it("smart default: partial + first user spliced into chain, rest dispatched", async () => { + let observedChain: Array<{ role: string; idHead: string }> = []; + let turnCount = 0; + const model = new MockLanguageModelV3({ + doStream: async () => { + turnCount++; + return { stream: textStream("ok") }; + }, + }); + const partial = assistantMessage("partial answer in progress", "a-partial"); + const u1 = userMessage("original question", "u-1"); + const u2 = userMessage("follow-up", "u-2"); + const agent = chat.agent({ + id: "recovery-boot.smart-default", + // NO onRecoveryBoot — exercise the smart default + onTurnStart: async ({ uiMessages }) => { + if (turnCount === 0) { + observedChain = uiMessages.map((m) => ({ + role: m.role, + idHead: m.id.slice(0, 10), + })); + } + }, + run: async ({ messages, signal }) => + streamText({ model, messages, abortSignal: signal }), + }); + const harness = mockChatAgent(agent, { + chatId: "smart-default", + continuation: true, + previousRunId: "run_prior", + }); + harness.seedSessionOutPartial(partial as never); + harness.seedSessionInTail([u1 as never, u2 as never]); + try { + await new Promise((r) => setTimeout(r, 100)); + // Turn 1 fires with the follow-up user (u2). Its chain should + // include [u1 (original), a-partial, u2 (follow-up)]. + expect(turnCount).toBe(1); + expect(observedChain.map((m) => m.role)).toEqual([ + "user", + "assistant", + "user", + ]); + expect(observedChain[0]!.idHead).toBe("u-1"); + expect(observedChain[1]!.idHead).toBe("a-partial"); + expect(observedChain[2]!.idHead).toBe("u-2"); + } finally { + await harness.close(); + } + }); + + it("hook's recoveredTurns: [] suppresses re-dispatch of in-flight users", async () => { + let turnCount = 0; + const model = new MockLanguageModelV3({ + doStream: async () => { + turnCount++; + return { stream: textStream(`reply ${turnCount}`) }; + }, + }); + const partial = assistantMessage("partial answer", "a-partial"); + const u1 = userMessage("buffered", "u-1"); + const agent = chat.agent({ + id: "recovery-boot.suppress-dispatch", + onRecoveryBoot: async (): Promise => ({ recoveredTurns: [] }), + run: async ({ messages, signal }) => + streamText({ model, messages, abortSignal: signal }), + }); + const harness = mockChatAgent(agent, { + chatId: "suppress-dispatch", + continuation: true, + previousRunId: "run_prior", + }); + harness.seedSessionOutPartial(partial as never); + harness.seedSessionInTail([u1 as never]); + try { + // No turn should fire from the boot-injected queue. + // Send a fresh user message to confirm the agent is alive. + await harness.sendMessage(userMessage("real next message")); + await new Promise((r) => setTimeout(r, 20)); + expect(turnCount).toBe(1); // only the explicit sendMessage turn + } finally { + await harness.close(); + } + }); + + it("hook's chain override seeds the accumulator", async () => { + const model = new MockLanguageModelV3({ + doStream: async () => ({ stream: textStream("acked") }), + }); + const custom = assistantMessage("custom-recovered-history", "a-custom"); + const partial = assistantMessage("partial", "a-partial"); + const u1 = userMessage("buffered", "u-1"); + let observedMessageCount = 0; + const agent = chat.agent({ + id: "recovery-boot.chain-override", + onRecoveryBoot: async (): Promise => ({ + chain: [custom as never], + recoveredTurns: [u1 as never], + }), + onTurnStart: async ({ uiMessages }) => { + observedMessageCount = uiMessages.length; + }, + run: async ({ messages, signal }) => + streamText({ model, messages, abortSignal: signal }), + }); + const harness = mockChatAgent(agent, { + chatId: "chain-override", + continuation: true, + previousRunId: "run_prior", + }); + harness.seedSessionOutPartial(partial as never); + harness.seedSessionInTail([u1 as never]); + try { + await new Promise((r) => setTimeout(r, 50)); + // Chain seeded with [custom] before the recovered user message + // arrives — onTurnStart sees [custom, u1] when the first + // recovered turn fires. + expect(observedMessageCount).toBe(2); + } finally { + await harness.close(); + } + }); + + it("does NOT fire when hydrateMessages is registered", async () => { + const onRecoveryBoot = vi.fn(); + const model = new MockLanguageModelV3({ + doStream: async () => ({ stream: textStream("ok") }), + }); + const u1 = userMessage("buffered", "u-1"); + const agent = chat.agent({ + id: "recovery-boot.hydrate-skips", + hydrateMessages: async ({ incomingMessages }) => incomingMessages, + onRecoveryBoot, + run: async ({ messages, signal }) => + streamText({ model, messages, abortSignal: signal }), + }); + const harness = mockChatAgent(agent, { + chatId: "hydrate-skips", + continuation: true, + previousRunId: "run_prior", + }); + harness.seedSessionInTail([u1 as never]); + try { + await harness.sendMessage(userMessage("fresh")); + await new Promise((r) => setTimeout(r, 20)); + expect(onRecoveryBoot).not.toHaveBeenCalled(); + } finally { + await harness.close(); + } + }); + + it("beforeBoot runs before the first recovered turn fires", async () => { + const order: string[] = []; + const model = new MockLanguageModelV3({ + doStream: async () => { + order.push("turn"); + return { stream: textStream("ok") }; + }, + }); + const partial = assistantMessage("partial", "a-partial"); + const u1 = userMessage("buffered original", "u-1"); + const u2 = userMessage("followup", "u-2"); + const agent = chat.agent({ + id: "recovery-boot.before-boot", + onRecoveryBoot: async (): Promise => ({ + beforeBoot: async () => { + order.push("beforeBoot"); + }, + }), + run: async ({ messages, signal }) => + streamText({ model, messages, abortSignal: signal }), + }); + const harness = mockChatAgent(agent, { + chatId: "before-boot", + continuation: true, + previousRunId: "run_prior", + }); + harness.seedSessionOutPartial(partial as never); + // Two users — smart default consumes u1 into the chain, leaves u2 for dispatch + harness.seedSessionInTail([u1 as never, u2 as never]); + try { + await new Promise((r) => setTimeout(r, 50)); + expect(order).toEqual(["beforeBoot", "turn"]); + } finally { + await harness.close(); + } + }); + + it("hook throwing falls back to defaults without sinking the run", async () => { + let turnCount = 0; + const model = new MockLanguageModelV3({ + doStream: async () => { + turnCount++; + return { stream: textStream("ok") }; + }, + }); + const partial = assistantMessage("partial", "a-partial"); + const u1 = userMessage("buffered original", "u-1"); + const u2 = userMessage("followup", "u-2"); + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + const agent = chat.agent({ + id: "recovery-boot.hook-throws", + onRecoveryBoot: async () => { + throw new Error("kaboom"); + }, + run: async ({ messages, signal }) => + streamText({ model, messages, abortSignal: signal }), + }); + const harness = mockChatAgent(agent, { + chatId: "hook-throws", + continuation: true, + previousRunId: "run_prior", + }); + harness.seedSessionOutPartial(partial as never); + // Two users so smart default leaves u2 to dispatch (u1 spliced into chain) + harness.seedSessionInTail([u1 as never, u2 as never]); + try { + await new Promise((r) => setTimeout(r, 100)); + // Default behavior: the in-flight user is re-dispatched as a turn + // even though the hook threw. + expect(turnCount).toBe(1); + } finally { + await harness.close(); + warnSpy.mockRestore(); + } + }); +}); diff --git a/packages/trigger-sdk/test/replay-session-in.test.ts b/packages/trigger-sdk/test/replay-session-in.test.ts new file mode 100644 index 0000000000..92a1cb6f97 --- /dev/null +++ b/packages/trigger-sdk/test/replay-session-in.test.ts @@ -0,0 +1,137 @@ +// Import the test entry point first so the resource catalog is installed. +import "../src/v3/test/index.js"; + +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { apiClientManager } from "@trigger.dev/core/v3"; +import { __replaySessionInTailProductionPathForTests as replaySessionInTail } from "../src/v3/ai.js"; + +// ── Helpers ──────────────────────────────────────────────────────────── + +function userMessage(id: string, text: string) { + return { + id, + role: "user" as const, + parts: [{ type: "text" as const, text }], + }; +} + +function stubReadRecords(chunks: unknown[]) { + const records = chunks.map((chunk, i) => ({ + data: chunk, + id: `evt-${i + 1}`, + seqNum: i + 1, + })); + const spy = vi.fn(async () => ({ records })); + vi.spyOn(apiClientManager, "clientOrThrow").mockReturnValue({ + readSessionStreamRecords: spy, + } as never); + return spy; +} + +beforeEach(() => { + vi.restoreAllMocks(); +}); + +afterEach(() => { + vi.restoreAllMocks(); +}); + +// ── Tests ────────────────────────────────────────────────────────────── + +describe("replaySessionInTail", () => { + it("extracts user messages from kind: 'message' records with submit-message trigger", async () => { + const u1 = userMessage("u-1", "hello"); + const u2 = userMessage("u-2", "again"); + stubReadRecords([ + { + kind: "message", + payload: { chatId: "c1", trigger: "submit-message", message: u1, metadata: { userId: "a" } }, + }, + { + kind: "message", + payload: { chatId: "c1", trigger: "submit-message", message: u2, metadata: { userId: "b" } }, + }, + ]); + + const result = await replaySessionInTail("sess"); + expect(result).toHaveLength(2); + expect(result[0]!.message.id).toBe("u-1"); + expect(result[0]!.seqNum).toBe(1); + expect(result[0]!.metadata).toEqual({ userId: "a" }); + expect(result[1]!.message.id).toBe("u-2"); + expect(result[1]!.seqNum).toBe(2); + expect(result[1]!.metadata).toEqual({ userId: "b" }); + }); + + it("ignores non-message variants (stop, handover, handover-skip)", async () => { + const u1 = userMessage("u-1", "real user"); + stubReadRecords([ + { kind: "stop", message: "user stopped" }, + { kind: "handover-skip" }, + { kind: "handover", partialAssistantMessage: [], isFinal: false }, + { kind: "message", payload: { chatId: "c1", trigger: "submit-message", message: u1 } }, + ]); + + const result = await replaySessionInTail("sess"); + expect(result).toHaveLength(1); + expect(result[0]!.message.id).toBe("u-1"); + }); + + it("ignores message records that aren't submit-message", async () => { + // regenerate-message / preload / close / action / handover-prepare don't + // carry a user message — the chain reconstruction must skip them. + stubReadRecords([ + { kind: "message", payload: { chatId: "c1", trigger: "regenerate-message" } }, + { kind: "message", payload: { chatId: "c1", trigger: "preload" } }, + { kind: "message", payload: { chatId: "c1", trigger: "close" } }, + { kind: "message", payload: { chatId: "c1", trigger: "action", action: { foo: 1 } } }, + ]); + + const result = await replaySessionInTail("sess"); + expect(result).toHaveLength(0); + }); + + it("ignores records whose payload is missing or empty", async () => { + stubReadRecords([ + { kind: "message" }, // no payload + { kind: "message", payload: { chatId: "c1", trigger: "submit-message" } }, // no message + { kind: "message", payload: { chatId: "c1", trigger: "submit-message", message: null } }, + { + kind: "message", + payload: { chatId: "c1", trigger: "submit-message", message: "not-an-object" }, + }, + ]); + + const result = await replaySessionInTail("sess"); + expect(result).toHaveLength(0); + }); + + it("skips non-object record data defensively", async () => { + const u1 = userMessage("u-1", "valid"); + stubReadRecords([ + 42, + null, + "string-data", + { kind: "message", payload: { chatId: "c1", trigger: "submit-message", message: u1 } }, + ]); + + const result = await replaySessionInTail("sess"); + expect(result).toHaveLength(1); + expect(result[0]!.message.id).toBe("u-1"); + }); + + it("passes the afterEventId cursor through to readSessionStreamRecords", async () => { + const spy = stubReadRecords([]); + + await replaySessionInTail("sess", { lastEventId: "evt-42" }); + + expect(spy).toHaveBeenCalledWith("sess", "in", { afterEventId: "evt-42" }); + }); + + it("returns an empty list when the records endpoint returns no records", async () => { + stubReadRecords([]); + + const result = await replaySessionInTail("sess"); + expect(result).toEqual([]); + }); +}); diff --git a/packages/trigger-sdk/test/replay-session-out.test.ts b/packages/trigger-sdk/test/replay-session-out.test.ts index 802f4ff0c4..ed78ecec14 100644 --- a/packages/trigger-sdk/test/replay-session-out.test.ts +++ b/packages/trigger-sdk/test/replay-session-out.test.ts @@ -40,18 +40,19 @@ function partialTurn(id: string, text: string): UIMessageChunk[] { /** * Stub `apiClientManager.clientOrThrow().readSessionStreamRecords` so the * helper sees a `{ records: StreamRecord[] }` response. Each StreamRecord - * is `{ data: string, id, seqNum }` — `data` is the JSON-encoded chunk - * body the runtime then `JSON.parse`s. + * is `{ data, id, seqNum }` — `data` is the parsed chunk OBJECT (the wire + * writer puts chunks directly into the record envelope; the route + * forwards them as-is; the schema declares `data: z.unknown()`). * - * Pass either a `UIMessageChunk` (will be JSON.stringify'd) or a raw - * string (used as `data` directly — for tests that need pre-stringified - * or deliberately-malformed bodies). + * Pass either a chunk OBJECT (used as `data` directly) or a string + * (used as `data` directly — for tests that need deliberately-malformed + * bodies; the consumer filters non-objects out). * * Captures the `afterEventId` argument for resume-from-cursor assertions. */ function stubReadRecordsWithChunks(chunks: unknown[]) { const records = chunks.map((chunk, i) => ({ - data: typeof chunk === "string" ? chunk : JSON.stringify(chunk), + data: chunk, id: `evt-${i + 1}`, seqNum: i + 1, })); @@ -228,30 +229,12 @@ describe("replaySessionOutTail", () => { expect(text).toBe("fully-finished"); }); - it("JSON-decodes each record.data (every record arrives pre-serialized)", async () => { - // The records endpoint hands each chunk back as a JSON string in - // `record.data` — the agent JSON.parses it client-side so the - // server's hot path doesn't pay the parse cost. Verify a normal - // turn round-trips through JSON encode→decode. - const stringChunks = textTurn("a-1", "from-string").map((c) => JSON.stringify(c)); - stubReadRecordsWithChunks(stringChunks); - - const result = await replaySessionOutTail("string-chunks"); - expect(result).toHaveLength(1); - const text = (result[0]!.parts as Array<{ type: string; text?: string }>) - .filter((p) => p.type === "text") - .map((p) => p.text) - .join(""); - expect(text).toBe("from-string"); - }); - - it("skips records whose data is unparseable JSON", async () => { - // The replay helper wraps the per-record JSON.parse in try/catch so - // a single malformed record can't sink the rest of the replay. The - // server should never serve a malformed `data`, but the defensive - // catch lets a poisoned record skip cleanly. + it("skips records whose data is a string (the wire delivers objects)", async () => { + // The writer puts chunk objects directly into the record envelope; + // the route forwards them as-is. A string body is malformed — the + // consumer drops it defensively rather than JSON.parsing. stubReadRecordsWithChunks([ - "not-json-{[", + "not-an-object", ...textTurn("a-1", "survived"), ]); @@ -260,14 +243,14 @@ describe("replaySessionOutTail", () => { expect(result[0]!.id).toBe("a-1"); }); - it("skips records whose decoded data is not an object", async () => { - // After JSON.parse, the helper requires `chunk` to be a non-null - // object with a string `type` field. Records that decode to - // primitives (number, string, etc.) are dropped silently. + it("skips records whose data is not an object", async () => { + // The consumer requires `chunk` to be a non-null object with a + // string `type` field. Records that arrive as primitives + // (number, null, string) are dropped silently. stubReadRecordsWithChunks([ - JSON.stringify(42), - JSON.stringify(null), - JSON.stringify("just-a-string"), + 42, + null, + "just-a-string", ...textTurn("a-1", "survived"), ]); diff --git a/references/ai-chat/src/trigger/chat.ts b/references/ai-chat/src/trigger/chat.ts index 69315a76e8..679a4b6d8a 100644 --- a/references/ai-chat/src/trigger/chat.ts +++ b/references/ai-chat/src/trigger/chat.ts @@ -296,6 +296,34 @@ export const aiChat = chat }, // #endregion + // #region onRecoveryBoot — emit a data-chat-recovery banner chunk + onRecoveryBoot: async ({ + chatId, + previousRunId, + cause, + settledMessages, + inFlightUsers, + partialAssistant, + pendingToolCalls, + writer, + }) => { + logger.info("onRecoveryBoot fired", { + chatId, + previousRunId, + cause, + settledCount: settledMessages.length, + inFlightUserCount: inFlightUsers.length, + partialAssistantPresent: partialAssistant !== undefined, + pendingToolCallCount: pendingToolCalls.length, + }); + writer.write({ + type: "data-chat-recovery", + data: { cause, previousRunId, partialPresent: partialAssistant !== undefined }, + transient: true, + }); + }, + // #endregion + // #region onPreload — eagerly create chat/session DB rows before the first message onPreload: async ({ chatId, chatAccessToken, clientData }) => { if (!clientData) return;