Skip to main content

gestura_core_streaming/
streaming.rs

1//! Streaming LLM provider support for Gestura
2//!
3//! This module provides streaming capabilities for LLM responses, enabling
4//! real-time token-by-token delivery to the frontend with cancellation support.
5
6use crate::config::StreamingConfig;
7use futures_util::StreamExt;
8use gestura_core_foundation::AppError;
9use gestura_core_llm::TokenUsage;
10use gestura_core_llm::openai::{
11    OpenAiApi, is_openai_model_incompatible_with_agent_session, openai_agent_session_model_message,
12    openai_api_for_model,
13};
14use gestura_core_retry::RetryPolicy;
15use gestura_core_tools::schemas::ProviderToolSchemas;
16use std::collections::{BTreeMap, HashMap, HashSet};
17use std::sync::Arc;
18use std::sync::atomic::{AtomicU8, Ordering};
19use std::time::Duration;
20use tokio::sync::mpsc;
21use tracing::Instrument as _;
22
/// Total request timeout, in seconds, applied to streaming LLM API calls (five minutes).
const STREAMING_TIMEOUT_SECS: u64 = 5 * 60;
/// Buffer capacity for stream chunks (presumably the bound of the chunk mpsc channel —
/// confirm at the call sites).
const STREAM_CHUNK_BUFFER_CAPACITY: usize = 256;
/// How long a transient `Status` chunk may wait for channel capacity before it is dropped.
const STATUS_CHUNK_SEND_TIMEOUT: Duration = Duration::from_millis(100);
/// How long a transient `TokenUsageUpdate` chunk may wait for channel capacity before it
/// is dropped.
const TOKEN_USAGE_CHUNK_SEND_TIMEOUT: Duration = Duration::from_millis(100);
28
29async fn send_status_chunk_best_effort(tx: &mpsc::Sender<StreamChunk>, chunk: StreamChunk) {
30    debug_assert!(matches!(chunk, StreamChunk::Status { .. }));
31
32    match tokio::time::timeout(STATUS_CHUNK_SEND_TIMEOUT, tx.send(chunk)).await {
33        Ok(Ok(())) | Ok(Err(_)) => {}
34        Err(_) => {
35            tracing::debug!(
36                timeout_ms = STATUS_CHUNK_SEND_TIMEOUT.as_millis(),
37                "Dropping transient status chunk because the stream receiver is not draining fast enough"
38            );
39        }
40    }
41}
42
43async fn send_token_usage_chunk_best_effort(tx: &mpsc::Sender<StreamChunk>, chunk: StreamChunk) {
44    debug_assert!(matches!(chunk, StreamChunk::TokenUsageUpdate { .. }));
45
46    match tokio::time::timeout(TOKEN_USAGE_CHUNK_SEND_TIMEOUT, tx.send(chunk)).await {
47        Ok(Ok(())) | Ok(Err(_)) => {}
48        Err(_) => {
49            tracing::debug!(
50                timeout_ms = TOKEN_USAGE_CHUNK_SEND_TIMEOUT.as_millis(),
51                "Dropping transient token-usage chunk because the stream receiver is not draining fast enough"
52            );
53        }
54    }
55}
56
/// Pricing per 1M tokens (input/output) for various providers.
///
/// All prices are in USD per one million tokens and were last updated in January 2026.
/// Each model has an `*_INPUT` constant for input tokens and an `*_OUTPUT` constant for
/// output tokens.
pub mod pricing {
    /// OpenAI GPT-4 Turbo input price (USD per 1M tokens).
    pub const OPENAI_GPT4_TURBO_INPUT: f64 = 10.0;
    /// OpenAI GPT-4 Turbo output price (USD per 1M tokens).
    pub const OPENAI_GPT4_TURBO_OUTPUT: f64 = 30.0;

    /// OpenAI GPT-3.5 Turbo input price (USD per 1M tokens).
    pub const OPENAI_GPT35_TURBO_INPUT: f64 = 0.50;
    /// OpenAI GPT-3.5 Turbo output price (USD per 1M tokens).
    pub const OPENAI_GPT35_TURBO_OUTPUT: f64 = 1.50;

    /// Anthropic Claude 3.5 Sonnet input price (USD per 1M tokens).
    pub const ANTHROPIC_CLAUDE_35_SONNET_INPUT: f64 = 3.0;
    /// Anthropic Claude 3.5 Sonnet output price (USD per 1M tokens).
    pub const ANTHROPIC_CLAUDE_35_SONNET_OUTPUT: f64 = 15.0;

    /// Anthropic Claude 3 Opus input price (USD per 1M tokens).
    pub const ANTHROPIC_CLAUDE_3_OPUS_INPUT: f64 = 15.0;
    /// Anthropic Claude 3 Opus output price (USD per 1M tokens).
    pub const ANTHROPIC_CLAUDE_3_OPUS_OUTPUT: f64 = 75.0;

    /// Anthropic Claude 3 Haiku input price (USD per 1M tokens).
    pub const ANTHROPIC_CLAUDE_3_HAIKU_INPUT: f64 = 0.25;
    /// Anthropic Claude 3 Haiku output price (USD per 1M tokens).
    pub const ANTHROPIC_CLAUDE_3_HAIKU_OUTPUT: f64 = 1.25;

    /// Google Gemini 2.0 Flash input price (USD per 1M tokens).
    pub const GEMINI_20_FLASH_INPUT: f64 = 0.10;
    /// Google Gemini 2.0 Flash output price (USD per 1M tokens).
    pub const GEMINI_20_FLASH_OUTPUT: f64 = 0.40;

    /// Google Gemini 2.0 Flash-Lite input price (USD per 1M tokens).
    pub const GEMINI_20_FLASH_LITE_INPUT: f64 = 0.075;
    /// Google Gemini 2.0 Flash-Lite output price (USD per 1M tokens).
    pub const GEMINI_20_FLASH_LITE_OUTPUT: f64 = 0.30;

    /// Google Gemini 1.5 Pro input price (USD per 1M tokens).
    pub const GEMINI_15_PRO_INPUT: f64 = 1.25;
    /// Google Gemini 1.5 Pro output price (USD per 1M tokens).
    pub const GEMINI_15_PRO_OUTPUT: f64 = 5.00;

    /// Google Gemini 1.5 Flash input price (USD per 1M tokens).
    pub const GEMINI_15_FLASH_INPUT: f64 = 0.075;
    /// Google Gemini 1.5 Flash output price (USD per 1M tokens).
    pub const GEMINI_15_FLASH_OUTPUT: f64 = 0.30;

    /// xAI Grok input price (USD per 1M tokens) — estimated.
    pub const XAI_GROK_INPUT: f64 = 5.0;
    /// xAI Grok output price (USD per 1M tokens) — estimated.
    pub const XAI_GROK_OUTPUT: f64 = 15.0;

    /// Ollama (local) input price — free.
    pub const OLLAMA_INPUT: f64 = 0.0;
    /// Ollama (local) output price — free.
    pub const OLLAMA_OUTPUT: f64 = 0.0;

