feat(langchain): AgentStreamer for create_agent graphs#36897
feat(langchain): AgentStreamer for create_agent graphs#36897Nick Hollon (nick-hollon-lc) wants to merge 37 commits into
Conversation
Add `BaseChatModel.stream_v2()` / `astream_v2()` returning a `ChatModelStream` with typed projections (`.text`, `.reasoning`, `.tool_calls`, `.usage`, `.output`) plus raw protocol event iteration. Providers that only implement `_stream()` get a compat bridge that converts `AIMessageChunk`s to the content-block protocol lifecycle, preserving usage and response metadata for v1 parity. - New module `chat_model_stream.py` with `ChatModelStream`, `AsyncChatModelStream`, and push/pull projection hierarchy (`SyncProjection`, `SyncTextProjection`, `AsyncProjection`). - New module `_compat_bridge.py` that converts chunk streams to protocol events, with `response_metadata` preserved via `MessageStartData.metadata` and `MessageFinishData.metadata`. - `stream_v2` wires `on_chat_model_start` / `on_llm_end` / `on_llm_error` callbacks into the pump; `astream_v2` spawns a producer task and awaits it alongside the output so `on_llm_end` fires before `await stream` returns. - tool_use finish-reason inference runs after finalization so malformed tool-call JSON (finalized as `invalid_tool_call`) does not flip `finish_reason` to `"tool_use"`. - Add `langchain-protocol>=0.0.6` dependency (local path override retained for dev). Tests cover projection semantics, tool-call streaming (single + parallel + malformed args), async/sync event replay, callback firing, and v1 parity (text, tool calls, usage, response metadata, reasoning+text ordering, error propagation).
Add explicit `stream_v2` / `astream_v2` overrides on `RunnableBinding` that merge `self.kwargs` into the delegated call, mirroring the existing `stream` / `astream` / `invoke` overrides. Without these, calls that chained through `bind` or `bind_tools` fell through `__getattr__` (which merges `self.config` but not `self.kwargs`) and silently dropped bound tools, stop sequences, and other runtime kwargs. The returns are typed as `Any` to avoid pulling chat-model types into `langchain_core.runnables.base`; the method only makes sense when the bound runnable is a chat model, and `AttributeError` propagates unchanged if it isn't. Adds tests covering bound-kwarg forwarding for both sync and async paths plus the call-time kwarg override semantics.
Drop the local path override in `[tool.uv.sources]` now that 0.0.8 is published, and raise the lower bound to match the APIs the compat bridge relies on (notably `MessageFinishData.metadata`).
- `stream_v2` / `astream_v2` now pass the assembled `AIMessage` to `on_llm_end` via `LLMResult(generations=[[ChatGeneration(message=...)]])`, so LangSmith and other tracers see the final response on v2 calls (was previously `generations=[]`). - `astream_v2`'s producer re-raises `asyncio.CancelledError` ahead of the generic handler, so cancellation propagates normally instead of being converted into `on_llm_error` + a swallowed exception. - New `message_to_events` / `amessage_to_events` in `_compat_bridge` replay a finalized `AIMessage` as a synthetic content-block lifecycle. Intended for the langgraph-side handler that emits protocol events for non-streamed node outputs (cache hits, `model.invoke()` inside a node, checkpointed state). Turns `_extract_final_blocks` from a dangling helper into a real caller. - Document the optional `_stream_chat_model_events` / `_astream_chat_model_events` provider hooks inline at the getattr sites so integrators can discover the expected signature.
Adds a new `on_stream_event` hook on `LLMManagerMixin` / `AsyncCallbackHandler` that fires once per `MessagesData` event produced by `stream_v2` / `astream_v2`, with dispatch methods on `CallbackManagerForLLMRun` and `AsyncCallbackManagerForLLMRun`. This is v2's observer hook, analogous to `on_llm_new_token` in v1 but at event granularity rather than chunk. It fires uniformly whether the provider emits events natively via `_stream_chat_model_events` or goes through the chunk-to-event compat bridge — observers see the same event stream regardless of how the underlying model produces output. Primary consumer: langgraph's forthcoming `StreamProtocolMessagesHandler`, which can now be a one-line forwarder (lookup namespace metadata by run_id, push `(ns, "messages", (event, meta))` to the graph's output stream) instead of re-implementing the chunks-to-events state machine internally. Does not fire from v1 `stream()` / `astream()`. Purely additive — `on_chat_model_start`, `on_llm_end`, and `on_llm_error` continue to bracket a v2 call as they do a v1 call.
Replaces the list-of-futures + `_wake()` pattern with a single `asyncio.Event` shared by all waiters (the awaitable plus every async iterator cursor). Each waiter clears the event before awaiting and re-checks its own condition on wake, so stale notifications don't cause spin loops. Single-loop only — if cross-thread wake is ever required, revert to the list-of-futures pattern with `call_soon_threadsafe`. Noted in the AsyncProjection docstring. Net -9 lines; drops `import contextlib` and the per-iteration `create_future`/`append` boilerplate.
Renames the stream's and projections' "private" producer-side methods to public names, since they are the intended call surface for anyone driving the stream (the pump, langgraph's forthcoming handler, tests). Removes ~36 `noqa: SLF001` suppressions along the way. On `_ProjectionBase`: - `_push` -> `push` - `_finish` -> `complete` - `_fail` -> `fail` - adds `done` / `error` read-only properties for sidekicks (iterator) - `SyncProjection.set_request_more(cb)` replaces direct `_request_more` assignment On `ChatModelStream`: - `_bind_pump` -> `bind_pump` - `_fail` -> `fail` - adds `output_message` property (non-blocking peek) - new `dispatch(event)` method replaces the module-level `dispatch_event` helper (kept as a thin deprecated wrapper for back-compat) The genuinely internal helpers (`_record_event`, `_push_*`, `_finish` on the stream, `_drain`, `_assemble_message`) stay private — they have one caller each, inside the class. Remaining SLF001 suppressions in this file are intentional `_AsyncProjectionIterator` coupling to its projection's `_deltas` and `_event`; annotated with a comment.
The compat bridge produces InvalidToolCallBlock when tool-call JSON parse
fails, but ChatModelStream had no handler for it. The finish event was
silently ignored, the stale chunk stayed in _tool_call_chunks, and
_finish's sweep re-parsed (failed again), fell back to args={}, and
appended a valid-looking ToolCallBlock — so the protocol said "invalid"
while the assembled AIMessage said "valid with empty args". An agent layer
downstream could then dispatch the malformed call.
The finish handler now routes invalid_tool_call blocks into
_invalid_tool_calls_acc and deletes the stale chunk entry; _finish's sweep
emits InvalidToolCallBlock on JSON failure instead of an empty-args tool
call; _assemble_message passes invalid_tool_calls through to AIMessage.
Extend the v2 stream and compat bridge to handle every protocol ContentBlock variant end-to-end — server tool calls, invalid tool calls, images, audio, video, file, and non-standard blocks — not just text, reasoning, and regular tool calls. Previously these were silently dropped at the bridge's extractor, had no handler in ChatModelStream, and could not appear in .output.content. The stream now keeps an index-ordered `_blocks` snapshot as the single source of truth for .output.content, alongside the existing typed accumulators that drive the public projections. `_assemble_message` builds content from that snapshot, emitting protocol-shape `tool_call` blocks instead of the legacy `tool_use` shape, and collapses to a bare string only when the message contains exactly one text block. Bridge extractors (_extract_blocks_from_chunk, _extract_final_blocks) now pass through any protocol-shape block in msg.content, _accumulate_block and _delta_block handle server_tool_call_chunk and self-contained types, and _finalize_block promotes server_tool_call_chunk to server_tool_call (falling back to invalid_tool_call on JSON failure, symmetric with regular tool calls). The standard `invalid_tool_calls` field on AIMessage is also surfaced by the final-block extractor. Forward-looking: today's partners keep provider-native shapes in msg.content and expose protocol blocks lazily via the `.content_blocks` property, so these paths are latent until partners either populate msg.content with protocol shape or override _stream_chat_model_events. The bridge is ready.
Collapse _compat_bridge to a single path that reads msg.content_blocks and emits protocol events. The translator / best-effort / tool_call_chunks extraction all live in content_blocks already — the legacy branch, _PROTOCOL_PASS_THROUGH_TYPES, _SELF_CONTAINED_BLOCK_TYPES skeleton handling, and manual reasoning-variant sniffing were duplicating work. Side fixes picked up along the way: - No-provider chunks with both text content and tool_call_chunks silently dropped the tool call because the legacy extractor put both at index 0. content_blocks places them on distinct indices. - "server_tool_call_result" (typo) replaced with "server_tool_result" in ChatModelStream's finish dispatch and the test that exercises it — matches the protocol type that every translator actually emits. Also collapses duplicated tool_call_chunk / server_tool_call_chunk handling in chat_model_stream into shared merge/sweep helpers so the two code paths can't drift apart again (which is how the typo survived). _compat_bridge.py: 855 -> 581 lines. No public API changes.
Reduce the cast count in _compat_bridge from 9 to 2. The casts exist because langchain_core.messages.content.ContentBlock and langchain_protocol.protocol.ContentBlock are two nominally distinct TypedDict Unions that are structurally near-identical. msg.content_blocks returns the core Union; event payloads want the protocol Union; the bridge launders between them through dict[str, Any]. - Remove redundant casts (isinstance-narrowed dict; getattr Any). - Use TypedDict constructors (ServerToolCallChunkBlock, ToolCallBlock, ServerToolCallBlock) where we build fresh blocks — no cast needed for constructor output. - Introduce _to_protocol_block and _to_finalized_block helpers that each hold a single cast with a docstring explaining the seam and pointing at the cross-module refactor that would retire them. CompatBlock's docstring now explains the laundering role.
…ckHandler Adds `_V2StreamingCallbackHandler`, a marker class in `tracers/_streaming.py` that handlers can inherit to signal they consume `on_stream_event` rather than `on_llm_new_token`. Extracts the shared event-producing logic from `stream_v2` / `astream_v2` into `_iter_v2_events` / `_aiter_v2_events` helpers, which pick the native `_stream_chat_model_events` hook or fall back to `chunks_to_events` bridged from `_stream`. `BaseChatModel.invoke` / `ainvoke` now route through the v2 event generator when any attached handler inherits the marker: `_generate_with_cache` / `_agenerate_with_cache` gain a v2 branch, parallel to the existing v1 streaming branch, that drains the helper into a `ChatModelStream` and wraps the assembled `AIMessage` as a `ChatResult`. Caching, rate limiting, run lifecycle, and `llm_output` merging stay on the existing generate path — the v2 and v1 branches diverge only on which callback fires per chunk. The marker is a concrete class rather than a `runtime_checkable` `Protocol` on purpose: an empty Protocol matches every object and would misroute every call.
Under caller-driven async streaming, `AsyncChatModelStream` projections deadlocked when iterated inside an outer `async for stream in run.messages` loop: the projection's `asyncio.Event` was only set by external dispatch, but no task was driving the pump while the consumer was suspended in the inner iteration. Mirror the sync `Projection._request_more` path on the async side: - `AsyncProjection.set_arequest_more` stores an async pull callback. - `_AsyncProjectionIterator.__anext__` drains the callback in an inner loop when wired, falling back to the event wait otherwise. - `_await_impl` drives the callback too so `await stream.output` and `await stream.usage` advance the producer. - `AsyncChatModelStream.set_arequest_more` fans the callback out to every projection so langgraph's `AsyncGraphRunStream` can wire it on stream construction via a transformer `_bind_apump` hook. Pump-exhaustion-without-completion ends iteration cleanly rather than hanging — matches the pragmatic contract for graphs that exhaust mid-stream.
_assemble_message builds AIMessage content from v1 protocol blocks (tool calls typed "tool_call"). Without the output_version marker, provider request builders that gate v1->provider translation on that flag (e.g. ChatAnthropic._get_request_payload) pass the v1 blocks through unconverted and the API rejects them.
Adds `AgentStreamer` as the `create_agent` entry point for the content-block streaming API. Pre-registers `ToolCallTransformer` so every run exposes `run.tool_calls` without opt-in. `AgentRunStream` / `AsyncAgentRunStream` subclasses exist for `isinstance` checks and as the extension point for downstream streamers (e.g. a deepagents layer).
…treamer `create_agent` now passes `transformers=[ToolCallTransformer]` straight through to `graph.compile`, so callers drive the transformer pipeline with `agent.stream_v2()` / `agent.astream_v2()` on the compiled graph. The `AgentStreamer` / `AgentRunStream` / `AsyncAgentRunStream` wrapper classes are gone — projection attributes still bind on `GraphRunStream` by key when the matching transformer is registered.
…ntric-streaming # Conflicts: # libs/langchain/uv.lock # libs/partners/huggingface/uv.lock
Earlier lock regens picked up editable langgraph paths from local dev setup, inflating langgraph to 1.1.7a2 in openai/model-profiles/langchain locks. Rebase against master's baseline via uv lock --no-sources-package for langgraph* so the only diff vs master is the langchain-protocol addition from core.
Streams where content_blocks carries string index identifiers (e.g. OpenAI responses/v1 mode with 'lc_rs_305f30', 'lc_txt_1') collapsed all blocks to wire index 0 because _iter_protocol_blocks fell back to positional i for non-int indices. Every block appeared as a delta of block 0, and only one content-block-finish event fired at the end of the stream. Keep the raw block key (int or string) internally, allocate sequential uint wire indices per distinct block, and finish the previously-open block when a new block key appears — matching the protocol's no-interleave rule.
New `langchain_tests.utils.stream_lifecycle.assert_valid_event_stream` helper enforces the protocol contract on any event stream: - single message-start / message-finish envelope - blocks do not interleave (each block finishes before the next starts) - sequential uint wire indices from 0 - accumulated deltas match the finish payload for deltaable types Applied at three levels: - core/test_compat_bridge: provider-style emission patterns exercised directly through chunks_to_events / message_to_events (openai chat completions int indices, openai responses/v1 string identifiers, anthropic-style per-chunk int indices, inline image, invalid tool call, empty stream) - openai partner: validator applied to stream_v2 against the existing responses-api mock and to a new chat-completions stream_v2 test - anthropic partner: new mock stream of RawMessageStartEvent + RawContentBlock* events threaded through _stream via `_create` patch; covers thinking + text + tool_use lifecycle with tool-use stop_reason Enabling thinking on the anthropic test flips coerce_content_to_string off so every block carries a proper integer index — the structured path the bridge actually exercises. Default-mode (no tools / thinking / docs) coerces text to a plain string and strips per-chunk indices; the bridge handles that branch by collapsing to positional-0 and it is a known separate code path, intentionally not covered here.
When a provider emits content_blocks without an `index` field (e.g. anthropic `_stream` with coerce_content_to_string=True for pure text), the bridge's positional-0 fallback merges successive chunks into one block. This works correctly today because the only coerced-string path in the anthropic integration is mutually exclusive with any structured (indexed) emission. Document the scenario that would surface the gap and the two ways to close it (native _stream_chat_model_events hook or 'continue on anonymous key' bridge rule) so a future integration doesn't discover the edge case the hard way.
Anthropic's thinking stream emits a `signature_delta` after the
reasoning text finishes. The adapter surfaces this as a reasoning
delta carrying `extras.signature` (and no new text). Two places
were dropping those fields while assembling the accumulated block:
- `_compat_bridge._accumulate` only concatenated the `reasoning`
text, silently discarding any other keys (including `extras`) on
later deltas.
- `chat_model_stream._push_content_block_finish` rebuilt the
finalized reasoning block as `{"type": "reasoning", "reasoning": ...}`,
dropping everything the finish event carried.
Together, these stripped Claude's `extras.signature` from the
assembled `AIMessage`, and the next turn in a `create_agent` loop
failed with `messages.<n>.content.<k>.thinking.signature: Field
required`.
The bridge now merges `extras` (so earlier keys survive later
deltas) and replaces other non-text fields; `ChatModelStream`
spreads the incoming finish block before overwriting the two
fields it owns.
Covered by the new
`test_lifecycle_validator_anthropic_reasoning_preserves_signature`
case.
The base branch was changed.
853af9f to
aeb5612
Compare
# Conflicts: # libs/core/langchain_core/language_models/_compat_bridge.py # libs/core/langchain_core/language_models/chat_model_stream.py # libs/core/langchain_core/language_models/chat_models.py # libs/core/pyproject.toml # libs/core/tests/unit_tests/language_models/test_chat_model_stream.py # libs/core/tests/unit_tests/language_models/test_chat_model_streamer.py # libs/core/tests/unit_tests/language_models/test_compat_bridge.py # libs/core/tests/unit_tests/language_models/test_stream_v2.py # libs/core/tests/unit_tests/language_models/test_v1_parity.py # libs/core/uv.lock # libs/langchain/uv.lock # libs/langchain_v1/langchain/agents/factory.py # libs/langchain_v1/uv.lock # libs/model-profiles/uv.lock # libs/partners/anthropic/tests/unit_tests/test_chat_models.py # libs/partners/anthropic/uv.lock # libs/partners/chroma/uv.lock # libs/partners/deepseek/uv.lock # libs/partners/exa/uv.lock # libs/partners/fireworks/uv.lock # libs/partners/groq/uv.lock # libs/partners/huggingface/uv.lock # libs/partners/mistralai/uv.lock # libs/partners/nomic/uv.lock # libs/partners/ollama/uv.lock # libs/partners/openai/uv.lock # libs/partners/openrouter/uv.lock # libs/partners/perplexity/uv.lock # libs/partners/qdrant/uv.lock # libs/partners/xai/uv.lock # libs/standard-tests/uv.lock # libs/text-splitters/uv.lock
Merging this PR will not alter performance
Comparing Footnotes
|
|
Hi, Severity: action required | Category: maintainability How to fix: Remove duplicate method definitions Agent prompt to fix - you can give this to your LLM of choice:
We noticed a couple of other issues in this PR as well - happy to share if helpful. Found by Qodo code review |
Summary
AgentStreameras thecreate_agententry point for the content-block streaming API.ToolCallTransformerso every agent run exposesrun.tool_callswithout the caller opting in.AgentRunStream/AsyncAgentRunStreamsubclasses forisinstancechecks and as the extension point for downstream streamers (e.g. a deepagents-layerDeepAgentRunStream).Targeted at
nh/content-block-centric-streaming— it depends on the v2 content-block streaming work on that branch.