fix: add provider/model failover to streaming LLM calls#2022
Conversation
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
📝 WalkthroughWalkthroughReliableProvider::stream_chat_with_system now performs provider+model streaming failover: it pre-creates candidate streams, spawns a task that peeks the first chunk to decide commit vs retry (with exponential backoff using is_stream_error_non_retryable), forwards committed chunks via an mpsc channel, and returns a BoxStream built from the receiver. ChangesProvider/model failover for streaming requests
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/openhuman/inference/provider/reliable.rs`:
- Around line 996-1011: The current branch converts the StreamError to a string
and wraps it in anyhow, which loses type information and prevents
is_non_retryable from downcasting; instead, preserve and inspect the original
StreamError: in the Some(Err(ref e)) arm of the stream handling, match on
StreamError's variants (or add/ call a getter like StreamError::source_error()
or StreamError::is_non_retryable()) to extract the inner anyhow::Error or
underlying reqwest::Error and pass that to is_non_retryable (or call the new
StreamError::is_non_retryable helper), while keeping the tracing::warn message
(use e.to_string() only for the log text). Update the branch around
provider_name/current_model to use the original error value for retry
classification rather than the stringified error.
- Around line 980-1023: The streaming failover loop over candidate_streams only
tries each (provider_name, current_model, candidate_stream) once, so transient
errors immediately fail over; add an inner retry loop (using max_retries and
backoff_ms similar to the non-streaming path) that attempts to re-open/advance
the same candidate stream up to max_retries before moving to the next candidate;
on rate-limit/rotate-worthy errors call rotate_key(), record each failure with
push_failure (and use format_failure_aggregate when sending the aggregated error
downstream), honor is_non_retryable to skip retries, and keep sending successful
chunks to tx as currently implemented; ensure backoff_ms is reset/managed
per-candidate and that failure aggregation is attached to the final error sent
when all retries for a candidate are exhausted.
- Around line 961-973: The current code eagerly builds candidate_streams by
calling stream_chat_with_system for every provider/model pair (using
candidate_streams, stream_chat_with_system, model_chain, streaming_providers),
which opens connections up-front; instead, change to collect a Vec of
lightweight candidates (e.g., tuples of provider_name, model, and an
Arc/cloneable reference to the provider) and move the call to
stream_chat_with_system into the spawned failover task where you attempt each
candidate in order, creating the stream only when you start that attempt so
unused providers are never contacted.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: e987385b-e313-4996-b980-669ff048dbdf
📒 Files selected for processing (1)
src/openhuman/inference/provider/reliable.rs
| let mut candidate_streams: Vec<(String, String, stream::BoxStream<'static, StreamResult<StreamChunk>>)> = Vec::new(); | ||
| for current_model in &model_chain { | ||
| for (provider_name, provider) in &streaming_providers { | ||
| let s = provider.stream_chat_with_system( | ||
| system_prompt, | ||
| message, | ||
| current_model, | ||
| temperature, | ||
| options, | ||
| ); | ||
| candidate_streams.push(((*provider_name).clone(), current_model.clone(), s)); | ||
| } | ||
| } |
There was a problem hiding this comment.
Pre-creating all candidate streams initiates connections to all providers immediately.
Calling stream_chat_with_system on every provider×model combination upfront may open HTTP connections to all providers before any failover is needed. If the first candidate succeeds, the remaining connections are wasted and may unnecessarily consume rate-limit budgets.
Consider lazily creating streams inside the spawned task. One approach: pass a Vec of (provider_name, model, Arc<dyn Provider>) into the task and call stream_chat_with_system only when attempting that candidate.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/openhuman/inference/provider/reliable.rs` around lines 961 - 973, The
current code eagerly builds candidate_streams by calling stream_chat_with_system
for every provider/model pair (using candidate_streams, stream_chat_with_system,
model_chain, streaming_providers), which opens connections up-front; instead,
change to collect a Vec of lightweight candidates (e.g., tuples of
provider_name, model, and an Arc/cloneable reference to the provider) and move
the call to stream_chat_with_system into the spawned failover task where you
attempt each candidate in order, creating the stream only when you start that
attempt so unused providers are never contacted.
stream_chat_with_system only tried the first streaming-capable provider with the first model. Transient errors propagated immediately while non-streaming methods had full retry + failover. Now iterates all provider+model candidates with exponential backoff between transient failures, matching non-streaming reliability behavior. Closes tinyhumansai#1931
d2fd505 to
9008933
Compare
Summary
stream_chat_with_system()inReliableProvideronly tried the first streaming-capable provider with the first model in the chain. Any transient error (rate limit, timeout, 503) propagated immediately — while non-streaming methods (chat_with_system,chat,chat_with_tools) had full retry + provider failover + model fallback.Changes
StreamError::Providerwith clear messageTesting
reliable::testspassproviderstests passcargo checkclean (no new warnings)Closes #1931
Summary by CodeRabbit