    /// Default fallback input price (USD per 1M tokens); presumably used when no
    /// provider-specific price applies — confirm at call sites.
    pub const DEFAULT_INPUT: f64 = 1.0;
    /// Default fallback output price (USD per 1M tokens); presumably used when no
    /// provider-specific price applies — confirm at call sites.
    pub const DEFAULT_OUTPUT: f64 = 3.0;
}
108
/// Traffic-light indicator describing how close a request is to its token limit.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TokenUsageStatus {
    /// Healthy usage: below 70% of the limit.
    Green,
    /// Approaching the limit: between 70% and 90% of it.
    Yellow,
    /// Near or past the limit: above 90% of it.
    Red,
}
119
120/// Which output stream a shell chunk originated from.
121#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
122#[serde(rename_all = "lowercase")]
123pub enum ShellOutputStream {
124    /// Standard output
125    Stdout,
126    /// Standard error
127    Stderr,
128}
129
130/// Lifecycle state of a shell process.
131#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
132#[serde(rename_all = "snake_case")]
133pub enum ShellProcessState {
134    /// Process has been spawned and is running
135    Started,
136    /// Process completed normally
137    Completed,
138    /// Process failed (non-zero exit or spawn error)
139    Failed,
140    /// Process was stopped by user request (SIGTERM/SIGKILL)
141    Stopped,
142    /// Process was paused by user request (SIGSTOP)
143    Paused,
144    /// Process was resumed after pause (SIGCONT)
145    Resumed,
146}
147
148/// Lifecycle state of a long-lived interactive shell session.
149#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
150#[serde(rename_all = "snake_case")]
151pub enum ShellSessionState {
152    /// PTY shell process is starting.
153    Starting,
154    /// PTY shell is alive and available for reuse.
155    Idle,
156    /// PTY shell currently has an active command lease.
157    Busy,
158    /// PTY shell is attempting to interrupt the active foreground job.
159    Interrupting,
160    /// PTY shell is shutting down.
161    Stopping,
162    /// PTY shell was stopped intentionally.
163    Stopped,
164    /// PTY shell terminated unexpectedly or became unusable.
165    Failed,
166}
167
168/// Compact task view for runtime-authored task-state updates.
169#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
170pub struct TaskRuntimeTaskView {
171    /// Stable task identifier.
172    pub id: String,
173    /// Human-readable task name.
174    pub name: String,
175    /// Runtime task status string.
176    pub status: String,
177}
178
179/// Runtime-authored task scheduler snapshot streamed to UI surfaces.
180#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
181pub struct TaskRuntimeSnapshot {
182    /// Root task driving the current run.
183    pub root_task_id: String,
184    /// Current runtime-selected task, if any.
185    pub current_task: Option<TaskRuntimeTaskView>,
186    /// Ready tasks the runtime deems actionable now.
187    #[serde(default, skip_serializing_if = "Vec::is_empty")]
188    pub ready_tasks: Vec<TaskRuntimeTaskView>,
189    /// Tasks the runtime considers safe to batch in parallel.
190    #[serde(default, skip_serializing_if = "Vec::is_empty")]
191    pub parallel_ready_tasks: Vec<TaskRuntimeTaskView>,
192    /// Tasks currently blocked by dependencies or parent ordering.
193    #[serde(default, skip_serializing_if = "Vec::is_empty")]
194    pub blocked_tasks: Vec<TaskRuntimeTaskView>,
195    /// Open tasks that are not yet terminal.
196    #[serde(default, skip_serializing_if = "Vec::is_empty")]
197    pub open_tasks: Vec<TaskRuntimeTaskView>,
198    /// Recently completed tasks.
199    #[serde(default, skip_serializing_if = "Vec::is_empty")]
200    pub completed_tasks: Vec<TaskRuntimeTaskView>,
201    /// Runtime-detected missing completion requirements.
202    #[serde(default, skip_serializing_if = "Vec::is_empty")]
203    pub missing_requirements: Vec<String>,
204    /// Human-readable scheduler summary.
205    pub status_message: String,
206}
207
208/// Public-facing narration stage for brief between-tool updates.
209#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
210#[serde(rename_all = "snake_case")]
211pub enum NarrationStage {
212    /// Gathering local or external context before the next step.
213    Context,
214    /// Planning or scoping the next action.
215    Planning,
216    /// Executing the primary requested work.
217    Execution,
218    /// Verifying or validating the result.
219    Verification,
220    /// Waiting on a blocker or missing requirement.
221    Blocked,
222    /// General progress update.
223    Progress,
224}
225
226impl NarrationStage {
227    /// Return the stable snake_case label used by the UI.
228    pub const fn as_str(&self) -> &'static str {
229        match self {
230            Self::Context => "context",
231            Self::Planning => "planning",
232            Self::Execution => "execution",
233            Self::Verification => "verification",
234            Self::Blocked => "blocked",
235            Self::Progress => "progress",
236        }
237    }
238}
239
240/// Structured public narration content rendered between major loop events.
241#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
242pub struct PublicNarration {
243    /// Short collapsed heading for the narration block.
244    pub title: String,
245    /// Natural prose fallback used by plain-text surfaces.
246    pub message: String,
247    /// Concise statement of what changed or what the agent is doing now.
248    #[serde(default, skip_serializing_if = "Option::is_none")]
249    pub summary: Option<String>,
250    /// Why the current step matters or why it was chosen.
251    #[serde(default, skip_serializing_if = "Option::is_none")]
252    pub reason: Option<String>,
253    /// What the agent expects to do immediately after this point.
254    #[serde(default, skip_serializing_if = "Option::is_none")]
255    pub next_step: Option<String>,
256    /// Short evidence bullets grounding the narration in observed runtime facts.
257    #[serde(default, skip_serializing_if = "Vec::is_empty")]
258    pub evidence: Vec<String>,
259}
260
261/// A chunk of streaming response
262#[derive(Debug, Clone)]
263pub enum StreamChunk {
264    /// Content from the model's thinking process
265    Thinking(String),
266    /// Public-facing narration explaining the current direction.
267    Narration {
268        /// Structured public narration content for user-facing progress updates.
269        narration: PublicNarration,
270        stage: NarrationStage,
271    },
272    /// A text chunk from the LLM
273    Text(String),
274    /// Start of a tool call
275    ToolCallStart { id: String, name: String },
276    /// Arguments delta for the current tool call
277    ToolCallArgs(String),
278    /// End of the current tool call (LLM finished specifying the call)
279    ToolCallEnd,
280    /// Result of tool execution with status and output
281    ToolCallResult {
282        /// Tool name
283        name: String,
284        /// Whether the tool succeeded
285        success: bool,
286        /// Output or error message
287        output: String,
288        /// Execution duration in milliseconds
289        duration_ms: u64,
290    },
291    /// Retry attempt notification for user feedback
292    RetryAttempt {
293        /// Current attempt number (1-indexed)
294        attempt: u32,
295        /// Maximum attempts configured
296        max_attempts: u32,
297        /// Delay before next retry in milliseconds
298        delay_ms: u64,
299        /// Error that triggered the retry
300        error_message: String,
301    },
302    /// Context compaction notification for user feedback
303    ContextCompacted {
304        /// Number of messages before compaction
305        messages_before: usize,
306        /// Number of messages after compaction
307        messages_after: usize,
308        /// Tokens saved by compaction
309        tokens_saved: usize,
310        /// Summary of what was compacted
311        summary: String,
312    },
313    /// Token usage notification for user feedback
314    /// Provides real-time visibility into context window utilization
315    TokenUsageUpdate {
316        /// Estimated tokens in current request
317        estimated: usize,
318        /// Maximum input tokens allowed
319        limit: usize,
320        /// Utilization percentage (0-100)
321        percentage: u8,
322        /// Status indicator: Green (<70%), Yellow (70-90%), Red (>90%)
323        status: TokenUsageStatus,
324        /// Estimated cost in USD for this request (input only)
325        estimated_cost: f64,
326    },
327
328    /// A user-facing status message intended for UIs.
329    ///
330    /// This is for short, transient notifications that should not count as
331    /// streamed "output" (i.e., it must not prevent retry when a provider
332    /// attempt fails before any actual response content is forwarded).
333    Status {
334        /// Human-readable status message.
335        message: String,
336    },
337    /// A request from the agent to change configuration.
338    ///
339    /// This is surfaced to UIs (GUI/TUI) so they can prompt for confirmation or
340    /// apply changes immediately in permissive mode.
341    ConfigRequest {
342        /// Operation type (e.g. "set")
343        operation: String,
344        /// Config key (e.g. "provider", "model", "permission_level")
345        key: String,
346        /// Requested value. When `None`, this represents a "query"/"show" style request.
347        value: Option<String>,
348        /// Whether the UI must request explicit confirmation before applying.
349        requires_confirmation: bool,
350    },
351    /// Tool execution requires user confirmation before proceeding.
352    ///
353    /// This is emitted when a tool call is detected but the session's permission
354    /// level requires user approval before execution. The UI should display a
355    /// confirmation dialog and respond via the confirmation channel.
356    ToolConfirmationRequired {
357        /// Unique ID for this confirmation request
358        confirmation_id: String,
359        /// Tool name that requires confirmation
360        tool_name: String,
361        /// Tool arguments (JSON string)
362        tool_args: String,
363        /// Human-readable description of what the tool will do
364        description: String,
365        /// Risk level (0-10, higher = more dangerous)
366        risk_level: u8,
367        /// Category of the tool (e.g., "write", "shell", "network")
368        category: String,
369    },
370    /// Tool execution was blocked by permission settings.
371    ///
372    /// This is emitted when a tool call is detected but the session's permission
373    /// level blocks the operation entirely (e.g., Sandbox mode blocking writes).
374    ToolBlocked {
375        /// Tool name that was blocked
376        tool_name: String,
377        /// Reason for blocking
378        reason: String,
379    },
380    /// Memory bank entry saved notification for user feedback
381    /// Emitted when context is saved to persistent memory bank file
382    MemoryBankSaved {
383        /// Path to the saved memory bank file
384        file_path: String,
385        /// Session ID associated with this entry
386        session_id: String,
387        /// Brief summary of what was saved
388        summary: String,
389        /// Number of messages saved
390        messages_saved: usize,
391    },
392    /// Agentic loop iteration boundary marker.
393    ///
394    /// Emitted at the start of each agentic loop iteration. When `iteration > 0`,
395    /// it signals that the text following this marker is the LLM's **intermediate
396    /// reasoning** about previous tool results (not the final response). UIs should
397    /// render this text differently (e.g., with a `◆` prefix or distinct styling)
398    /// and clearly delineate iteration boundaries.
399    AgentLoopIteration {
400        /// Zero-based iteration index (0 = first LLM call, 1+ = continuation after tools)
401        iteration: u32,
402    },
403    /// Runtime-authored task-state snapshot.
404    TaskRuntimeSnapshot {
405        /// Authoritative runtime snapshot for the tracked task tree.
406        snapshot: TaskRuntimeSnapshot,
407    },
408    /// Real-time shell output chunk (stdout or stderr).
409    ///
410    /// Emitted while a shell command is executing so the UI can stream output
411    /// into an embedded terminal component. Each chunk is a small fragment of
412    /// text (typically one or a few lines).
413    ShellOutput {
414        /// Unique identifier for the shell process (matches `ShellLifecycle`).
415        process_id: String,
416        /// Long-lived shell session that produced this output, if any.
417        shell_session_id: Option<String>,
418        /// Whether this chunk comes from stdout or stderr.
419        stream: ShellOutputStream,
420        /// The raw text data (may contain ANSI escape sequences).
421        data: String,
422    },
423    /// Shell process lifecycle event.
424    ///
425    /// Emitted when a shell process transitions between states (started,
426    /// completed, failed, stopped, paused, resumed). The UI uses this to
427    /// update the console header, show exit codes, and enable/disable
428    /// control buttons.
429    ShellLifecycle {
430        /// Unique identifier for the shell process (matches `ShellOutput`).
431        process_id: String,
432        /// Long-lived shell session that owns this command run, if any.
433        shell_session_id: Option<String>,
434        /// New state of the process.
435        state: ShellProcessState,
436        /// Exit code (only meaningful when `state` is `Completed` or `Failed`).
437        exit_code: Option<i32>,
438        /// Wall-clock duration in milliseconds (set on terminal states).
439        duration_ms: Option<u64>,
440        /// The command string that was executed.
441        command: String,
442        /// Working directory for the command.
443        cwd: Option<String>,
444    },
445    /// Interactive shell session lifecycle event.
446    ShellSessionLifecycle {
447        /// Long-lived shell session identifier.
448        shell_session_id: String,
449        /// New state of the shell session.
450        state: ShellSessionState,
451        /// Current working directory tracked for the session.
452        cwd: Option<String>,
453        /// Active command process id, when the session is busy.
454        active_process_id: Option<String>,
455        /// Active command string, when the session is busy.
456        active_command: Option<String>,
457        /// Whether the session is currently eligible for reuse.
458        available_for_reuse: bool,
459        /// Whether the session is a user-facing interactive shell.
460        interactive: bool,
461        /// Whether the session is currently reserved for direct user management.
462        user_managed: bool,
463    },
464    /// Stream completed successfully with optional token usage
465    Done(Option<TokenUsage>),
466    /// Stream was cancelled
467    Cancelled,
468    /// Stream was paused (cancelled with the intent to resume later).
469    ///
470    /// The caller is responsible for capturing the `PausedExecutionState` from the
471    /// accumulated streaming context. This variant is a signal to the UI to render
472    /// a resumable pause marker rather than a hard cancellation.
473    Paused,
474    /// An error occurred
475    Error(String),
476    /// Context overflow error - requires compaction before retry.
477    ///
478    /// This is emitted when the LLM request exceeds the model's context window.
479    /// Unlike generic errors, this signals to the pipeline that it should:
480    /// 1. Compact the context (summarize history, remove old messages)
481    /// 2. Retry the request with the reduced context
482    /// 3. Optionally learn the model's actual limit for future requests
483    ContextOverflow {
484        /// The original error message from the provider
485        error_message: String,
486    },
487    /// Experiential reflection phase has started (ERL-inspired).
488    ///
489    /// UIs can use this to surface that the pipeline is performing a
490    /// post-answer self-review step rather than continuing normal tool use.
491    ReflectionStarted {
492        /// Human-readable reason for triggering reflection.
493        reason: String,
494    },
495    /// Experiential reflection phase completed (ERL-inspired).
496    ///
497    /// This reports both the learned summary and whether the result stayed only
498    /// in short-term/session storage or was also promoted into long-term memory.
499    ReflectionComplete {
500        /// Brief summary of what was learned.
501        summary: String,
502        /// Whether the reflection was stored in session working memory.
503        stored: bool,
504        /// Whether the reflection was promoted to long-term memory bank.
505        promoted: bool,
506    },
507}
508
/// How a single provider attempt ended, as classified by `forward_attempt_stream`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum AttemptOutcome {
    /// The attempt finished with `StreamChunk::Done`.
    Success,
    /// The attempt errored before any output was forwarded, so it may be retried.
    RetryableError,
    /// Context length exceeded - needs compaction, not blind retry
    ContextOverflowError,
    /// The attempt errored after output had already been forwarded.
    FatalError,
    /// The attempt reported `StreamChunk::Cancelled`.
    Cancelled,
    /// The attempt reported `StreamChunk::Paused`.
    Paused,
    /// The attempt channel closed without any terminal chunk.
    UnexpectedEnd,
}
520
521#[derive(Debug, Clone)]
522struct AttemptForwardResult {
523    outcome: AttemptOutcome,
524    /// Whether we forwarded any non-terminal output chunk (Text/Thinking/tool call) to the caller.
525    forwarded_output: bool,
526    /// Error message when outcome is RetryableError/FatalError.
527    error: Option<String>,
528}
529
530/// Forward chunks from a single provider attempt to the caller.
531///
532/// Design goal: preserve a *true streaming* UX.
533///
534/// Retry policy: we only consider retrying if the attempt fails **before any output is forwarded**.
535async fn forward_attempt_stream(
536    attempt_rx: &mut mpsc::Receiver<StreamChunk>,
537    tx: &mpsc::Sender<StreamChunk>,
538) -> AttemptForwardResult {
539    let mut forwarded_output = false;
540
541    while let Some(chunk) = attempt_rx.recv().await {
542        match &chunk {
543            StreamChunk::Text(_)
544            | StreamChunk::Thinking(_)
545            | StreamChunk::ToolCallStart { .. }
546            | StreamChunk::ToolCallArgs(_)
547            | StreamChunk::ToolCallEnd
548            | StreamChunk::ToolCallResult { .. } => {
549                forwarded_output = true;
550                let _ = tx.send(chunk).await;
551            }
552            StreamChunk::RetryAttempt { .. } => {
553                // Forward retry notifications without marking as output
554                let _ = tx.send(chunk).await;
555            }
556            StreamChunk::ContextCompacted { .. } => {
557                // Forward compaction notifications without marking as output
558                let _ = tx.send(chunk).await;
559            }
560            StreamChunk::TokenUsageUpdate { .. } => {
561                // Forward token usage updates without marking as output
562                send_token_usage_chunk_best_effort(tx, chunk).await;
563            }
564            StreamChunk::Status { .. } => {
565                // Forward status updates without marking as output
566                send_status_chunk_best_effort(tx, chunk).await;
567            }
568            StreamChunk::ConfigRequest { .. } => {
569                // Forward config requests without marking as output
570                let _ = tx.send(chunk).await;
571            }
572            StreamChunk::ToolConfirmationRequired { .. } => {
573                // Forward tool confirmation requests without marking as output
574                let _ = tx.send(chunk).await;
575            }
576            StreamChunk::ToolBlocked { .. } => {
577                // Forward tool blocked notifications without marking as output
578                let _ = tx.send(chunk).await;
579            }
580            StreamChunk::MemoryBankSaved { .. } => {
581                // Forward memory bank notifications without marking as output
582                let _ = tx.send(chunk).await;
583            }
584            StreamChunk::AgentLoopIteration { .. } => {
585                // Forward agent loop iteration markers without marking as output
586                let _ = tx.send(chunk).await;
587            }
588            StreamChunk::Narration { .. } => {
589                // Forward narration updates without marking as model output.
590                let _ = tx.try_send(chunk);
591            }
592            StreamChunk::TaskRuntimeSnapshot { .. } => {
593                // Forward runtime task-state updates without marking as output
594                let _ = tx.try_send(chunk);
595            }
596            StreamChunk::ReflectionStarted { .. } | StreamChunk::ReflectionComplete { .. } => {
597                // Forward reflection events without marking as output
598                let _ = tx.send(chunk).await;
599            }
600            StreamChunk::ShellOutput { .. } => {
601                // Forward shell output chunks without marking as output –
602                // they are part of tool execution, already tracked via
603                // ToolCallResult.
604                let _ = tx.send(chunk).await;
605            }
606            StreamChunk::ShellLifecycle { .. } => {
607                // Forward shell lifecycle events without marking as output
608                let _ = tx.send(chunk).await;
609            }
610            StreamChunk::ShellSessionLifecycle { .. } => {
611                // Forward shell session lifecycle events without marking as output
612                let _ = tx.send(chunk).await;
613            }
614            StreamChunk::Done(_) => {
615                let _ = tx.send(chunk).await;
616                return AttemptForwardResult {
617                    outcome: AttemptOutcome::Success,
618                    forwarded_output,
619                    error: None,
620                };
621            }
622            StreamChunk::Cancelled => {
623                let _ = tx.send(StreamChunk::Cancelled).await;
624                return AttemptForwardResult {
625                    outcome: AttemptOutcome::Cancelled,
626                    forwarded_output,
627                    error: None,
628                };
629            }
630            StreamChunk::Paused => {
631                let _ = tx.send(StreamChunk::Paused).await;
632                return AttemptForwardResult {
633                    outcome: AttemptOutcome::Paused,
634                    forwarded_output,
635                    error: None,
636                };
637            }
638            StreamChunk::Error(e) => {
639                // Context overflow errors need special handling - they cannot be fixed
640                // by blind retries. The caller should compact context and retry.
641                if is_context_overflow_message(e) {
642                    return AttemptForwardResult {
643                        outcome: AttemptOutcome::ContextOverflowError,
644                        forwarded_output,
645                        error: Some(e.clone()),
646                    };
647                }
648
649                // If we already streamed anything to the caller, we cannot safely retry
650                // without causing duplicated / confusing output.
651                if forwarded_output {
652                    let _ = tx.send(StreamChunk::Error(e.clone())).await;
653                    return AttemptForwardResult {
654                        outcome: AttemptOutcome::FatalError,
655                        forwarded_output,
656                        error: Some(e.clone()),
657                    };
658                }
659
660                return AttemptForwardResult {
661                    outcome: AttemptOutcome::RetryableError,
662                    forwarded_output,
663                    error: Some(e.clone()),
664                };
665            }
666            StreamChunk::ContextOverflow { error_message } => {
667                // Context overflow received as a chunk - forward and signal recovery needed
668                return AttemptForwardResult {
669                    outcome: AttemptOutcome::ContextOverflowError,
670                    forwarded_output,
671                    error: Some(error_message.clone()),
672                };
673            }
674        }
675    }
676
677    AttemptForwardResult {
678        outcome: AttemptOutcome::UnexpectedEnd,
679        forwarded_output,
680        error: None,
681    }
682}
683
/// Cloneable handle used to interrupt (cancel or pause) a streaming request.
///
/// Clones share one underlying atomic, so an interruption requested through any clone
/// is visible to all of them.
#[derive(Debug, Clone)]
pub struct CancellationToken {
    /// Current `CancellationDisposition`, stored as its `u8` discriminant.
    disposition: Arc<AtomicU8>,
}
689
/// Requested interruption disposition for a streaming request.
///
/// The enum is `repr(u8)` so that `CancellationToken` can store it in an `AtomicU8`;
/// the discriminant values are part of that encoding and must stay stable.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u8)]
pub enum CancellationDisposition {
    /// No interruption has been requested; the stream keeps running.
    Running = 0,
    /// A hard cancellation was requested (`CancellationToken::cancel`).
    Cancelled = 1,
    /// A resumable pause was requested (`CancellationToken::pause`).
    Paused = 2,
}
698
699impl CancellationToken {
700    /// Create a new cancellation token
701    pub fn new() -> Self {
702        Self {
703            disposition: Arc::new(AtomicU8::new(CancellationDisposition::Running as u8)),
704        }
705    }
706
707    /// Cancel the streaming request
708    pub fn cancel(&self) {
709        self.disposition
710            .store(CancellationDisposition::Cancelled as u8, Ordering::SeqCst);
711    }
712
713    /// Pause the streaming request with the intent to resume later.
714    pub fn pause(&self) {
715        let _ = self.disposition.compare_exchange(
716            CancellationDisposition::Running as u8,
717            CancellationDisposition::Paused as u8,
718            Ordering::SeqCst,
719            Ordering::SeqCst,
720        );
721    }
722
723    /// Check if cancellation has been requested
724    pub fn is_cancelled(&self) -> bool {
725        !matches!(self.disposition(), CancellationDisposition::Running)
726    }
727
728    /// Returns `true` when the request should be treated as resumably paused.
729    pub fn is_pause_requested(&self) -> bool {
730        matches!(self.disposition(), CancellationDisposition::Paused)
731    }
732
733    /// Returns the requested interruption disposition.
734    pub fn disposition(&self) -> CancellationDisposition {
735        match self.disposition.load(Ordering::SeqCst) {
736            value if value == CancellationDisposition::Paused as u8 => {
737                CancellationDisposition::Paused
738            }
739            value if value == CancellationDisposition::Cancelled as u8 => {
740                CancellationDisposition::Cancelled
741            }
742            _ => CancellationDisposition::Running,
743        }
744    }
745
746    /// Terminal streaming chunk matching the currently requested interruption.
747    pub fn interruption_chunk(&self) -> StreamChunk {
748        match self.disposition() {
749            CancellationDisposition::Paused => StreamChunk::Paused,
750            CancellationDisposition::Cancelled | CancellationDisposition::Running => {
751                StreamChunk::Cancelled
752            }
753        }
754    }
755}
756
757impl Default for CancellationToken {
758    fn default() -> Self {
759        Self::new()
760    }
761}
762
763/// Create a reqwest client for streaming requests
764fn create_streaming_client() -> reqwest::Client {
765    reqwest::Client::builder()
766        .timeout(Duration::from_secs(STREAMING_TIMEOUT_SECS))
767        .connect_timeout(Duration::from_secs(10))
768        .build()
769        .unwrap_or_else(|_| reqwest::Client::new())
770}
771
772/// Helper to parse <think> tags from chunks.
773/// Handles tags that may be split across multiple chunks by buffering partial matches.
774struct ThinkingParser {
775    in_think_block: bool,
776    /// Buffer for potential partial tag at end of chunk
777    buffer: String,
778}
779
780impl ThinkingParser {
781    fn new() -> Self {
782        Self {
783            in_think_block: false,
784            buffer: String::new(),
785        }
786    }
787
788    fn process(&mut self, chunk: &str) -> Vec<StreamChunk> {
789        let mut chunks = Vec::new();
790
791        // Prepend any buffered content from previous chunk
792        let input = if self.buffer.is_empty() {
793            chunk.to_string()
794        } else {
795            std::mem::take(&mut self.buffer) + chunk
796        };
797
798        let mut remaining = input.as_str();
799
800        while !remaining.is_empty() {
801            if self.in_think_block {
802                if let Some(end_idx) = remaining.find("</think>") {
803                    let thinking_content = &remaining[..end_idx];
804                    if !thinking_content.is_empty() {
805                        chunks.push(StreamChunk::Thinking(thinking_content.to_string()));
806                    }
807                    self.in_think_block = false;
808                    remaining = &remaining[end_idx + 8..];
809                } else {
810                    // Check for partial </think> at end
811                    let partial = Self::find_partial_end_tag(remaining);
812                    if partial > 0 {
813                        let safe_len = remaining.len() - partial;
814                        if safe_len > 0 {
815                            chunks.push(StreamChunk::Thinking(remaining[..safe_len].to_string()));
816                        }
817                        self.buffer = remaining[safe_len..].to_string();
818                    } else {
819                        chunks.push(StreamChunk::Thinking(remaining.to_string()));
820                    }
821                    break;
822                }
823            } else if let Some(start_idx) = remaining.find("<think>") {
824                let text_content = &remaining[..start_idx];
825                if !text_content.is_empty() {
826                    chunks.push(StreamChunk::Text(text_content.to_string()));
827                }
828                self.in_think_block = true;
829                remaining = &remaining[start_idx + 7..];
830            } else {
831                // Check for partial <think> at end
832                let partial = Self::find_partial_start_tag(remaining);
833                if partial > 0 {
834                    let safe_len = remaining.len() - partial;
835                    if safe_len > 0 {
836                        chunks.push(StreamChunk::Text(remaining[..safe_len].to_string()));
837                    }
838                    self.buffer = remaining[safe_len..].to_string();
839                } else {
840                    chunks.push(StreamChunk::Text(remaining.to_string()));
841                }
842                break;
843            }
844        }
845        chunks
846    }
847
848    /// Find length of partial "<think>" at end of string
849    fn find_partial_start_tag(s: &str) -> usize {
850        const TAG: &str = "<think>";
851        for len in (1..TAG.len()).rev() {
852            if s.ends_with(&TAG[..len]) {
853                return len;
854            }
855        }
856        0
857    }
858
859    /// Find length of partial "</think>" at end of string
860    fn find_partial_end_tag(s: &str) -> usize {
861        const TAG: &str = "</think>";
862        for len in (1..TAG.len()).rev() {
863            if s.ends_with(&TAG[..len]) {
864                return len;
865            }
866        }
867        0
868    }
869}
870
871/// Split a complete assistant message into (user-facing text, optional thinking) based on
872/// `<think>...</think>` blocks.
873///
874/// This is used by non-streaming callers to keep behavior consistent with streaming.
875pub fn split_think_blocks(input: &str) -> (String, Option<String>) {
876    let mut parser = ThinkingParser::new();
877    let mut content = String::new();
878    let mut thinking = String::new();
879
880    for chunk in parser.process(input) {
881        match chunk {
882            StreamChunk::Text(t) => content.push_str(&t),
883            StreamChunk::Thinking(t) => thinking.push_str(&t),
884            _ => {}
885        }
886    }
887
888    let thinking = if thinking.trim().is_empty() {
889        None
890    } else {
891        Some(thinking)
892    };
893
894    (content, thinking)
895}
896
/// Append `incoming` to `buffer` and return every newline-terminated line now available.
///
/// Each returned line has its `\n` removed along with any trailing `\r` characters, so CRLF
/// input works. Text after the last `\n` stays in `buffer` for the next call.
fn collect_complete_lines(buffer: &mut String, incoming: &str) -> Vec<String> {
    buffer.push_str(incoming);

    let Some(last_newline) = buffer.rfind('\n') else {
        return Vec::new();
    };

    let lines: Vec<String> = buffer[..last_newline]
        .split('\n')
        .map(|line| line.trim_end_matches('\r').to_owned())
        .collect();
    buffer.drain(..=last_newline);
    lines
}
919
920/// Build the JSON request body for an OpenAI Chat Completions streaming call.
921fn build_openai_chat_request_body(
922    model: &str,
923    prompt: &str,
924    tools: Option<&[serde_json::Value]>,
925) -> serde_json::Value {
926    let mut body = serde_json::json!({
927        "model": model,
928        "messages": [{"role": "user", "content": prompt}],
929        "stream": true
930    });
931
932    // Enable structured tool calling when schemas are provided.
933    if let Some(tools) = tools
934        && !tools.is_empty()
935    {
936        body["tools"] = serde_json::Value::Array(tools.to_vec());
937        body["tool_choice"] = serde_json::json!("auto");
938    }
939
940    body
941}
942
943/// Build the JSON request body for an OpenAI Responses streaming call.
944fn build_openai_responses_request_body(
945    model: &str,
946    prompt: &str,
947    tools: Option<&[serde_json::Value]>,
948) -> serde_json::Value {
949    let mut body = serde_json::json!({
950        "model": model,
951        "input": [{"role": "user", "content": prompt}],
952        "stream": true
953    });
954
955    if let Some(tools) = tools
956        && !tools.is_empty()
957    {
958        body["tools"] = serde_json::Value::Array(tools.to_vec());
959        body["tool_choice"] = serde_json::json!("auto");
960    }
961
962    body
963}
964
965fn openai_endpoint_path(api: OpenAiApi) -> &'static str {
966    match api {
967        OpenAiApi::ChatCompletions => "/v1/chat/completions",
968        OpenAiApi::Responses => "/v1/responses",
969    }
970}
971
972fn format_openai_http_error(
973    status: reqwest::StatusCode,
974    provider_name: &str,
975    model: &str,
976    api: OpenAiApi,
977    body: &str,
978    retry_after: Option<Duration>,
979) -> String {
980    if status == reqwest::StatusCode::NOT_FOUND && body.contains("This is not a chat model") {
981        let mut message = format!(
982            "{provider_name} model '{}' appears to require /v1/responses, but Gestura selected {}. Raw provider error: {}",
983            model.trim(),
984            openai_endpoint_path(api),
985            body
986        );
987        if let Some(retry_after) = retry_after {
988            message.push_str(&format_retry_after_suffix(retry_after));
989        }
990        return message;
991    }
992
993    let mut message = format!(
994        "{provider_name} {} HTTP {}: {}",
995        openai_endpoint_path(api),
996        status,
997        body
998    );
999    if let Some(retry_after) = retry_after {
1000        message.push_str(&format_retry_after_suffix(retry_after));
1001    }
1002    message
1003}
1004
/// Human-readable retry hint appended to provider error messages.
///
/// Whole seconds are used (sub-second parts are truncated), with a minimum of 1.
fn format_retry_after_suffix(retry_after: Duration) -> String {
    let seconds = std::cmp::max(retry_after.as_secs(), 1);
    format!(" Provider suggested retrying after {seconds} seconds.")
}
1011
/// Parse a `Retry-After` delay given as a whole number of seconds (surrounding whitespace
/// allowed). Zero is clamped to one second. Anything else, including HTTP-date values,
/// yields `None`.
fn parse_retry_after_value(value: &str) -> Option<Duration> {
    value
        .trim()
        .parse::<u64>()
        .ok()
        .map(|secs| Duration::from_secs(secs.max(1)))
}
1016
1017fn response_retry_after_hint(headers: &reqwest::header::HeaderMap) -> Option<Duration> {
1018    headers
1019        .get(reqwest::header::RETRY_AFTER)?
1020        .to_str()
1021        .ok()
1022        .and_then(parse_retry_after_value)
1023}
1024
/// Recover the retry delay embedded by `format_retry_after_suffix` from an error message.
///
/// The marker is matched case-insensitively, and the digits right after it are taken as
/// seconds (minimum 1). Returns `None` if the marker is absent or not followed by a number.
fn retry_after_hint_from_error_message(message: &str) -> Option<Duration> {
    const MARKER: &str = "provider suggested retrying after ";

    let lowered = message.to_ascii_lowercase();
    let (_, tail) = lowered.split_once(MARKER)?;
    let digit_count = tail.bytes().take_while(u8::is_ascii_digit).count();
    let seconds: u64 = tail[..digit_count].parse().ok()?;
    Some(Duration::from_secs(seconds.max(1)))
}
1038
/// Heuristic: does this error message look like a rate-limit or quota rejection?
///
/// Matches (case-insensitively) "http 429", "rate limit", "too many requests" or "quota".
fn error_is_rate_limited_message(message: &str) -> bool {
    const NEEDLES: [&str; 4] = ["http 429", "rate limit", "too many requests", "quota"];

    let haystack = message.to_ascii_lowercase();
    NEEDLES.iter().any(|needle| haystack.contains(*needle))
}
1046
1047fn select_streaming_retry_delay(
1048    policy: &RetryPolicy,
1049    retry_attempt: u32,
1050    error_message: &str,
1051) -> Duration {
1052    let base_delay = policy.delay_for_attempt(retry_attempt);
1053
1054    if let Some(retry_after) = retry_after_hint_from_error_message(error_message) {
1055        return retry_after.max(base_delay);
1056    }
1057
1058    if error_is_rate_limited_message(error_message) {
1059        return base_delay.max(Duration::from_secs(5));
1060    }
1061
1062    base_delay
1063}
1064
/// A Chat Completions tool call being assembled from streamed `delta.tool_calls` fragments.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
struct PendingOpenAiToolCall {
    /// Provider-assigned call id; stays empty until a fragment carries a non-empty `id`.
    id: String,
    /// Function name; calls that never receive one are dropped by `take_openai_tool_calls`.
    name: String,
    /// Concatenation of every non-empty `function.arguments` fragment, in arrival order.
    arguments: String,
}
1071
/// A Responses API function call being assembled from `response.output_item.*` and
/// `response.function_call_arguments.*` stream events.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
struct PendingOpenAiResponsesToolCall {
    /// Call id: `call_id` when supplied, otherwise the item id; empty until an event has one.
    id: String,
    /// Function name taken from the `function_call` output item.
    name: String,
    /// Arguments text. `.delta` events append to it; events carrying a full non-empty
    /// `arguments` string replace it.
    arguments: String,
    /// Set once the provider reports the call complete. Gates in-order emission in
    /// `emit_ready_openai_responses_tool_calls`.
    finished: bool,
}
1079
1080fn merge_openai_tool_call_delta(
1081    pending: &mut BTreeMap<usize, PendingOpenAiToolCall>,
1082    call: &serde_json::Value,
1083    fallback_index: usize,
1084) {
1085    let index = call
1086        .get("index")
1087        .and_then(|value| value.as_u64())
1088        .map(|value| value as usize)
1089        .unwrap_or(fallback_index);
1090
1091    let entry = pending.entry(index).or_default();
1092
1093    if let Some(id) = call["id"].as_str()
1094        && !id.is_empty()
1095    {
1096        entry.id = id.to_string();
1097    }
1098
1099    if let Some(name) = call["function"]["name"].as_str()
1100        && !name.is_empty()
1101    {
1102        entry.name = name.to_string();
1103    }
1104
1105    if let Some(arguments) = call["function"]["arguments"].as_str()
1106        && !arguments.is_empty()
1107    {
1108        entry.arguments.push_str(arguments);
1109    }
1110}
1111
1112fn take_openai_tool_calls(
1113    pending: &mut BTreeMap<usize, PendingOpenAiToolCall>,
1114) -> Vec<(usize, PendingOpenAiToolCall)> {
1115    std::mem::take(pending)
1116        .into_iter()
1117        .filter(|(_, call)| !call.name.is_empty())
1118        .collect()
1119}
1120
1121async fn emit_openai_tool_calls(
1122    tx: &mpsc::Sender<StreamChunk>,
1123    pending: &mut BTreeMap<usize, PendingOpenAiToolCall>,
1124) {
1125    for (index, call) in take_openai_tool_calls(pending) {
1126        let id = if call.id.is_empty() {
1127            format!("openai-tool-{index}")
1128        } else {
1129            call.id
1130        };
1131
1132        let _ = tx
1133            .send(StreamChunk::ToolCallStart {
1134                id,
1135                name: call.name,
1136            })
1137            .await;
1138
1139        if !call.arguments.is_empty() {
1140            let _ = tx.send(StreamChunk::ToolCallArgs(call.arguments)).await;
1141        }
1142
1143        let _ = tx.send(StreamChunk::ToolCallEnd).await;
1144    }
1145}
1146
1147fn merge_openai_responses_tool_item(
1148    pending: &mut BTreeMap<usize, PendingOpenAiResponsesToolCall>,
1149    tool_indices: &mut HashMap<String, usize>,
1150    event: &serde_json::Value,
1151    fallback_index: usize,
1152) {
1153    let index = resolve_openai_responses_tool_index(tool_indices, event, fallback_index);
1154
1155    let item = event.get("item").unwrap_or(event);
1156    let entry = pending.entry(index).or_default();
1157
1158    if let Some(id) = item["call_id"].as_str().or_else(|| item["id"].as_str())
1159        && !id.is_empty()
1160    {
1161        entry.id = id.to_string();
1162    }
1163
1164    if let Some(name) = item["name"].as_str()
1165        && !name.is_empty()
1166    {
1167        entry.name = name.to_string();
1168    }
1169
1170    if let Some(arguments) = item["arguments"].as_str()
1171        && !arguments.is_empty()
1172    {
1173        entry.arguments = arguments.to_string();
1174    }
1175
1176    if event["type"].as_str() == Some("response.output_item.done")
1177        || item["status"].as_str() == Some("completed")
1178    {
1179        entry.finished = true;
1180    }
1181}
1182
1183fn merge_openai_responses_tool_argument_delta(
1184    pending: &mut BTreeMap<usize, PendingOpenAiResponsesToolCall>,
1185    tool_indices: &mut HashMap<String, usize>,
1186    event: &serde_json::Value,
1187    fallback_index: usize,
1188) {
1189    let index = resolve_openai_responses_tool_index(tool_indices, event, fallback_index);
1190
1191    let entry = pending.entry(index).or_default();
1192
1193    if let Some(id) = event["call_id"].as_str()
1194        && !id.is_empty()
1195    {
1196        entry.id = id.to_string();
1197    } else if entry.id.is_empty()
1198        && let Some(id) = event["item_id"].as_str()
1199        && !id.is_empty()
1200    {
1201        entry.id = id.to_string();
1202    }
1203
1204    if let Some(delta) = event["delta"].as_str()
1205        && !delta.is_empty()
1206    {
1207        entry.arguments.push_str(delta);
1208    }
1209}
1210
1211fn complete_openai_responses_tool_arguments(
1212    pending: &mut BTreeMap<usize, PendingOpenAiResponsesToolCall>,
1213    tool_indices: &mut HashMap<String, usize>,
1214    event: &serde_json::Value,
1215    fallback_index: usize,
1216) {
1217    let index = resolve_openai_responses_tool_index(tool_indices, event, fallback_index);
1218
1219    let entry = pending.entry(index).or_default();
1220
1221    if let Some(id) = event["call_id"].as_str()
1222        && !id.is_empty()
1223    {
1224        entry.id = id.to_string();
1225    } else if entry.id.is_empty()
1226        && let Some(id) = event["item_id"].as_str()
1227        && !id.is_empty()
1228    {
1229        entry.id = id.to_string();
1230    }
1231
1232    if let Some(arguments) = event["arguments"].as_str()
1233        && !arguments.is_empty()
1234    {
1235        entry.arguments = arguments.to_string();
1236    }
1237
1238    entry.finished = true;
1239}
1240
1241async fn emit_ready_openai_responses_tool_calls(
1242    tx: &mpsc::Sender<StreamChunk>,
1243    pending: &mut BTreeMap<usize, PendingOpenAiResponsesToolCall>,
1244    emitted_ids: &mut HashSet<String>,
1245    flush_all: bool,
1246) {
1247    let mut ready = Vec::new();
1248
1249    for (&index, call) in pending.iter() {
1250        if call.name.is_empty() {
1251            if flush_all {
1252                continue;
1253            }
1254            break;
1255        }
1256
1257        if flush_all || call.finished {
1258            ready.push(index);
1259            continue;
1260        }
1261
1262        break;
1263    }
1264
1265    for index in ready {
1266        if let Some(call) = pending.remove(&index) {
1267            let id = if call.id.is_empty() {
1268                format!("openai-response-tool-{index}")
1269            } else {
1270                call.id
1271            };
1272
1273            if !emitted_ids.insert(id.clone()) {
1274                tracing::debug!(
1275                    tool_call_id = %id,
1276                    pending_index = index,
1277                    "Skipping duplicate OpenAI Responses tool-call emission"
1278                );
1279                continue;
1280            }
1281
1282            let _ = tx
1283                .send(StreamChunk::ToolCallStart {
1284                    id,
1285                    name: call.name,
1286                })
1287                .await;
1288
1289            if !call.arguments.is_empty() {
1290                let _ = tx.send(StreamChunk::ToolCallArgs(call.arguments)).await;
1291            }
1292
1293            let _ = tx.send(StreamChunk::ToolCallEnd).await;
1294        }
1295    }
1296}
1297
1298fn openai_responses_output_index(event: &serde_json::Value) -> Option<usize> {
1299    event
1300        .get("output_index")
1301        .and_then(|value| value.as_u64())
1302        .map(|value| value as usize)
1303}
1304
1305fn openai_responses_tool_aliases(event: &serde_json::Value) -> Vec<String> {
1306    let item = event.get("item").unwrap_or(event);
1307    let mut aliases = Vec::with_capacity(4);
1308
1309    for candidate in [
1310        item["call_id"].as_str(),
1311        event["call_id"].as_str(),
1312        item["id"].as_str(),
1313        event["item_id"].as_str(),
1314    ] {
1315        if let Some(alias) = candidate.filter(|alias| !alias.is_empty())
1316            && !aliases.iter().any(|existing| existing == alias)
1317        {
1318            aliases.push(alias.to_string());
1319        }
1320    }
1321
1322    aliases
1323}
1324
1325fn resolve_openai_responses_tool_index(
1326    tool_indices: &mut HashMap<String, usize>,
1327    event: &serde_json::Value,
1328    fallback_index: usize,
1329) -> usize {
1330    let aliases = openai_responses_tool_aliases(event);
1331
1332    if let Some(existing_index) = aliases
1333        .iter()
1334        .find_map(|alias| tool_indices.get(alias).copied())
1335    {
1336        for alias in aliases {
1337            tool_indices.insert(alias, existing_index);
1338        }
1339        return existing_index;
1340    }
1341
1342    let index = openai_responses_output_index(event).unwrap_or(fallback_index);
1343    for alias in aliases {
1344        tool_indices.insert(alias, index);
1345    }
1346    index
1347}
1348
1349async fn stream_openai_chat_compatible(
1350    api_key: &str,
1351    base_url: &str,
1352    model: &str,
1353    prompt: &str,
1354    tools: Option<&[serde_json::Value]>,
1355    tx: mpsc::Sender<StreamChunk>,
1356    cancel_token: CancellationToken,
1357) -> Result<(), AppError> {
1358    let url = format!(
1359        "{}{}",
1360        base_url.trim_end_matches('/'),
1361        openai_endpoint_path(OpenAiApi::ChatCompletions)
1362    );
1363    let body = build_openai_chat_request_body(model, prompt, tools);
1364
1365    let client = create_streaming_client();
1366    let response = client
1367        .post(&url)
1368        .bearer_auth(api_key)
1369        .json(&body)
1370        .send()
1371        .await
1372        .map_err(|e| AppError::Llm(format!("OpenAI streaming request failed: {}", e)))?;
1373
1374    if !response.status().is_success() {
1375        let status = response.status();
1376        let retry_after = response_retry_after_hint(response.headers());
1377        let body = response.text().await.unwrap_or_default();
1378
1379        // ALWAYS log this so we know this code path is being hit
1380        tracing::error!(
1381            status = %status,
1382            body_len = body.len(),
1383            "[CONTEXT_OVERFLOW_CHECK] HTTP error received in stream_openai_chat_compatible"
1384        );
1385
1386        let error_msg = format_openai_http_error(
1387            status,
1388            "OpenAI",
1389            model,
1390            OpenAiApi::ChatCompletions,
1391            &body,
1392            retry_after,
1393        );
1394
1395        // Check if this is a context overflow error - needs special handling
1396        let is_overflow =
1397            is_context_overflow_message(&error_msg) || is_context_overflow_message(&body);
1398        tracing::error!(
1399            is_overflow = is_overflow,
1400            body_preview = %body.chars().take(300).collect::<String>(),
1401            "[CONTEXT_OVERFLOW_CHECK] Checking for context overflow"
1402        );
1403
1404        if is_overflow {
1405            tracing::error!("[CONTEXT_OVERFLOW_CHECK] Returning AppError::ContextOverflow");
1406            return Err(AppError::ContextOverflow(error_msg));
1407        }
1408
1409        return Err(AppError::Llm(error_msg));
1410    }
1411
1412    let mut stream = response.bytes_stream();
1413    let mut parser = ThinkingParser::new();
1414    let mut line_buffer = String::new();
1415    // OpenAI-compatible providers may stream multiple tool calls concurrently,
1416    // identifying each call by `index` and interleaving argument fragments
1417    // across SSE events. Buffer them until the provider signals the end of the
1418    // tool-call block, then emit complete Start/Args/End sequences in index
1419    // order so downstream consumers never merge fragments from different calls.
1420    let mut pending_tool_calls = BTreeMap::<usize, PendingOpenAiToolCall>::new();
1421
1422    while let Some(chunk_result) = stream.next().await {
1423        if cancel_token.is_cancelled() {
1424            let _ = tx.send(cancel_token.interruption_chunk()).await;
1425            return Ok(());
1426        }
1427
1428        match chunk_result {
1429            Ok(bytes) => {
1430                let text = String::from_utf8_lossy(&bytes);
1431                for line in collect_complete_lines(&mut line_buffer, &text) {
1432                    let Some(data) = line.strip_prefix("data: ") else {
1433                        continue;
1434                    };
1435                    if data == "[DONE]" {
1436                        emit_openai_tool_calls(&tx, &mut pending_tool_calls).await;
1437                        let _ = tx.send(StreamChunk::Done(None)).await;
1438                        return Ok(());
1439                    }
1440                    if let Ok(json) = serde_json::from_str::<serde_json::Value>(data) {
1441                        // Handle content
1442                        if let Some(content) = json["choices"][0]["delta"]["content"].as_str()
1443                            && !content.is_empty()
1444                        {
1445                            let chunks = parser.process(content);
1446                            for chunk in chunks {
1447                                let _ = tx.send(chunk).await;
1448                            }
1449                        }
1450
1451                        // Handle tool calls
1452                        if let Some(tool_calls) =
1453                            json["choices"][0]["delta"]["tool_calls"].as_array()
1454                        {
1455                            for (fallback_index, call) in tool_calls.iter().enumerate() {
1456                                merge_openai_tool_call_delta(
1457                                    &mut pending_tool_calls,
1458                                    call,
1459                                    fallback_index,
1460                                );
1461                            }
1462                        }
1463
1464                        // Handle finish reason — emit each complete tool call in order.
1465                        if let Some(finish_reason) = json["choices"][0]["finish_reason"].as_str()
1466                            && finish_reason == "tool_calls"
1467                        {
1468                            emit_openai_tool_calls(&tx, &mut pending_tool_calls).await;
1469                        }
1470                    }
1471                }
1472            }
1473            Err(e) => {
1474                let _ = tx
1475                    .send(StreamChunk::Error(format!("Stream error: {}", e)))
1476                    .await;
1477                return Err(AppError::Llm(format!("Stream error: {}", e)));
1478            }
1479        }
1480    }
1481
1482    let _ = tx.send(StreamChunk::Done(None)).await;
1483    Ok(())
1484}
1485
1486async fn stream_openai_responses(
1487    api_key: &str,
1488    base_url: &str,
1489    model: &str,
1490    prompt: &str,
1491    tools: Option<&[serde_json::Value]>,
1492    tx: mpsc::Sender<StreamChunk>,
1493    cancel_token: CancellationToken,
1494) -> Result<(), AppError> {
1495    let url = format!(
1496        "{}{}",
1497        base_url.trim_end_matches('/'),
1498        openai_endpoint_path(OpenAiApi::Responses)
1499    );
1500    let body = build_openai_responses_request_body(model, prompt, tools);
1501
1502    let client = create_streaming_client();
1503    let response = client
1504        .post(&url)
1505        .bearer_auth(api_key)
1506        .json(&body)
1507        .send()
1508        .await
1509        .map_err(|e| AppError::Llm(format!("OpenAI streaming request failed: {}", e)))?;
1510
1511    if !response.status().is_success() {
1512        let status = response.status();
1513        let retry_after = response_retry_after_hint(response.headers());
1514        let body = response.text().await.unwrap_or_default();
1515        let error_msg = format_openai_http_error(
1516            status,
1517            "OpenAI",
1518            model,
1519            OpenAiApi::Responses,
1520            &body,
1521            retry_after,
1522        );
1523
1524        // Check if this is a context overflow error
1525        if is_context_overflow_message(&error_msg) || is_context_overflow_message(&body) {
1526            return Err(AppError::ContextOverflow(error_msg));
1527        }
1528
1529        return Err(AppError::Llm(error_msg));
1530    }
1531
1532    let mut stream = response.bytes_stream();
1533    let mut parser = ThinkingParser::new();
1534    let mut line_buffer = String::new();
1535    let mut pending_tool_calls = BTreeMap::<usize, PendingOpenAiResponsesToolCall>::new();
1536    let mut tool_call_indices = HashMap::<String, usize>::new();
1537    let mut emitted_tool_call_ids = HashSet::<String>::new();
1538    let mut fallback_index = 0usize;
1539
1540    while let Some(chunk_result) = stream.next().await {
1541        if cancel_token.is_cancelled() {
1542            let _ = tx.send(cancel_token.interruption_chunk()).await;
1543            return Ok(());
1544        }
1545
1546        match chunk_result {
1547            Ok(bytes) => {
1548                let text = String::from_utf8_lossy(&bytes);
1549                for line in collect_complete_lines(&mut line_buffer, &text) {
1550                    let Some(data) = line.strip_prefix("data: ") else {
1551                        continue;
1552                    };
1553                    if data == "[DONE]" {
1554                        emit_ready_openai_responses_tool_calls(
1555                            &tx,
1556                            &mut pending_tool_calls,
1557                            &mut emitted_tool_call_ids,
1558                            true,
1559                        )
1560                        .await;
1561                        let _ = tx.send(StreamChunk::Done(None)).await;
1562                        return Ok(());
1563                    }
1564
1565                    let Ok(json) = serde_json::from_str::<serde_json::Value>(data) else {
1566                        continue;
1567                    };
1568
1569                    match json["type"].as_str().unwrap_or_default() {
1570                        "response.output_text.delta" => {
1571                            if let Some(delta) = json["delta"].as_str()
1572                                && !delta.is_empty()
1573                            {
1574                                for chunk in parser.process(delta) {
1575                                    let _ = tx.send(chunk).await;
1576                                }
1577                            }
1578                        }
1579                        "response.output_item.added" | "response.output_item.done"
1580                            if json["item"]["type"].as_str() == Some("function_call") =>
1581                        {
1582                            merge_openai_responses_tool_item(
1583                                &mut pending_tool_calls,
1584                                &mut tool_call_indices,
1585                                &json,
1586                                fallback_index,
1587                            );
1588                            emit_ready_openai_responses_tool_calls(
1589                                &tx,
1590                                &mut pending_tool_calls,
1591                                &mut emitted_tool_call_ids,
1592                                false,
1593                            )
1594                            .await;
1595                        }
1596                        "response.function_call_arguments.delta" => {
1597                            merge_openai_responses_tool_argument_delta(
1598                                &mut pending_tool_calls,
1599                                &mut tool_call_indices,
1600                                &json,
1601                                fallback_index,
1602                            );
1603                        }
1604                        "response.function_call_arguments.done" => {
1605                            complete_openai_responses_tool_arguments(
1606                                &mut pending_tool_calls,
1607                                &mut tool_call_indices,
1608                                &json,
1609                                fallback_index,
1610                            );
1611                            emit_ready_openai_responses_tool_calls(
1612                                &tx,
1613                                &mut pending_tool_calls,
1614                                &mut emitted_tool_call_ids,
1615                                false,
1616                            )
1617                            .await;
1618                        }
1619                        "response.completed" => {
1620                            emit_ready_openai_responses_tool_calls(
1621                                &tx,
1622                                &mut pending_tool_calls,
1623                                &mut emitted_tool_call_ids,
1624                                true,
1625                            )
1626                            .await;
1627                            let _ = tx.send(StreamChunk::Done(None)).await;
1628                            return Ok(());
1629                        }
1630                        "response.failed" => {
1631                            let message = json["response"]["error"]["message"]
1632                                .as_str()
1633                                .unwrap_or("OpenAI Responses stream failed")
1634                                .to_string();
1635                            let _ = tx.send(StreamChunk::Error(message.clone())).await;
1636                            return Err(AppError::Llm(message));
1637                        }
1638                        _ => {}
1639                    }
1640
1641                    fallback_index = fallback_index.saturating_add(1);
1642                }
1643            }
1644            Err(e) => {
1645                let _ = tx
1646                    .send(StreamChunk::Error(format!("Stream error: {}", e)))
1647                    .await;
1648                return Err(AppError::Llm(format!("Stream error: {}", e)));
1649            }
1650        }
1651    }
1652
1653    emit_ready_openai_responses_tool_calls(
1654        &tx,
1655        &mut pending_tool_calls,
1656        &mut emitted_tool_call_ids,
1657        true,
1658    )
1659    .await;
1660    let _ = tx.send(StreamChunk::Done(None)).await;
1661    Ok(())
1662}
1663
1664pub async fn stream_openai(
1665    api_key: &str,
1666    base_url: &str,
1667    model: &str,
1668    prompt: &str,
1669    tools: Option<&[serde_json::Value]>,
1670    tx: mpsc::Sender<StreamChunk>,
1671    cancel_token: CancellationToken,
1672) -> Result<(), AppError> {
1673    if is_openai_model_incompatible_with_agent_session(model) {
1674        return Err(AppError::Llm(openai_agent_session_model_message(model)));
1675    }
1676
1677    match openai_api_for_model(model) {
1678        OpenAiApi::ChatCompletions => {
1679            stream_openai_chat_compatible(api_key, base_url, model, prompt, tools, tx, cancel_token)
1680                .await
1681        }
1682        OpenAiApi::Responses => {
1683            stream_openai_responses(api_key, base_url, model, prompt, tools, tx, cancel_token).await
1684        }
1685    }
1686}
1687
1688/// Stream a response from Anthropic Claude API
1689///
1690/// This is an implementation detail used by `start_streaming(..)`.
1691/// To keep the API maintainable, we pass arguments via a struct.
1692#[derive(Debug)]
1693pub struct AnthropicStreamRequest<'a> {
1694    pub api_key: &'a str,
1695    pub base_url: &'a str,
1696    pub model: &'a str,
1697    pub thinking_budget_tokens: Option<u32>,
1698    pub prompt: &'a str,
1699    pub tools: Option<&'a [serde_json::Value]>,
1700    pub tx: mpsc::Sender<StreamChunk>,
1701    pub cancel_token: CancellationToken,
1702}
1703
1704fn build_anthropic_messages_body(
1705    model: &str,
1706    prompt: &str,
1707    thinking_budget_tokens: Option<u32>,
1708    tools: Option<&[serde_json::Value]>,
1709) -> serde_json::Value {
1710    let mut body = serde_json::json!({
1711        "model": model,
1712        "max_tokens": 4096,
1713        "messages": [{"role": "user", "content": [{"type": "text", "text": prompt}]}],
1714        "stream": true
1715    });
1716
1717    // Enable structured tool calling when schemas are provided.
1718    if let Some(tools) = tools
1719        && !tools.is_empty()
1720    {
1721        body["tools"] = serde_json::Value::Array(tools.to_vec());
1722    }
1723
1724    // Optional provider-native thinking stream (emitted as StreamChunk::Thinking).
1725    if let Some(budget_tokens) = thinking_budget_tokens {
1726        body["thinking"] = serde_json::json!({ "type": "enabled", "budget_tokens": budget_tokens });
1727    }
1728
1729    body
1730}
1731
1732pub async fn stream_anthropic(req: AnthropicStreamRequest<'_>) -> Result<(), AppError> {
1733    let AnthropicStreamRequest {
1734        api_key,
1735        base_url,
1736        model,
1737        thinking_budget_tokens,
1738        prompt,
1739        tools,
1740        tx,
1741        cancel_token,
1742    } = req;
1743
1744    let url = format!("{}/v1/messages", base_url);
1745    let body = build_anthropic_messages_body(model, prompt, thinking_budget_tokens, tools);
1746
1747    let client = create_streaming_client();
1748    let response = client
1749        .post(&url)
1750        .header("x-api-key", api_key)
1751        .header("anthropic-version", "2023-06-01")
1752        .json(&body)
1753        .send()
1754        .await
1755        .map_err(|e| AppError::Llm(format!("Anthropic streaming request failed: {}", e)))?;
1756
1757    if !response.status().is_success() {
1758        let status = response.status();
1759        let body = response.text().await.unwrap_or_default();
1760        let error_msg = format!("Anthropic HTTP {}: {}", status, body);
1761
1762        // Check if this is a context overflow error
1763        if is_context_overflow_message(&error_msg) || is_context_overflow_message(&body) {
1764            return Err(AppError::ContextOverflow(error_msg));
1765        }
1766
1767        return Err(AppError::Llm(error_msg));
1768    }
1769
1770    let mut stream = response.bytes_stream();
1771    let mut line_buffer = String::new();
1772    let mut parser = ThinkingParser::new();
1773    let mut in_tool_block = false;
1774
1775    while let Some(chunk_result) = stream.next().await {
1776        if cancel_token.is_cancelled() {
1777            let _ = tx.send(cancel_token.interruption_chunk()).await;
1778            return Ok(());
1779        }
1780
1781        match chunk_result {
1782            Ok(bytes) => {
1783                let text = String::from_utf8_lossy(&bytes);
1784                for line in collect_complete_lines(&mut line_buffer, &text) {
1785                    let Some(data) = line.strip_prefix("data: ") else {
1786                        continue;
1787                    };
1788                    let Ok(json) = serde_json::from_str::<serde_json::Value>(data) else {
1789                        continue;
1790                    };
1791                    match json["type"].as_str() {
1792                        Some("message_stop") => {
1793                            let _ = tx.send(StreamChunk::Done(None)).await;
1794                            return Ok(());
1795                        }
1796                        Some("content_block_delta") => {
1797                            if let Some(delta) = json["delta"].as_object() {
1798                                // Anthropic can stream both normal text and (optionally) extended thinking.
1799                                if let Some(content) = delta.get("text").and_then(|v| v.as_str())
1800                                    && !content.is_empty()
1801                                {
1802                                    for chunk in parser.process(content) {
1803                                        let _ = tx.send(chunk).await;
1804                                    }
1805                                }
1806
1807                                if let Some(thinking) =
1808                                    delta.get("thinking").and_then(|v| v.as_str())
1809                                    && !thinking.is_empty()
1810                                {
1811                                    let _ =
1812                                        tx.send(StreamChunk::Thinking(thinking.to_string())).await;
1813                                }
1814
1815                                if in_tool_block
1816                                    && let Some(partial_json) =
1817                                        delta.get("partial_json").and_then(|v| v.as_str())
1818                                    && !partial_json.is_empty()
1819                                {
1820                                    let _ = tx
1821                                        .send(StreamChunk::ToolCallArgs(partial_json.to_string()))
1822                                        .await;
1823                                }
1824                            }
1825                        }
1826                        Some("content_block_start") => {
1827                            // Anthropic SSE format: content_block IS the tool_use object:
1828                            //   {"type":"content_block_start","content_block":{"type":"tool_use","id":"toolu_xxx","name":"shell","input":{}}}
1829                            let block = &json["content_block"];
1830                            if block["type"].as_str() == Some("tool_use")
1831                                && let Some(name) = block["name"].as_str()
1832                            {
1833                                let id = block["id"].as_str().unwrap_or_default().to_string();
1834                                let _ = tx
1835                                    .send(StreamChunk::ToolCallStart {
1836                                        id,
1837                                        name: name.to_string(),
1838                                    })
1839                                    .await;
1840                                in_tool_block = true;
1841                            }
1842                        }
1843                        Some("content_block_stop") if in_tool_block => {
1844                            let _ = tx.send(StreamChunk::ToolCallEnd).await;
1845                            in_tool_block = false;
1846                        }
1847                        _ => {}
1848                    }
1849                }
1850            }
1851            Err(e) => {
1852                let _ = tx
1853                    .send(StreamChunk::Error(format!("Stream error: {}", e)))
1854                    .await;
1855                return Err(AppError::Llm(format!("Stream error: {}", e)));
1856            }
1857        }
1858    }
1859
1860    let _ = tx.send(StreamChunk::Done(None)).await;
1861    Ok(())
1862}
1863
1864/// Build the JSON request body for Gemini streaming.
1865fn build_gemini_body(prompt: &str, tools: Option<&[serde_json::Value]>) -> serde_json::Value {
1866    let mut body = serde_json::json!({
1867        "contents": [{"role": "user", "parts": [{"text": prompt}]}]
1868    });
1869
1870    if let Some(tools) = tools
1871        && !tools.is_empty()
1872    {
1873        body["tools"] = serde_json::json!([{"functionDeclarations": tools}]);
1874        body["toolConfig"] = serde_json::json!({"functionCallingConfig": {"mode": "AUTO"}});
1875    }
1876
1877    body
1878}
1879
1880/// Stream a response from Google Gemini API (Generative Language API).
1881///
1882/// Gemini uses SSE with `alt=sse` query parameter and authenticates via API key
1883/// in the query string (not Bearer token).
1884pub async fn stream_gemini(
1885    api_key: &str,
1886    base_url: &str,
1887    model: &str,
1888    prompt: &str,
1889    tools: Option<&[serde_json::Value]>,
1890    tx: mpsc::Sender<StreamChunk>,
1891    cancel_token: CancellationToken,
1892) -> Result<(), AppError> {
1893    let url = format!(
1894        "{}/v1beta/models/{}:streamGenerateContent?alt=sse&key={}",
1895        base_url, model, api_key
1896    );
1897    let body = build_gemini_body(prompt, tools);
1898
1899    let client = create_streaming_client();
1900    let response = client
1901        .post(&url)
1902        .json(&body)
1903        .send()
1904        .await
1905        .map_err(|e| AppError::Llm(format!("Gemini streaming request failed: {}", e)))?;
1906
1907    if !response.status().is_success() {
1908        let status = response.status();
1909        let body = response.text().await.unwrap_or_default();
1910        let error_msg = format!("Gemini HTTP {}: {}", status, body);
1911
1912        // Check if this is a context overflow error
1913        if is_context_overflow_message(&error_msg) || is_context_overflow_message(&body) {
1914            return Err(AppError::ContextOverflow(error_msg));
1915        }
1916
1917        return Err(AppError::Llm(error_msg));
1918    }
1919
1920    let mut stream = response.bytes_stream();
1921    let mut line_buffer = String::new();
1922    let mut parser = ThinkingParser::new();
1923    let mut last_usage: Option<TokenUsage> = None;
1924
1925    while let Some(chunk_result) = stream.next().await {
1926        if cancel_token.is_cancelled() {
1927            let _ = tx.send(cancel_token.interruption_chunk()).await;
1928            return Ok(());
1929        }
1930
1931        match chunk_result {
1932            Ok(bytes) => {
1933                let text = String::from_utf8_lossy(&bytes);
1934                for line in collect_complete_lines(&mut line_buffer, &text) {
1935                    let Some(data) = line.strip_prefix("data: ") else {
1936                        continue;
1937                    };
1938                    let Ok(json) = serde_json::from_str::<serde_json::Value>(data) else {
1939                        continue;
1940                    };
1941
1942                    // Track usage metadata from each chunk (last one wins).
1943                    if let Some(usage) = json.get("usageMetadata") {
1944                        let input_tokens = usage["promptTokenCount"].as_u64().unwrap_or(0) as u32;
1945                        let output_tokens =
1946                            usage["candidatesTokenCount"].as_u64().unwrap_or(0) as u32;
1947                        last_usage = Some(
1948                            TokenUsage::new(input_tokens, output_tokens).with_provider("gemini"),
1949                        );
1950                    }
1951
1952                    // Process candidate parts.
1953                    if let Some(parts) = json
1954                        .pointer("/candidates/0/content/parts")
1955                        .and_then(|v| v.as_array())
1956                    {
1957                        for (idx, part) in parts.iter().enumerate() {
1958                            // Text part
1959                            if let Some(content) = part["text"].as_str()
1960                                && !content.is_empty()
1961                            {
1962                                for chunk in parser.process(content) {
1963                                    let _ = tx.send(chunk).await;
1964                                }
1965                            }
1966
1967                            // Function call part — Gemini sends complete function calls
1968                            // (not streamed arguments), so emit start + args + end.
1969                            if let Some(fc) = part.get("functionCall")
1970                                && let Some(name) = fc["name"].as_str()
1971                            {
1972                                let id = format!("gemini-call-{}", idx);
1973                                let _ = tx
1974                                    .send(StreamChunk::ToolCallStart {
1975                                        id,
1976                                        name: name.to_string(),
1977                                    })
1978                                    .await;
1979
1980                                let args = fc
1981                                    .get("args")
1982                                    .map(|a| a.to_string())
1983                                    .unwrap_or_else(|| "{}".to_string());
1984                                let _ = tx.send(StreamChunk::ToolCallArgs(args)).await;
1985                                let _ = tx.send(StreamChunk::ToolCallEnd).await;
1986                            }
1987                        }
1988                    }
1989                }
1990            }
1991            Err(e) => {
1992                let _ = tx
1993                    .send(StreamChunk::Error(format!("Stream error: {}", e)))
1994                    .await;
1995                return Err(AppError::Llm(format!("Stream error: {}", e)));
1996            }
1997        }
1998    }
1999
2000    let _ = tx.send(StreamChunk::Done(last_usage)).await;
2001    Ok(())
2002}
2003
/// Keepalive interval for Ollama streaming, in seconds.
///
/// Ollama may take a long time to load a model into memory (especially large
/// models). Additionally, when a model decides to make a tool call, the entire
/// tool call JSON only appears in the final `done:true` NDJSON line — no
/// individual tokens are streamed during that deliberation phase. Both of these
/// situations produce silence on the wire that can trigger the caller's idle
/// timer. We send periodic `Status` keepalive chunks throughout the **entire**
/// stream lifetime (not only during model loading) to prevent premature
/// timeouts.
///
/// `stream_ollama` uses this same interval for the pre-connection keepalive
/// (while the HTTP request is still pending) and for the in-stream keepalive.
const OLLAMA_KEEPALIVE_INTERVAL_SECS: u64 = 30;
2015
2016/// Stream a response from Ollama local API
2017pub async fn stream_ollama(
2018    base_url: &str,
2019    model: &str,
2020    prompt: &str,
2021    tools: Option<&[serde_json::Value]>,
2022    tx: mpsc::Sender<StreamChunk>,
2023    cancel_token: CancellationToken,
2024) -> Result<(), AppError> {
2025    let url = format!("{}/api/chat", base_url.trim_end_matches('/'));
2026    let mut body = serde_json::json!({
2027        "model": model,
2028        "messages": [{"role": "user", "content": prompt}],
2029        "stream": true
2030    });
2031
2032    // Ollama uses OpenAI-compatible tool schema format.
2033    if let Some(tools) = tools
2034        && !tools.is_empty()
2035    {
2036        body["tools"] = serde_json::Value::Array(tools.to_vec());
2037    }
2038
2039    tracing::debug!(
2040        model = model,
2041        url = %url,
2042        tools_count = tools.map(|t| t.len()).unwrap_or(0),
2043        has_tools = tools.map(|t| !t.is_empty()).unwrap_or(false),
2044        "[Ollama] Starting stream request"
2045    );
2046
2047    let client = create_streaming_client();
2048
2049    // Pre-connection keepalive: Ollama must load the model into VRAM before it
2050    // can start streaming. For large models (e.g. 20B+) this can take 100–200s,
2051    // which exceeds the GUI's 90s idle timer. We spawn a background task that
2052    // sends a Status chunk every 30s while reqwest's .send().await is blocking,
2053    // so the idle timer keeps getting reset during the model-loading phase.
2054    let pre_conn_tx = tx.clone();
2055    let pre_conn_model = model.to_string();
2056    let pre_conn_handle = tokio::spawn(
2057        {
2058            let interval = Duration::from_secs(OLLAMA_KEEPALIVE_INTERVAL_SECS);
2059            async move {
2060                loop {
2061                    tokio::time::sleep(interval).await;
2062                    tracing::debug!(
2063                        model = %pre_conn_model,
2064                        "[Ollama] Pre-connection keepalive: model still loading"
2065                    );
2066                    send_status_chunk_best_effort(
2067                        &pre_conn_tx,
2068                        StreamChunk::Status {
2069                            message: format!("Loading model '{pre_conn_model}'…"),
2070                        },
2071                    )
2072                    .await;
2073                }
2074            }
2075        }
2076        .instrument(tracing::Span::current()),
2077    );
2078
2079    let send_result = client.post(&url).json(&body).send().await;
2080
2081    // Abort the pre-connection keepalive now that we have a response (or error).
2082    // abort() is instant; the subsequent await just confirms the task has stopped.
2083    pre_conn_handle.abort();
2084    let _ = pre_conn_handle.await;
2085
2086    let response = send_result
2087        .map_err(|e| AppError::Llm(format!("Ollama streaming request failed: {}", e)))?;
2088
2089    if !response.status().is_success() {
2090        let status = response.status();
2091        let body = response.text().await.unwrap_or_default();
2092        let error_msg = format!("Ollama HTTP {}: {}", status, body);
2093
2094        // Check if this is a context overflow error
2095        if is_context_overflow_message(&error_msg) || is_context_overflow_message(&body) {
2096            return Err(AppError::ContextOverflow(error_msg));
2097        }
2098
2099        return Err(AppError::Llm(error_msg));
2100    }
2101
2102    // Immediately notify the caller that we have a connection. This resets
2103    // the caller's idle timer, which is critical because Ollama may spend a
2104    // long time loading the model into memory before sending any tokens.
2105    send_status_chunk_best_effort(
2106        &tx,
2107        StreamChunk::Status {
2108            message: format!("Connected to Ollama — loading model '{}'…", model),
2109        },
2110    )
2111    .await;
2112    tracing::debug!(
2113        model = model,
2114        "[Ollama] HTTP connection established; 'Connected' status sent"
2115    );
2116
2117    let mut stream = response.bytes_stream();
2118    let mut parser = ThinkingParser::new();
2119    let mut line_buffer = String::new();
2120
2121    let keepalive_interval = Duration::from_secs(OLLAMA_KEEPALIVE_INTERVAL_SECS);
2122    let keepalive_sleep = tokio::time::sleep(keepalive_interval);
2123    tokio::pin!(keepalive_sleep);
2124
2125    loop {
2126        tokio::select! {
2127            maybe_chunk = stream.next() => {
2128                let Some(chunk_result) = maybe_chunk else {
2129                    // Stream ended
2130                    break;
2131                };
2132
2133                if cancel_token.is_cancelled() {
2134                    let _ = tx.send(cancel_token.interruption_chunk()).await;
2135                    return Ok(());
2136                }
2137
2138                match chunk_result {
2139                    Ok(bytes) => {
2140                        let text = String::from_utf8_lossy(&bytes);
2141                        for line in collect_complete_lines(&mut line_buffer, &text) {
2142                            if let Ok(json) = serde_json::from_str::<serde_json::Value>(&line) {
2143                                tracing::trace!(
2144                                    done = json["done"].as_bool().unwrap_or(false),
2145                                    has_content = json["message"]["content"].as_str().map(|s| !s.is_empty()).unwrap_or(false),
2146                                    has_tool_calls = json["message"]["tool_calls"].is_array(),
2147                                    "[Ollama] NDJSON line parsed"
2148                                );
2149
2150                                // Extract content from message (may arrive before done)
2151                                if let Some(content) = json["message"]["content"].as_str()
2152                                    && !content.is_empty()
2153                                {
2154                                    let chunks = parser.process(content);
2155                                    for chunk in chunks {
2156                                        let _ = tx.send(chunk).await;
2157                                    }
2158                                }
2159
2160                                // Handle tool calls (Ollama returns them in the message)
2161                                if let Some(tool_calls) = json["message"]["tool_calls"].as_array() {
2162                                    tracing::debug!(
2163                                        model = model,
2164                                        count = tool_calls.len(),
2165                                        "[Ollama] Tool calls found in NDJSON line"
2166                                    );
2167                                    for call in tool_calls {
2168                                        let name = call["function"]["name"].as_str().unwrap_or_default();
2169                                        let args = &call["function"]["arguments"];
2170
2171                                        if !name.is_empty() {
2172                                            tracing::debug!(
2173                                                tool = name,
2174                                                "[Ollama] Emitting ToolCallStart/Args/End"
2175                                            );
2176                                            let id = format!("ollama-tool-{}", uuid::Uuid::new_v4());
2177                                            let _ = tx
2178                                                .send(StreamChunk::ToolCallStart {
2179                                                    id,
2180                                                    name: name.to_string(),
2181                                                })
2182                                                .await;
2183
2184                                            let args_str = if args.is_object() || args.is_array() {
2185                                                serde_json::to_string(args).unwrap_or_default()
2186                                            } else {
2187                                                args.as_str().unwrap_or("{}").to_string()
2188                                            };
2189
2190                                            let _ = tx.send(StreamChunk::ToolCallArgs(args_str)).await;
2191                                            let _ = tx.send(StreamChunk::ToolCallEnd).await;
2192                                            tracing::debug!(tool = name, "[Ollama] ToolCallEnd emitted");
2193                                        }
2194                                    }
2195                                }
2196
2197                                // Check if done
2198                                if json["done"].as_bool() == Some(true) {
2199                                    tracing::debug!(model = model, "[Ollama] done=true — sending Done chunk");
2200                                    let _ = tx.send(StreamChunk::Done(None)).await;
2201                                    return Ok(());
2202                                }
2203                            }
2204                        }
2205                    }
2206                    Err(e) => {
2207                        let _ = tx
2208                            .send(StreamChunk::Error(format!("Stream error: {}", e)))
2209                            .await;
2210                        return Err(AppError::Llm(format!("Stream error: {}", e)));
2211                    }
2212                }
2213            }
2214
2215            // Keepalive: send periodic status messages throughout the entire
2216            // stream. This prevents the caller's idle timer (typically 90 s)
2217            // from firing during two distinct silent phases:
2218            //   1. Cold starts — Ollama loads large models into memory before
2219            //      sending any tokens.
2220            //   2. Tool-call generation — Ollama only sends a tool call in the
2221            //      final `done:true` NDJSON line. The model deliberates in
2222            //      silence before that line arrives, which can easily exceed
2223            //      90 s on a local large model.
2224            () = &mut keepalive_sleep => {
2225                if cancel_token.is_cancelled() {
2226                    let _ = tx.send(cancel_token.interruption_chunk()).await;
2227                    return Ok(());
2228                }
2229                tracing::debug!(model = model, "[Ollama] Keepalive firing — sending Status chunk");
2230                send_status_chunk_best_effort(
2231                    &tx,
2232                    StreamChunk::Status {
2233                        message: format!("Working… (model '{}')", model),
2234                    },
2235                )
2236                .await;
2237                tracing::debug!(
2238                    model = model,
2239                    "[Ollama] Keepalive Status sent"
2240                );
2241                // Reset the keepalive timer for the next interval.
2242                keepalive_sleep
2243                    .as_mut()
2244                    .reset(tokio::time::Instant::now() + keepalive_interval);
2245            }
2246        }
2247    }
2248
2249    let _ = tx.send(StreamChunk::Done(None)).await;
2250    Ok(())
2251}
2252
2253/// Emit an error when the LLM provider is not configured.
2254///
2255/// Sends a `Status` message (which does not count as "output" for retry gating)
2256/// followed by an `Error` chunk, then returns an error to the caller.
2257async fn stream_unconfigured_error(
2258    provider_name: &str,
2259    tx: mpsc::Sender<StreamChunk>,
2260) -> Result<(), AppError> {
2261    let message = format!(
2262        "LLM provider '{}' is not configured. Please configure it in Settings or run 'gestura config edit'.",
2263        provider_name
2264    );
2265    // Status chunk does not count as output for retry purposes
2266    send_status_chunk_best_effort(
2267        &tx,
2268        StreamChunk::Status {
2269            message: message.clone(),
2270        },
2271    )
2272    .await;
2273    let _ = tx.send(StreamChunk::Error(message.clone())).await;
2274    Err(AppError::Llm(message))
2275}
2276
/// Returns `true` if a message indicates the provider is not configured.
///
/// We use this to skip pointless retry delays when failure is caused solely by
/// missing local configuration (e.g., absent API key).
///
/// Matching is a case-sensitive substring check for `"not configured"`. That
/// already covers the `"is not configured"` wording produced by
/// `stream_unconfigured_error`, so no separate check for it is needed.
fn is_unconfigured_provider_message(message: &str) -> bool {
    message.contains("not configured")
}
2284
2285/// Returns `true` if an error message indicates a context length overflow.
2286///
2287/// These errors cannot be fixed by blind retries - they require context compaction
2288/// or switching to a model with a larger context window.
2289fn is_context_overflow_message(message: &str) -> bool {
2290    let msg_lower = message.to_lowercase();
2291    // OpenAI format: "contextlengthexceeded" (no underscore in JSON code field)
2292    // or "context_length_exceeded" (with underscore in some error messages)
2293    let is_overflow = msg_lower.contains("contextlengthexceeded")
2294        || msg_lower.contains("context_length_exceeded")
2295        || msg_lower.contains("context length")
2296        || msg_lower.contains("maximum context")
2297        || (msg_lower.contains("tokens") && msg_lower.contains("exceeds"))
2298        || (msg_lower.contains("token") && msg_lower.contains("limit"));
2299
2300    if is_overflow {
2301        tracing::warn!(
2302            message_preview = %message.chars().take(200).collect::<String>(),
2303            "Detected context overflow error"
2304        );
2305    }
2306
2307    is_overflow
2308}
2309
2310/// Returns `true` if an [`AppError`] indicates a provider is not configured.
2311fn is_unconfigured_provider_error(err: &AppError) -> bool {
2312    match err {
2313        AppError::Llm(msg) => is_unconfigured_provider_message(msg),
2314        _ => false,
2315    }
2316}
2317
2318/// Start a streaming LLM request based on config.
2319///
2320/// Returns an error if the selected provider is not configured.
2321pub async fn start_streaming(
2322    config: &StreamingConfig,
2323    prompt: &str,
2324    tool_schemas: Option<ProviderToolSchemas>,
2325    tx: mpsc::Sender<StreamChunk>,
2326    cancel_token: CancellationToken,
2327) -> Result<(), AppError> {
2328    async {
2329        match config.primary.as_str() {
2330            "openai" => {
2331                if let Some(c) = &config.openai {
2332                    let openai_tools =
2333                        tool_schemas
2334                            .as_ref()
2335                            .map(|schemas| match openai_api_for_model(&c.model) {
2336                                OpenAiApi::ChatCompletions => schemas.openai.as_slice(),
2337                                OpenAiApi::Responses => schemas.openai_responses.as_slice(),
2338                            });
2339                    stream_openai(
2340                        &c.api_key,
2341                        c.base_url.as_deref().unwrap_or("https://api.openai.com"),
2342                        &c.model,
2343                        prompt,
2344                        openai_tools,
2345                        tx,
2346                        cancel_token,
2347                    )
2348                    .await
2349                } else {
2350                    stream_unconfigured_error("openai", tx).await
2351                }
2352            }
2353            "anthropic" => {
2354                if let Some(c) = &config.anthropic {
2355                    stream_anthropic(AnthropicStreamRequest {
2356                        api_key: &c.api_key,
2357                        base_url: c.base_url.as_deref().unwrap_or("https://api.anthropic.com"),
2358                        model: &c.model,
2359                        thinking_budget_tokens: c.thinking_budget_tokens,
2360                        prompt,
2361                        tools: tool_schemas.as_ref().map(|s| s.anthropic.as_slice()),
2362                        tx,
2363                        cancel_token,
2364                    })
2365                    .await
2366                } else {
2367                    stream_unconfigured_error("anthropic", tx).await
2368                }
2369            }
2370            "grok" => {
2371                // Grok uses OpenAI-compatible API
2372                if let Some(c) = &config.grok {
2373                    stream_openai_chat_compatible(
2374                        &c.api_key,
2375                        c.base_url.as_deref().unwrap_or("https://api.x.ai"),
2376                        &c.model,
2377                        prompt,
2378                        tool_schemas.as_ref().map(|s| s.openai.as_slice()),
2379                        tx,
2380                        cancel_token,
2381                    )
2382                    .await
2383                } else {
2384                    stream_unconfigured_error("grok", tx).await
2385                }
2386            }
2387            "gemini" => {
2388                if let Some(c) = &config.gemini {
2389                    stream_gemini(
2390                        &c.api_key,
2391                        c.base_url
2392                            .as_deref()
2393                            .unwrap_or("https://generativelanguage.googleapis.com"),
2394                        &c.model,
2395                        prompt,
2396                        tool_schemas.as_ref().map(|s| s.gemini.as_slice()),
2397                        tx,
2398                        cancel_token,
2399                    )
2400                    .await
2401                } else {
2402                    stream_unconfigured_error("gemini", tx).await
2403                }
2404            }
2405            "ollama" => {
2406                if let Some(c) = &config.ollama {
2407                    stream_ollama(
2408                        &c.base_url,
2409                        &c.model,
2410                        prompt,
2411                        tool_schemas.as_ref().map(|s| s.openai.as_slice()),
2412                        tx,
2413                        cancel_token,
2414                    )
2415                    .await
2416                } else {
2417                    stream_unconfigured_error("ollama", tx).await
2418                }
2419            }
2420            other => stream_unconfigured_error(other, tx).await,
2421        }
2422    }
2423    .instrument(tracing::info_span!(
2424        "agent.streaming.request",
2425        provider = %config.primary,
2426        has_tool_schemas = tool_schemas.is_some()
2427    ))
2428    .await
2429}
2430
2431/// Start streaming with fallback to secondary provider on failure
2432/// Implements jittered exponential backoff with rate-limit-aware delay selection before falling back.
2433pub async fn start_streaming_with_fallback(
2434    config: &StreamingConfig,
2435    prompt: &str,
2436    tool_schemas: Option<ProviderToolSchemas>,
2437    tx: mpsc::Sender<StreamChunk>,
2438    cancel_token: CancellationToken,
2439) -> Result<(), AppError> {
2440    // Try primary provider with retries
2441    let retry_policy = RetryPolicy::for_streaming();
2442    let total_attempts = retry_policy.max_attempts.max(1) as usize;
2443    let mut last_error: Option<AppError> = None;
2444    let mut skipped_retries_due_to_unconfigured = false;
2445
2446    for attempt in 0..total_attempts {
2447        if cancel_token.is_cancelled() {
2448            let _ = tx.send(cancel_token.interruption_chunk()).await;
2449            return Ok(());
2450        }
2451
2452        // Create a new channel for this attempt
2453        let (attempt_tx, mut attempt_rx) =
2454            mpsc::channel::<StreamChunk>(STREAM_CHUNK_BUFFER_CAPACITY);
2455        let attempt_cancel = cancel_token.clone();
2456        let config_clone = config.clone();
2457        let prompt_clone = prompt.to_string();
2458        let tool_schemas_clone = tool_schemas.clone();
2459
2460        // Spawn the streaming attempt
2461        let attempt_span = tracing::info_span!(
2462            "agent.streaming.fallback_attempt",
2463            attempt = attempt + 1,
2464            total_attempts = total_attempts
2465        );
2466        let handle = tokio::spawn(
2467            async move {
2468                start_streaming(
2469                    &config_clone,
2470                    &prompt_clone,
2471                    tool_schemas_clone,
2472                    attempt_tx,
2473                    attempt_cancel,
2474                )
2475                .await
2476            }
2477            .instrument(attempt_span),
2478        );
2479
2480        // Forward chunks to the caller in real-time.
2481        // If the attempt fails before producing any output, we can retry.
2482        let forward = forward_attempt_stream(&mut attempt_rx, &tx).await;
2483
2484        // Wait for the task to complete (capture errors that might occur before any chunk arrives).
2485        match handle.await {
2486            Ok(Ok(())) => {}
2487            Ok(Err(e)) => {
2488                last_error = Some(e);
2489            }
2490            Err(e) => {
2491                last_error = Some(AppError::Llm(format!("Task failed: {}", e)));
2492            }
2493        }
2494
2495        match forward.outcome {
2496            AttemptOutcome::Success => return Ok(()),
2497            AttemptOutcome::Cancelled | AttemptOutcome::Paused => return Ok(()),
2498            AttemptOutcome::FatalError => {
2499                let err = AppError::Llm(
2500                    forward
2501                        .error
2502                        .clone()
2503                        .unwrap_or_else(|| "Streaming failed".to_string()),
2504                );
2505                return Err(err);
2506            }
2507            AttemptOutcome::ContextOverflowError => {
2508                // Context overflow cannot be fixed by retry - caller must compact context.
2509                // Return immediately with a specific error so the pipeline can handle it.
2510                let error_msg = forward
2511                    .error
2512                    .clone()
2513                    .unwrap_or_else(|| "Context length exceeded".to_string());
2514
2515                tracing::warn!(
2516                    error = %error_msg,
2517                    "Context overflow detected - returning to pipeline for compaction"
2518                );
2519
2520                // Emit a special chunk so the pipeline knows to compact
2521                let _ = tx
2522                    .send(StreamChunk::ContextOverflow {
2523                        error_message: error_msg.clone(),
2524                    })
2525                    .await;
2526
2527                return Err(AppError::ContextOverflow(error_msg));
2528            }
2529            AttemptOutcome::RetryableError => {
2530                if let Some(ref e) = forward.error {
2531                    last_error = Some(AppError::Llm(e.clone()));
2532                }
2533            }
2534            AttemptOutcome::UnexpectedEnd => {
2535                if forward.forwarded_output {
2536                    // We streamed partial output but never got a terminal event; treat as fatal.
2537                    let err = AppError::Llm(
2538                        "Streaming ended unexpectedly (no terminal event received)".to_string(),
2539                    );
2540                    let _ = tx.send(StreamChunk::Error(err.to_string())).await;
2541                    return Err(err);
2542                }
2543                // Otherwise, allow retry (error may be captured from handle.await above).
2544            }
2545        }
2546
2547        // If the provider is simply not configured, retries won't help.
2548        // Skip backoff and jump directly to fallback (if configured).
2549        let unconfigured = forward
2550            .error
2551            .as_deref()
2552            .map(is_unconfigured_provider_message)
2553            .unwrap_or(false)
2554            || last_error
2555                .as_ref()
2556                .map(is_unconfigured_provider_error)
2557                .unwrap_or(false);
2558
2559        if unconfigured {
2560            skipped_retries_due_to_unconfigured = true;
2561            break;
2562        }
2563
2564        // Context overflow errors require compaction, not blind retries.
2565        // Return immediately so the pipeline can compact and retry.
2566        let is_context_overflow = forward
2567            .error
2568            .as_deref()
2569            .map(is_context_overflow_message)
2570            .unwrap_or(false)
2571            || matches!(&last_error, Some(AppError::ContextOverflow(_)));
2572
2573        if is_context_overflow {
2574            let error_msg = forward
2575                .error
2576                .clone()
2577                .or_else(|| last_error.as_ref().map(|e| e.to_string()))
2578                .unwrap_or_else(|| "Context length exceeded".to_string());
2579
2580            tracing::warn!(
2581                error = %error_msg,
2582                "Context overflow detected - skipping retries, returning for compaction"
2583            );
2584
2585            // Emit context overflow chunk so UI knows what's happening
2586            let _ = tx
2587                .send(StreamChunk::ContextOverflow {
2588                    error_message: error_msg.clone(),
2589                })
2590                .await;
2591
2592            return Err(AppError::ContextOverflow(error_msg));
2593        }
2594
2595        // Only back off if we will actually perform another attempt.
2596        if attempt + 1 < total_attempts {
2597            // Log retry attempt and notify frontend
2598            let error_msg = last_error
2599                .as_ref()
2600                .map(|e| e.to_string())
2601                .unwrap_or_else(|| "Unknown error".to_string());
2602            let retry_delay =
2603                select_streaming_retry_delay(&retry_policy, attempt as u32 + 1, &error_msg);
2604
2605            tracing::warn!(
2606                attempt = attempt + 1,
2607                delay_ms = retry_delay.as_millis(),
2608                error = %error_msg,
2609                "Primary LLM failed, retrying after backoff"
2610            );
2611
2612            // Emit retry notification to frontend
2613            let _ = tx
2614                .send(StreamChunk::RetryAttempt {
2615                    attempt: attempt as u32 + 1,
2616                    max_attempts: total_attempts as u32,
2617                    delay_ms: retry_delay.as_millis() as u64,
2618                    error_message: error_msg,
2619                })
2620                .await;
2621
2622            tokio::time::sleep(retry_delay).await;
2623        }
2624    }
2625
2626    // Primary failed after retries, try fallback if configured
2627    if let Some(ref fallback_provider) = config.fallback {
2628        if skipped_retries_due_to_unconfigured {
2629            tracing::info!(
2630                fallback = fallback_provider,
2631                "Primary LLM is not configured, trying fallback provider"
2632            );
2633        } else {
2634            tracing::info!(
2635                fallback = fallback_provider,
2636                "Primary LLM exhausted retries, trying fallback provider"
2637            );
2638        }
2639
2640        // Create a modified config with fallback as primary
2641        let mut fallback_config = config.clone();
2642        fallback_config.primary = fallback_provider.clone();
2643
2644        // Try fallback provider (no retries for fallback)
2645        let result = start_streaming(
2646            &fallback_config,
2647            prompt,
2648            tool_schemas,
2649            tx.clone(),
2650            cancel_token,
2651        )
2652        .await;
2653
2654        if result.is_ok() {
2655            return Ok(());
2656        }
2657
2658        tracing::error!("Fallback provider also failed");
2659    }
2660
2661    // All attempts failed
2662    if let Some(error) = last_error {
2663        let _ = tx.send(StreamChunk::Error(error.to_string())).await;
2664        Err(error)
2665    } else {
2666        let err = AppError::Llm("All LLM providers failed".to_string());
2667        let _ = tx.send(StreamChunk::Error(err.to_string())).await;
2668        Err(err)
2669    }
2670}
2671
2672#[cfg(test)]
2673mod tests {
2674    use super::*;
2675
2676    #[test]
2677    fn openai_http_error_includes_retry_after_hint_when_present() {
2678        let message = format_openai_http_error(
2679            reqwest::StatusCode::TOO_MANY_REQUESTS,
2680            "OpenAI",
2681            "gpt-5.4",
2682            OpenAiApi::Responses,
2683            "rate limit reached",
2684            Some(Duration::from_secs(12)),
2685        );
2686
2687        assert!(message.contains("HTTP 429"));
2688        assert!(message.contains("retrying after 12 seconds"));
2689    }
2690
2691    #[test]
2692    fn retry_delay_prefers_provider_retry_after_hint() {
2693        let policy = RetryPolicy {
2694            max_attempts: 3,
2695            initial_delay_ms: 1_000,
2696            max_delay_ms: 8_000,
2697            backoff_multiplier: 2.0,
2698            jitter_factor: 0.0,
2699        };
2700
2701        let delay = select_streaming_retry_delay(
2702            &policy,
2703            1,
2704            "OpenAI /v1/responses HTTP 429: rate limit reached. Provider suggested retrying after 12 seconds.",
2705        );
2706
2707        assert_eq!(delay, Duration::from_secs(12));
2708    }
2709
2710    #[test]
2711    fn retry_delay_uses_rate_limit_floor_without_retry_after_hint() {
2712        let policy = RetryPolicy {
2713            max_attempts: 3,
2714            initial_delay_ms: 1_000,
2715            max_delay_ms: 8_000,
2716            backoff_multiplier: 2.0,
2717            jitter_factor: 0.0,
2718        };
2719
2720        let delay = select_streaming_retry_delay(
2721            &policy,
2722            1,
2723            "OpenAI /v1/responses HTTP 429: Too many requests",
2724        );
2725
2726        assert_eq!(delay, Duration::from_secs(5));
2727    }
2728
2729    #[test]
2730    fn test_cancellation_token() {
2731        let token = CancellationToken::new();
2732        assert!(!token.is_cancelled());
2733        token.cancel();
2734        assert!(token.is_cancelled());
2735        assert!(!token.is_pause_requested());
2736        assert!(matches!(token.interruption_chunk(), StreamChunk::Cancelled));
2737    }
2738
2739    #[test]
2740    fn test_cancellation_token_pause_intent() {
2741        let token = CancellationToken::new();
2742        token.pause();
2743
2744        assert!(token.is_cancelled());
2745        assert!(token.is_pause_requested());
2746        assert!(matches!(token.interruption_chunk(), StreamChunk::Paused));
2747
2748        token.cancel();
2749        assert!(token.is_cancelled());
2750        assert!(!token.is_pause_requested());
2751        assert!(matches!(token.interruption_chunk(), StreamChunk::Cancelled));
2752    }
2753
2754    #[test]
2755    fn split_think_blocks_extracts_thinking() {
2756        let input = "<think>plan</think>answer";
2757        let (content, thinking) = split_think_blocks(input);
2758        assert_eq!(content, "answer");
2759        assert_eq!(thinking.as_deref(), Some("plan"));
2760    }
2761
2762    #[test]
2763    fn thinking_parser_handles_complete_tags() {
2764        let mut parser = ThinkingParser::new();
2765        let chunks = parser.process("<think>thinking content</think>response text");
2766
2767        assert_eq!(chunks.len(), 2);
2768        assert!(matches!(&chunks[0], StreamChunk::Thinking(t) if t == "thinking content"));
2769        assert!(matches!(&chunks[1], StreamChunk::Text(t) if t == "response text"));
2770    }
2771
2772    #[test]
2773    fn thinking_parser_handles_split_start_tag() {
2774        let mut parser = ThinkingParser::new();
2775
2776        // First chunk ends with partial "<think>"
2777        let chunks1 = parser.process("Hello <thi");
2778        assert_eq!(chunks1.len(), 1);
2779        assert!(matches!(&chunks1[0], StreamChunk::Text(t) if t == "Hello "));
2780
2781        // Second chunk completes the tag
2782        let chunks2 = parser.process("nk>thinking</think>done");
2783        assert_eq!(chunks2.len(), 2);
2784        assert!(matches!(&chunks2[0], StreamChunk::Thinking(t) if t == "thinking"));
2785        assert!(matches!(&chunks2[1], StreamChunk::Text(t) if t == "done"));
2786    }
2787
2788    #[test]
2789    fn thinking_parser_handles_split_end_tag() {
2790        let mut parser = ThinkingParser::new();
2791
2792        // First chunk has start tag and partial end tag
2793        let chunks1 = parser.process("<think>thinking content</th");
2794        assert_eq!(chunks1.len(), 1);
2795        assert!(matches!(&chunks1[0], StreamChunk::Thinking(t) if t == "thinking content"));
2796
2797        // Second chunk completes the end tag
2798        let chunks2 = parser.process("ink>response");
2799        assert_eq!(chunks2.len(), 1);
2800        assert!(matches!(&chunks2[0], StreamChunk::Text(t) if t == "response"));
2801    }
2802
2803    #[test]
2804    fn thinking_parser_handles_text_before_think() {
2805        let mut parser = ThinkingParser::new();
2806        let chunks = parser.process("prefix<think>thought</think>suffix");
2807
2808        assert_eq!(chunks.len(), 3);
2809        assert!(matches!(&chunks[0], StreamChunk::Text(t) if t == "prefix"));
2810        assert!(matches!(&chunks[1], StreamChunk::Thinking(t) if t == "thought"));
2811        assert!(matches!(&chunks[2], StreamChunk::Text(t) if t == "suffix"));
2812    }
2813
2814    #[test]
2815    fn thinking_parser_handles_no_think_tags() {
2816        let mut parser = ThinkingParser::new();
2817        let chunks = parser.process("just regular text");
2818
2819        assert_eq!(chunks.len(), 1);
2820        assert!(matches!(&chunks[0], StreamChunk::Text(t) if t == "just regular text"));
2821    }
2822
2823    #[test]
2824    fn openai_body_includes_tools_and_tool_choice_when_provided() {
2825        let tools = vec![serde_json::json!({
2826            "type": "function",
2827            "function": {
2828                "name": "shell",
2829                "description": "Run a command",
2830                "parameters": {"type": "object", "properties": {}}
2831            }
2832        })];
2833
2834        let body = build_openai_chat_request_body("gpt-test", "hi", Some(&tools));
2835        assert!(body.get("tools").is_some());
2836        assert_eq!(
2837            body.get("tool_choice").and_then(|v| v.as_str()),
2838            Some("auto")
2839        );
2840    }
2841
2842    #[test]
2843    fn openai_body_omits_tools_when_none() {
2844        let body = build_openai_chat_request_body("gpt-test", "hi", None);
2845        assert!(body.get("tools").is_none());
2846        assert!(body.get("tool_choice").is_none());
2847    }
2848
2849    #[test]
2850    fn openai_body_omits_temperature() {
2851        let body = build_openai_chat_request_body("gpt-test", "hi", None);
2852        assert!(body.get("temperature").is_none());
2853    }
2854
2855    #[test]
2856    fn openai_responses_body_uses_responses_shape() {
2857        let tools = vec![serde_json::json!({
2858            "type": "function",
2859            "name": "shell",
2860            "description": "Run a command",
2861            "parameters": {"type": "object", "properties": {}}
2862        })];
2863
2864        let body = build_openai_responses_request_body("gpt-5.4", "hi", Some(&tools));
2865        assert_eq!(body["model"], "gpt-5.4");
2866        assert_eq!(body["input"][0]["role"], "user");
2867        assert_eq!(body["input"][0]["content"], "hi");
2868        assert!(body.get("tools").is_some());
2869        assert_eq!(body["tool_choice"], "auto");
2870    }
2871
2872    #[test]
2873    fn openai_http_error_mentions_selected_endpoint() {
2874        let message = format_openai_http_error(
2875            reqwest::StatusCode::NOT_FOUND,
2876            "OpenAI",
2877            "gpt-5.3-codex",
2878            OpenAiApi::ChatCompletions,
2879            "This is not a chat model",
2880            None,
2881        );
2882        assert!(message.contains("/v1/responses"));
2883        assert!(message.contains("/v1/chat/completions"));
2884    }
2885
2886    #[test]
2887    fn openai_tool_call_deltas_are_assembled_by_index() {
2888        let mut pending = BTreeMap::new();
2889
2890        merge_openai_tool_call_delta(
2891            &mut pending,
2892            &serde_json::json!({
2893                "index": 0,
2894                "id": "call_0",
2895                "function": {"name": "task", "arguments": "{\"operation\":\"update_status\",\"task_id\":\"abc"}
2896            }),
2897            0,
2898        );
2899        merge_openai_tool_call_delta(
2900            &mut pending,
2901            &serde_json::json!({
2902                "index": 1,
2903                "id": "call_1",
2904                "function": {"name": "shell", "arguments": "{\"command\":\"cargo check"}
2905            }),
2906            1,
2907        );
2908        merge_openai_tool_call_delta(
2909            &mut pending,
2910            &serde_json::json!({
2911                "index": 0,
2912                "function": {"arguments": "\",\"status\":\"completed\"}"}
2913            }),
2914            0,
2915        );
2916        merge_openai_tool_call_delta(
2917            &mut pending,
2918            &serde_json::json!({
2919                "index": 1,
2920                "function": {"arguments": "\",\"timeout_secs\":300}"}
2921            }),
2922            1,
2923        );
2924
2925        let calls = take_openai_tool_calls(&mut pending);
2926        assert_eq!(calls.len(), 2);
2927        assert_eq!(calls[0].0, 0);
2928        assert_eq!(calls[0].1.id, "call_0");
2929        assert_eq!(calls[0].1.name, "task");
2930        assert_eq!(
2931            calls[0].1.arguments,
2932            "{\"operation\":\"update_status\",\"task_id\":\"abc\",\"status\":\"completed\"}"
2933        );
2934        assert_eq!(calls[1].0, 1);
2935        assert_eq!(calls[1].1.id, "call_1");
2936        assert_eq!(calls[1].1.name, "shell");
2937        assert_eq!(
2938            calls[1].1.arguments,
2939            "{\"command\":\"cargo check\",\"timeout_secs\":300}"
2940        );
2941        assert!(pending.is_empty());
2942    }
2943
2944    #[tokio::test]
2945    async fn emit_openai_tool_calls_streams_complete_calls_in_index_order() {
2946        let (tx, mut rx) = mpsc::channel(10);
2947        let mut pending = BTreeMap::new();
2948        pending.insert(
2949            1,
2950            PendingOpenAiToolCall {
2951                id: "call_1".to_string(),
2952                name: "shell".to_string(),
2953                arguments: "{\"command\":\"pwd\"}".to_string(),
2954            },
2955        );
2956        pending.insert(
2957            0,
2958            PendingOpenAiToolCall {
2959                id: "call_0".to_string(),
2960                name: "file".to_string(),
2961                arguments: "{\"operation\":\"list\"}".to_string(),
2962            },
2963        );
2964
2965        emit_openai_tool_calls(&tx, &mut pending).await;
2966
2967        assert!(matches!(
2968            rx.recv().await,
2969            Some(StreamChunk::ToolCallStart { id, name }) if id == "call_0" && name == "file"
2970        ));
2971        assert!(matches!(
2972            rx.recv().await,
2973            Some(StreamChunk::ToolCallArgs(args)) if args == "{\"operation\":\"list\"}"
2974        ));
2975        assert!(matches!(rx.recv().await, Some(StreamChunk::ToolCallEnd)));
2976        assert!(matches!(
2977            rx.recv().await,
2978            Some(StreamChunk::ToolCallStart { id, name }) if id == "call_1" && name == "shell"
2979        ));
2980        assert!(matches!(
2981            rx.recv().await,
2982            Some(StreamChunk::ToolCallArgs(args)) if args == "{\"command\":\"pwd\"}"
2983        ));
2984        assert!(matches!(rx.recv().await, Some(StreamChunk::ToolCallEnd)));
2985        assert!(pending.is_empty());
2986    }
2987
2988    #[test]
2989    fn openai_responses_tool_calls_are_buffered_by_output_index() {
2990        let mut pending = BTreeMap::new();
2991        let mut tool_indices = HashMap::new();
2992
2993        merge_openai_responses_tool_item(
2994            &mut pending,
2995            &mut tool_indices,
2996            &serde_json::json!({
2997                "type": "response.output_item.added",
2998                "output_index": 0,
2999                "item": {
3000                    "type": "function_call",
3001                    "id": "fc_0",
3002                    "call_id": "call_0",
3003                    "name": "file"
3004                }
3005            }),
3006            0,
3007        );
3008        merge_openai_responses_tool_argument_delta(
3009            &mut pending,
3010            &mut tool_indices,
3011            &serde_json::json!({
3012                "type": "response.function_call_arguments.delta",
3013                "output_index": 0,
3014                "item_id": "fc_0",
3015                "delta": "{\"operation\":\"list\"}"
3016            }),
3017            0,
3018        );
3019        complete_openai_responses_tool_arguments(
3020            &mut pending,
3021            &mut tool_indices,
3022            &serde_json::json!({
3023                "type": "response.function_call_arguments.done",
3024                "output_index": 0,
3025                "item_id": "fc_0",
3026                "arguments": "{\"operation\":\"list\"}"
3027            }),
3028            0,
3029        );
3030
3031        assert_eq!(pending.len(), 1);
3032        assert_eq!(pending[&0].id, "call_0");
3033        assert_eq!(pending[&0].name, "file");
3034        assert_eq!(pending[&0].arguments, "{\"operation\":\"list\"}");
3035        assert!(pending[&0].finished);
3036    }
3037
3038    #[test]
3039    fn openai_responses_tool_calls_reuse_stable_aliases_when_output_index_is_missing() {
3040        let mut pending = BTreeMap::new();
3041        let mut tool_indices = HashMap::new();
3042
3043        merge_openai_responses_tool_item(
3044            &mut pending,
3045            &mut tool_indices,
3046            &serde_json::json!({
3047                "type": "response.output_item.added",
3048                "item": {
3049                    "type": "function_call",
3050                    "id": "fc_0",
3051                    "call_id": "call_0",
3052                    "name": "file"
3053                }
3054            }),
3055            3,
3056        );
3057        merge_openai_responses_tool_argument_delta(
3058            &mut pending,
3059            &mut tool_indices,
3060            &serde_json::json!({
3061                "type": "response.function_call_arguments.delta",
3062                "item_id": "fc_0",
3063                "delta": "{\"operation\":\"list\"}"
3064            }),
3065            8,
3066        );
3067        complete_openai_responses_tool_arguments(
3068            &mut pending,
3069            &mut tool_indices,
3070            &serde_json::json!({
3071                "type": "response.function_call_arguments.done",
3072                "call_id": "call_0",
3073                "arguments": "{\"operation\":\"list\"}"
3074            }),
3075            13,
3076        );
3077
3078        assert_eq!(pending.len(), 1);
3079        assert_eq!(pending[&3].id, "call_0");
3080        assert_eq!(pending[&3].arguments, "{\"operation\":\"list\"}");
3081        assert!(pending[&3].finished);
3082    }
3083
3084    #[tokio::test]
3085    async fn emit_openai_responses_tool_calls_waits_for_lowest_ready_index() {
3086        let (tx, mut rx) = mpsc::channel(10);
3087        let mut pending = BTreeMap::new();
3088        let mut emitted_ids = HashSet::new();
3089        pending.insert(
3090            0,
3091            PendingOpenAiResponsesToolCall {
3092                id: "call_0".to_string(),
3093                name: "file".to_string(),
3094                arguments: "{\"operation\":\"list\"}".to_string(),
3095                finished: true,
3096            },
3097        );
3098        pending.insert(
3099            1,
3100            PendingOpenAiResponsesToolCall {
3101                id: "call_1".to_string(),
3102                name: "shell".to_string(),
3103                arguments: "{\"command\":\"pwd\"}".to_string(),
3104                finished: true,
3105            },
3106        );
3107
3108        emit_ready_openai_responses_tool_calls(&tx, &mut pending, &mut emitted_ids, false).await;
3109
3110        assert!(matches!(
3111            rx.recv().await,
3112            Some(StreamChunk::ToolCallStart { id, name }) if id == "call_0" && name == "file"
3113        ));
3114        assert!(matches!(
3115            rx.recv().await,
3116            Some(StreamChunk::ToolCallArgs(args)) if args == "{\"operation\":\"list\"}"
3117        ));
3118        assert!(matches!(rx.recv().await, Some(StreamChunk::ToolCallEnd)));
3119        assert!(matches!(
3120            rx.recv().await,
3121            Some(StreamChunk::ToolCallStart { id, name }) if id == "call_1" && name == "shell"
3122        ));
3123        assert!(matches!(
3124            rx.recv().await,
3125            Some(StreamChunk::ToolCallArgs(args)) if args == "{\"command\":\"pwd\"}"
3126        ));
3127        assert!(matches!(rx.recv().await, Some(StreamChunk::ToolCallEnd)));
3128        assert!(pending.is_empty());
3129    }
3130
3131    #[tokio::test]
3132    async fn emit_openai_responses_tool_calls_skips_duplicate_call_ids() {
3133        let (tx, mut rx) = mpsc::channel(10);
3134        let mut pending = BTreeMap::new();
3135        let mut emitted_ids = HashSet::new();
3136        pending.insert(
3137            0,
3138            PendingOpenAiResponsesToolCall {
3139                id: "call_dup".to_string(),
3140                name: "file".to_string(),
3141                arguments: "{\"operation\":\"list\"}".to_string(),
3142                finished: true,
3143            },
3144        );
3145        pending.insert(
3146            1,
3147            PendingOpenAiResponsesToolCall {
3148                id: "call_dup".to_string(),
3149                name: "file".to_string(),
3150                arguments: "{\"operation\":\"list\"}".to_string(),
3151                finished: true,
3152            },
3153        );
3154
3155        emit_ready_openai_responses_tool_calls(&tx, &mut pending, &mut emitted_ids, false).await;
3156
3157        assert!(matches!(
3158            rx.recv().await,
3159            Some(StreamChunk::ToolCallStart { id, name }) if id == "call_dup" && name == "file"
3160        ));
3161        assert!(matches!(
3162            rx.recv().await,
3163            Some(StreamChunk::ToolCallArgs(args)) if args == "{\"operation\":\"list\"}"
3164        ));
3165        assert!(matches!(rx.recv().await, Some(StreamChunk::ToolCallEnd)));
3166        assert!(rx.try_recv().is_err());
3167        assert!(pending.is_empty());
3168    }
3169
3170    #[test]
3171    fn anthropic_body_includes_tools_when_provided() {
3172        let tools = vec![serde_json::json!({
3173            "name": "shell",
3174            "description": "Run a command",
3175            "input_schema": {"type": "object", "properties": {}}
3176        })];
3177
3178        let body = build_anthropic_messages_body("claude-test", "hi", None, Some(&tools));
3179        assert!(body.get("tools").is_some());
3180    }
3181
3182    #[tokio::test]
3183    async fn test_stream_chunk_types() {
3184        let (tx, mut rx) = mpsc::channel(10);
3185
3186        tx.send(StreamChunk::Text("Hello".to_string()))
3187            .await
3188            .unwrap();
3189        tx.send(StreamChunk::Done(None)).await.unwrap();
3190
3191        if let Some(StreamChunk::Text(text)) = rx.recv().await {
3192            assert_eq!(text, "Hello");
3193        } else {
3194            panic!("Expected Text chunk");
3195        }
3196
3197        if let Some(StreamChunk::Done(_)) = rx.recv().await {
3198            // OK
3199        } else {
3200            panic!("Expected Done chunk");
3201        }
3202    }
3203
3204    #[tokio::test]
3205    async fn start_streaming_unconfigured_provider_returns_error() {
3206        let cfg = StreamingConfig {
3207            primary: "openai".to_string(),
3208            openai: None,
3209            ..Default::default()
3210        };
3211
3212        let (tx, mut rx) = mpsc::channel(128);
3213        let cancel = CancellationToken::new();
3214
3215        tokio::spawn(async move {
3216            let prompt = "hello world";
3217            let _ = start_streaming(&cfg, prompt, None, tx, cancel).await;
3218        });
3219
3220        // First chunk should be a Status message (does not count as output for retry).
3221        match rx.recv().await {
3222            Some(StreamChunk::Status { message }) => {
3223                assert!(message.contains("not configured"));
3224            }
3225            other => panic!("Expected Status chunk, got: {other:?}"),
3226        }
3227
3228        // Next chunk should be an Error.
3229        match rx.recv().await {
3230            Some(StreamChunk::Error(msg)) => {
3231                assert!(msg.contains("not configured"));
3232            }
3233            other => panic!("Expected Error chunk, got: {other:?}"),
3234        }
3235    }
3236
3237    #[tokio::test]
3238    async fn forward_attempt_stream_forwards_immediately() {
3239        let (outer_tx, mut outer_rx) = mpsc::channel::<StreamChunk>(10);
3240        let (attempt_tx, mut attempt_rx) = mpsc::channel::<StreamChunk>(10);
3241
3242        let forward_handle =
3243            tokio::spawn(async move { forward_attempt_stream(&mut attempt_rx, &outer_tx).await });
3244
3245        attempt_tx
3246            .send(StreamChunk::Text("A".to_string()))
3247            .await
3248            .unwrap();
3249
3250        // If forwarding is real-time, we should observe the text chunk before we send Done.
3251        match outer_rx.recv().await {
3252            Some(StreamChunk::Text(t)) => assert_eq!(t, "A"),
3253            other => panic!("Expected Text chunk, got: {other:?}"),
3254        }
3255
3256        attempt_tx.send(StreamChunk::Done(None)).await.unwrap();
3257        match outer_rx.recv().await {
3258            Some(StreamChunk::Done(_)) => {}
3259            other => panic!("Expected Done chunk, got: {other:?}"),
3260        }
3261
3262        let result = forward_handle.await.unwrap();
3263        assert_eq!(result.outcome, AttemptOutcome::Success);
3264    }
3265
3266    #[tokio::test]
3267    async fn forward_attempt_stream_retryable_error_before_output_is_not_forwarded() {
3268        let (outer_tx, mut outer_rx) = mpsc::channel::<StreamChunk>(10);
3269        let (attempt_tx, mut attempt_rx) = mpsc::channel::<StreamChunk>(10);
3270
3271        let forward_handle =
3272            tokio::spawn(async move { forward_attempt_stream(&mut attempt_rx, &outer_tx).await });
3273
3274        attempt_tx
3275            .send(StreamChunk::Error("nope".to_string()))
3276            .await
3277            .unwrap();
3278
3279        // Should not forward any chunk if no output has been streamed (enables clean retries).
3280        // The receiver may either time out (no activity) or complete with `None` if the sender is dropped.
3281        let recv =
3282            tokio::time::timeout(std::time::Duration::from_millis(50), outer_rx.recv()).await;
3283        match recv {
3284            Err(_) => {}   // no activity
3285            Ok(None) => {} // sender dropped without sending anything
3286            Ok(Some(other)) => panic!("did not expect any forwarded chunk, got: {other:?}"),
3287        }
3288
3289        let result = forward_handle.await.unwrap();
3290        assert_eq!(result.outcome, AttemptOutcome::RetryableError);
3291    }
3292
3293    #[tokio::test]
3294    async fn forward_attempt_stream_fatal_error_after_output_is_forwarded() {
3295        let (outer_tx, mut outer_rx) = mpsc::channel::<StreamChunk>(10);
3296        let (attempt_tx, mut attempt_rx) = mpsc::channel::<StreamChunk>(10);
3297
3298        let forward_handle =
3299            tokio::spawn(async move { forward_attempt_stream(&mut attempt_rx, &outer_tx).await });
3300
3301        attempt_tx
3302            .send(StreamChunk::Text("hello".to_string()))
3303            .await
3304            .unwrap();
3305        match outer_rx.recv().await {
3306            Some(StreamChunk::Text(t)) => assert_eq!(t, "hello"),
3307            other => panic!("Expected Text chunk, got: {other:?}"),
3308        }
3309
3310        attempt_tx
3311            .send(StreamChunk::Error("boom".to_string()))
3312            .await
3313            .unwrap();
3314        match outer_rx.recv().await {
3315            Some(StreamChunk::Error(e)) => assert_eq!(e, "boom"),
3316            other => panic!("Expected Error chunk, got: {other:?}"),
3317        }
3318
3319        let result = forward_handle.await.unwrap();
3320        assert_eq!(result.outcome, AttemptOutcome::FatalError);
3321    }
3322
3323    #[tokio::test]
3324    async fn forward_attempt_stream_drops_status_under_backpressure_without_blocking_retry() {
3325        let (outer_tx, mut outer_rx) = mpsc::channel::<StreamChunk>(1);
3326        outer_tx
3327            .send(StreamChunk::Status {
3328                message: "occupied".to_string(),
3329            })
3330            .await
3331            .unwrap();
3332
3333        let (attempt_tx, mut attempt_rx) = mpsc::channel::<StreamChunk>(10);
3334        let forward_handle =
3335            tokio::spawn(async move { forward_attempt_stream(&mut attempt_rx, &outer_tx).await });
3336
3337        attempt_tx
3338            .send(StreamChunk::Status {
3339                message: "keepalive".to_string(),
3340            })
3341            .await
3342            .unwrap();
3343        attempt_tx
3344            .send(StreamChunk::Error("retry me".to_string()))
3345            .await
3346            .unwrap();
3347        drop(attempt_tx);
3348
3349        let result = tokio::time::timeout(Duration::from_millis(300), forward_handle)
3350            .await
3351            .expect("status backpressure should not stall forwarder")
3352            .expect("forwarder join should succeed");
3353
3354        assert_eq!(result.outcome, AttemptOutcome::RetryableError);
3355        assert!(!result.forwarded_output);
3356        assert_eq!(result.error.as_deref(), Some("retry me"));
3357
3358        match outer_rx.recv().await {
3359            Some(StreamChunk::Status { message }) => assert_eq!(message, "occupied"),
3360            other => panic!("expected only the pre-filled status chunk, got: {other:?}"),
3361        }
3362
3363        let recv = tokio::time::timeout(Duration::from_millis(50), outer_rx.recv()).await;
3364        match recv {
3365            Err(_) => {}
3366            Ok(None) => {}
3367            Ok(Some(other)) => {
3368                panic!("did not expect forwarded status/error chunk, got: {other:?}")
3369            }
3370        }
3371    }
3372
3373    #[tokio::test]
3374    async fn forward_attempt_stream_drops_token_usage_under_backpressure_without_blocking_retry() {
3375        let (outer_tx, mut outer_rx) = mpsc::channel::<StreamChunk>(1);
3376        outer_tx
3377            .send(StreamChunk::TokenUsageUpdate {
3378                estimated: 42,
3379                limit: 100,
3380                percentage: 42,
3381                status: TokenUsageStatus::Green,
3382                estimated_cost: 0.0001,
3383            })
3384            .await
3385            .unwrap();
3386
3387        let (attempt_tx, mut attempt_rx) = mpsc::channel::<StreamChunk>(10);
3388        let forward_handle =
3389            tokio::spawn(async move { forward_attempt_stream(&mut attempt_rx, &outer_tx).await });
3390
3391        attempt_tx
3392            .send(StreamChunk::TokenUsageUpdate {
3393                estimated: 50,
3394                limit: 100,
3395                percentage: 50,
3396                status: TokenUsageStatus::Green,
3397                estimated_cost: 0.0002,
3398            })
3399            .await
3400            .unwrap();
3401        attempt_tx
3402            .send(StreamChunk::Error("retry me".to_string()))
3403            .await
3404            .unwrap();
3405        drop(attempt_tx);
3406
3407        let result = tokio::time::timeout(Duration::from_millis(300), forward_handle)
3408            .await
3409            .expect("token-usage backpressure should not stall forwarder")
3410            .expect("forwarder join should succeed");
3411
3412        assert_eq!(result.outcome, AttemptOutcome::RetryableError);
3413        assert!(!result.forwarded_output);
3414        assert_eq!(result.error.as_deref(), Some("retry me"));
3415
3416        match outer_rx.recv().await {
3417            Some(StreamChunk::TokenUsageUpdate { estimated, .. }) => assert_eq!(estimated, 42),
3418            other => panic!("expected only the pre-filled token-usage chunk, got: {other:?}"),
3419        }
3420
3421        let recv = tokio::time::timeout(Duration::from_millis(50), outer_rx.recv()).await;
3422        match recv {
3423            Err(_) => {}
3424            Ok(None) => {}
3425            Ok(Some(other)) => {
3426                panic!("did not expect forwarded token-usage/error chunk, got: {other:?}")
3427            }
3428        }
3429    }
3430}