gestura_core/pipeline/
mod.rs

1//! Agent Pipeline - Unified LLM interaction pipeline
2//!
3//! This module provides a single entry point for all LLM interactions,
4//! regardless of input source (text, voice, delegated tasks). It integrates:
5//!
6//! - Context analysis and reduction
7//! - Tool filtering based on request
8//! - Agentic loop for tool execution
9//! - Optional ERL-inspired reflection, retry, and memory consolidation
10//! - Streaming and non-streaming responses
11//! - Token estimation and truncation
12//! - Fallback to secondary providers
13//! - Workspace sandboxing for tool execution
14//!
15//! Internal organization notes:
16//! - `agent_loop` owns the runtime control flow and delegates shared
17//!   iteration/finalization helpers, tracked-task bookkeeping, narration /
18//!   status emission, and continuation/closeout logic to sidecar modules.
19//! - `tool_dispatch` owns tool execution; its test suite lives in a dedicated
20//!   sidecar module so the runtime file stays focused on behavior.
21
22mod agent_loop;
23mod compaction;
24mod prompt;
25mod reflection;
26mod request_telemetry;
27mod tool_dispatch;
28mod tool_router;
29pub mod types;
30pub(crate) use reflection::sync_task_reflection_outcomes;
31
32use std::collections::HashSet;
33use std::path::Path;
34use std::time::Instant;
35use tokio::sync::mpsc;
36pub use tool_router::{RoutingResult, ToolRouter, build_tool_router};
37use tracing::Instrument as _;
38
39use crate::agent_sessions::{AgentSessionStore, FileAgentSessionStore};
40use crate::checkpoints::{CheckpointManager, CheckpointRetentionPolicy, FileCheckpointStore};
41use crate::config::AppConfig;
42use crate::context::{ContextManager, RequestAnalyzer};
43use crate::error::AppError;
44use crate::hooks::{HookContext, HookEngine, HookEvent};
45use crate::knowledge::{KnowledgeSettingsManager, KnowledgeStore};
46use crate::llm_provider::{AgentContext, select_provider};
47use crate::session_workspace::SessionWorkspace;
48use crate::streaming::{
49    CancellationToken, StreamChunk, start_streaming, start_streaming_with_fallback,
50};
51use crate::tasks::TaskManager;
52use crate::tool_confirmation::TOOL_CONFIRMATIONS;
53use crate::tools::PermissionManager;
54use crate::tools::registry::{ToolDefinition, all_tools};
55use gestura_core_llm::model_capabilities::ModelCapabilitiesCache;
56
57use request_telemetry::{AgentRequestTelemetry, RequestOutcome, RequestRunMode};
58use tool_dispatch::{FinalizePendingToolCallCtx, PendingToolCall};
59pub use types::*;
60
/// Capacity of buffered stream-chunk channels created for pipeline runs.
/// NOTE(review): the consumer lives in a sibling module (likely `agent_loop`)
/// not visible here — confirm before tuning.
pub(super) const STREAM_CHUNK_BUFFER_CAPACITY: usize = 256;
/// Metadata-hint key carrying the text to use for requirement detection;
/// `requirement_detection_input` falls back to `request.input` when the hint
/// is absent or blank.
const REQUIREMENT_DETECTION_INPUT_HINT_KEY: &str = "requirement_detection_input";
/// Metadata-hint key ("true") marking a pipeline-internal requirement-breakdown
/// request; such requests skip the advanced-primitives middleware.
const INTERNAL_REQUIREMENT_BREAKDOWN_HINT_KEY: &str = "internal.requirement_breakdown";
/// How long the best-effort senders below wait before dropping a non-critical
/// chunk (status / token-usage) when the stream receiver is not draining.
const NONCRITICAL_STREAM_CHUNK_SEND_TIMEOUT: std::time::Duration =
    std::time::Duration::from_millis(100);
66
67pub(super) async fn send_status_chunk_best_effort(
68    tx: &mpsc::Sender<StreamChunk>,
69    chunk: StreamChunk,
70) {
71    debug_assert!(matches!(chunk, StreamChunk::Status { .. }));
72
73    match tokio::time::timeout(NONCRITICAL_STREAM_CHUNK_SEND_TIMEOUT, tx.send(chunk)).await {
74        Ok(Ok(())) | Ok(Err(_)) => {}
75        Err(_) => {
76            tracing::debug!(
77                timeout_ms = NONCRITICAL_STREAM_CHUNK_SEND_TIMEOUT.as_millis(),
78                "Dropping transient status chunk because the stream receiver is not draining fast enough"
79            );
80        }
81    }
82}
83
84pub(super) async fn send_token_usage_chunk_best_effort(
85    tx: &mpsc::Sender<StreamChunk>,
86    chunk: StreamChunk,
87) {
88    debug_assert!(matches!(chunk, StreamChunk::TokenUsageUpdate { .. }));
89
90    match tokio::time::timeout(NONCRITICAL_STREAM_CHUNK_SEND_TIMEOUT, tx.send(chunk)).await {
91        Ok(Ok(())) | Ok(Err(_)) => {}
92        Err(_) => {
93            tracing::debug!(
94                timeout_ms = NONCRITICAL_STREAM_CHUNK_SEND_TIMEOUT.as_millis(),
95                "Dropping transient token-usage chunk because the stream receiver is not draining fast enough"
96            );
97        }
98    }
99}
100
101/// Minimum combined gesture confidence required to route a gesture through the
102/// agentic pipeline.
103///
104/// Combined confidence = device-classifier confidence × intent-mapping confidence.
105/// Examples at this threshold:
106/// - `tap` with device confidence 0.78 → `0.78 × 0.9 = 0.702` — just above gate ✓
107/// - `tilt_left` with device confidence 1.0 → `1.0 × 0.85 = 0.85` — well above ✓
108/// - any `unknown_gesture` → mapping confidence 0.5, always below gate ✗
109///
110/// Gestures below the threshold are logged at `DEBUG` level and discarded to
111/// prevent accidental or misclassified inputs from triggering LLM/tool activity.
112#[cfg(feature = "ring-integration")]
113const MIN_GESTURE_CONFIDENCE: f32 = 0.7;
114
115/// Capacity of the bounded mpsc channel between the gesture receiver loop and
116/// the single background pipeline processor. When full, incoming gestures are
117/// dropped with a `WARN` log rather than blocking the receiver (which would
118/// delay haptic acknowledgement and stall the BLE notification path).
119#[cfg(feature = "ring-integration")]
120const GESTURE_QUEUE_CAPACITY: usize = 32;
121
122/// Process a stream of gestures from the ring backend and route each one
123/// through the agentic pipeline.
124///
125/// Each gesture is normalised into a unified [`gestura_core_intent::Intent`]
126/// and then submitted to an [`AgentPipeline`] via [`AgentPipeline::process_blocking`].
127///
128/// ## Confidence gating
129///
130/// Gestures with combined confidence below [`MIN_GESTURE_CONFIDENCE`] or with
131/// `primary_action == "unknown_gesture"` are discarded before reaching the
132/// pipeline, preventing accidental inputs from triggering LLM or tool activity.
133///
134/// ## Ordered, bounded processing
135///
136/// A single background task drains a bounded [`tokio::sync::mpsc`] channel
137/// (capacity [`GESTURE_QUEUE_CAPACITY`]) so that:
138/// - Pipeline executions are **sequential** — haptic feedback is emitted in
139///   gesture arrival order.
140/// - **At most one** `process_blocking` call is in flight at any time.
141/// - The receiver loop is **never blocked** by a slow pipeline run; excess
142///   gestures are dropped with a warning instead of building an unbounded queue.
143///
144/// ## Haptic feedback
145///
146/// - **Success** → [`gestura_core_haptics::HapticPattern::Confirm`]
147/// - **Pipeline error** → [`gestura_core_haptics::HapticPattern::Error`]
#[cfg(feature = "ring-integration")]
pub async fn process_ring_stream(
    backend: std::sync::Arc<dyn gestura_core_ring::RingBackend>,
    observer: std::sync::Arc<dyn crate::orchestrator::OrchestratorObserver>,
    config: AppConfig,
) {
    // A work item is the normalised intent plus the observer that will receive
    // the haptic outcome for that specific gesture.
    type Work = (
        gestura_core_intent::Intent,
        std::sync::Arc<dyn crate::orchestrator::OrchestratorObserver>,
    );

    let pipeline = std::sync::Arc::new(AgentPipeline::new(config));

    // Bounded channel: receiver loop enqueues, single worker drains.
    let (work_tx, mut work_rx) = tokio::sync::mpsc::channel::<Work>(GESTURE_QUEUE_CAPACITY);

    // Spawn the single sequential processor. Dropping `work_tx` (when the
    // receiver loop exits) closes the channel and cleanly terminates this task.
    let pipeline_worker = pipeline.clone();
    tokio::spawn(async move {
        while let Some((intent, obs)) = work_rx.recv().await {
            // The gesture's primary action is submitted verbatim as the
            // request input; streaming is off since nothing consumes chunks.
            let request = AgentRequest::new(intent.primary_action.clone()).with_streaming(false);
            match pipeline_worker.process_blocking(request).await {
                Ok(_) => {
                    // Success haptic. NOTE(review): the numeric args appear to
                    // be (intensity, duration_ms) — confirm against the
                    // OrchestratorObserver trait definition.
                    obs.on_haptic_feedback(gestura_core_haptics::HapticPattern::Confirm, 1.0, 200)
                        .await;
                }
                Err(e) => {
                    tracing::warn!(
                        error = %e,
                        primary_action = %intent.primary_action,
                        "Ring gesture failed to route through agentic pipeline"
                    );
                    // Failure haptic: weaker/shorter than the confirm pulse.
                    obs.on_haptic_feedback(gestura_core_haptics::HapticPattern::Error, 0.5, 150)
                        .await;
                }
            }
        }
    });

    let mut rx = backend.subscribe_to_gestures().await;
    loop {
        match rx.recv().await {
            Ok(gesture) => {
                // Wrap the raw gesture so the shared intent normaliser can
                // treat it like any other input modality.
                let raw_input = gestura_core_intent::RawInput {
                    text: gesture.gesture_type.clone(),
                    modality: gestura_core_intent::InputModality::Gesture,
                    session_id: None,
                    gesture_data: Some(gestura_core_intent::GestureData {
                        gesture_type: gesture.gesture_type,
                        acceleration: gesture.acceleration,
                        gyroscope: gesture.gyroscope,
                        confidence: gesture.confidence,
                    }),
                };

                let intent = gestura_core_intent::normalize_input_to_intent(raw_input);

                // Gate: discard low-confidence and unrecognised gestures before
                // they reach the pipeline to avoid unnecessary LLM/tool runs.
                if intent.confidence < MIN_GESTURE_CONFIDENCE
                    || intent.primary_action == "unknown_gesture"
                {
                    tracing::debug!(
                        intent_id = %intent.id,
                        primary_action = %intent.primary_action,
                        confidence = intent.confidence,
                        "Skipping low-confidence or unrecognised gesture"
                    );
                    continue;
                }

                tracing::debug!(
                    intent_id = %intent.id,
                    primary_action = %intent.primary_action,
                    confidence = intent.confidence,
                    "Ring gesture normalised to intent; enqueuing for pipeline"
                );

                // Non-blocking enqueue. If the worker is behind we drop the
                // gesture rather than blocking the BLE notification receiver.
                if work_tx.try_send((intent, observer.clone())).is_err() {
                    tracing::warn!(
                        capacity = GESTURE_QUEUE_CAPACITY,
                        "Gesture pipeline queue full; dropping gesture to prevent backlog"
                    );
                }
            }
            Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
                // Broadcast overflow: some gestures were lost, but the stream
                // is still live — keep receiving.
                tracing::warn!(
                    "Ring processing stream lagged, {} gestures dropped",
                    skipped
                );
                continue;
            }
            Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                // Backend dropped its sender: exit the loop, which drops
                // `work_tx` and thereby shuts down the worker task above.
                tracing::info!("Ring processing stream closed. Stopping task.");
                break;
            }
        }
    }
}
250
251/// Select the correct tool schema slice for a provider name.
252///
253/// Each provider family has its own tool definition format:
254/// - Anthropic: `{name, description, input_schema}`
255/// - Gemini: `{name, description, parameters}` (for `functionDeclarations`)
256/// - OpenAI Chat Completions / Grok / Ollama: `{type:"function", function:{…}}`
257/// - OpenAI Responses: `{type:"function", name, description, parameters}`
258fn tools_slice_for_provider(
259    provider_name: &str,
260    model_id: Option<&str>,
261    schemas: &crate::tools::schemas::ProviderToolSchemas,
262) -> Vec<serde_json::Value> {
263    match provider_name {
264        "anthropic" => schemas.anthropic.clone(),
265        "gemini" => schemas.gemini.clone(),
266        "openai"
267            if model_id.is_some_and(|model| {
268                matches!(
269                    crate::llm_provider::openai::openai_api_for_model(model),
270                    crate::llm_provider::openai::OpenAiApi::Responses
271                )
272            }) =>
273        {
274            schemas.openai_responses.clone()
275        }
276        _ => schemas.openai.clone(),
277    }
278}
279
280fn requirement_detection_input(request: &AgentRequest) -> &str {
281    request
282        .metadata
283        .hints
284        .get(REQUIREMENT_DETECTION_INPUT_HINT_KEY)
285        .map(String::as_str)
286        .filter(|value| !value.trim().is_empty())
287        .unwrap_or(&request.input)
288}
289
290fn is_internal_requirement_breakdown_request(request: &AgentRequest) -> bool {
291    request
292        .metadata
293        .hints
294        .get(INTERNAL_REQUIREMENT_BREAKDOWN_HINT_KEY)
295        .is_some_and(|value| value == "true")
296}
297
/// The main agent pipeline for processing requests.
///
/// Shared immutably by long-running consumers (e.g. wrapped in `Arc` by the
/// ring-gesture stream); constructed via `AgentPipeline::new` or
/// `with_provider_optimized_config`.
pub struct AgentPipeline {
    /// Application configuration
    config: AppConfig,
    /// Context manager for smart context reduction
    context_manager: ContextManager,
    /// Request analyzer for category detection
    analyzer: RequestAnalyzer,
    /// Pipeline-specific configuration
    pipeline_config: PipelineConfig,
    /// Persistent permission manager used for tool confirmation decisions.
    ///
    /// This enables "Allow always" semantics for tool confirmations.
    permission_manager: PermissionManager,
    /// Knowledge store for specialized expertise
    knowledge_store: Option<&'static KnowledgeStore>,
    /// Knowledge settings manager for session-scoped activation
    knowledge_settings: Option<&'static KnowledgeSettingsManager>,
    /// Optional pre-flight LLM tool router (None when strategy is Keyword).
    tool_router: Option<Box<dyn tool_router::ToolRouter>>,
    /// Model capabilities cache for dynamic context limit discovery.
    ///
    /// This cache learns model limits from:
    /// - API discovery (Gemini, Anthropic, Grok, Ollama)
    /// - Error parsing (context_length_exceeded messages)
    /// - User configuration overrides
    capabilities_cache: ModelCapabilitiesCache,
}
326
/// Coarse shape of an auto-tracked request, inferred from keyword signals in
/// the user message by `infer_auto_tracked_request_shape` and used to pick a
/// default subtask template in `default_auto_tracked_execution_subtasks`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AutoTrackedRequestShape {
    /// "create", "implement", "refactor", … — produce or modify artifacts.
    CreateOrModify,
    /// "fix", "debug", "troubleshoot", … — diagnose and repair an issue.
    InvestigateOrFix,
    /// "compare", "review", "analyze", … — assessment-style work.
    AnalyzeOrCompare,
    /// "research", "search", "fetch", … — information gathering.
    ResearchOrFetch,
    /// "plan", "design", "document", … — planning or writing deliverables.
    PlanOrDraft,
    /// Fallback when no specific keyword family matched.
    GeneralExecution,
}
336
337impl AgentPipeline {
338    fn message_contains_any(text: &str, signals: &[&str]) -> bool {
339        signals.iter().any(|signal| text.contains(signal))
340    }
341
342    fn is_simple_verification_request(message: &str) -> bool {
343        let text = message.trim().to_ascii_lowercase();
344        if text.is_empty() {
345            return false;
346        }
347
348        let has_verification_signals = Self::message_contains_any(
349            &text,
350            &[
351                "build and test",
352                "run the build",
353                "run build",
354                "run the tests",
355                "run tests",
356                "run the test",
357                "run test",
358                "cargo build",
359                "cargo check",
360                "cargo test",
361                "verify",
362                "validation",
363                "validate",
364                "compile",
365                "lint",
366                "smoke test",
367            ],
368        );
369        if !has_verification_signals {
370            return false;
371        }
372
373        let has_non_verification_signals = Self::message_contains_any(
374            &text,
375            &[
376                "fix",
377                "debug",
378                "troubleshoot",
379                "resolve",
380                "repair",
381                "investigate",
382                "diagnose",
383                "create",
384                "implement",
385                "set up",
386                "setup",
387                "scaffold",
388                "write",
389                "draft",
390                "update",
391                "change",
392                "modify",
393                "refactor",
394                "migrate",
395                "compare",
396                "review",
397                "audit",
398                "analyze",
399                "analyse",
400                "evaluate",
401                "inspect",
402                "research",
403                "search",
404                "find",
405                "fetch",
406                "gather",
407                "collect",
408                "explore",
409                "plan",
410                "design",
411                "outline",
412                "document",
413                "summarize",
414                "summarise",
415                "propose",
416            ],
417        );
418
419        has_verification_signals && !has_non_verification_signals
420    }
421
422    fn infer_auto_tracked_request_shape(message: &str) -> AutoTrackedRequestShape {
423        let text = message.trim().to_ascii_lowercase();
424
425        if Self::message_contains_any(
426            &text,
427            &[
428                "fix",
429                "debug",
430                "troubleshoot",
431                "resolve",
432                "repair",
433                "investigate why",
434                "diagnose",
435            ],
436        ) {
437            return AutoTrackedRequestShape::InvestigateOrFix;
438        }
439
440        if Self::message_contains_any(
441            &text,
442            &[
443                "compare", "review", "audit", "assess", "analyze", "analyse", "evaluate", "inspect",
444            ],
445        ) {
446            return AutoTrackedRequestShape::AnalyzeOrCompare;
447        }
448
449        if Self::message_contains_any(
450            &text,
451            &[
452                "research", "find", "look up", "lookup", "search", "fetch", "gather", "collect",
453                "explore",
454            ],
455        ) {
456            return AutoTrackedRequestShape::ResearchOrFetch;
457        }
458
459        if Self::message_contains_any(
460            &text,
461            &[
462                "create",
463                "implement",
464                "build",
465                "set up",
466                "setup",
467                "scaffold",
468                "write",
469                "draft",
470                "compose",
471                "update",
472                "change",
473                "modify",
474                "refactor",
475                "migrate",
476            ],
477        ) {
478            return AutoTrackedRequestShape::CreateOrModify;
479        }
480
481        if Self::message_contains_any(
482            &text,
483            &[
484                "plan",
485                "design",
486                "outline",
487                "document",
488                "summarize",
489                "summarise",
490                "propose",
491            ],
492        ) {
493            return AutoTrackedRequestShape::PlanOrDraft;
494        }
495
496        AutoTrackedRequestShape::GeneralExecution
497    }
498
499    fn ensure_request_session_id(request: &mut AgentRequest) {
500        let missing_session_id = request
501            .metadata
502            .session_id
503            .as_deref()
504            .is_none_or(|session_id| session_id.trim().is_empty());
505
506        if !missing_session_id {
507            return;
508        }
509
510        let generated = format!("agent-run-{}", uuid::Uuid::new_v4());
511        tracing::warn!(
512            generated_session_id = %generated,
513            source = ?request.metadata.source,
514            "Agent request missing session_id; synthesizing unique session id for isolated workspace/task tracking"
515        );
516        request.metadata.session_id = Some(generated);
517    }
518
519    fn should_auto_track_request(message: &str, explicit_task_id: Option<&str>) -> bool {
520        if explicit_task_id.is_some() {
521            return false;
522        }
523
524        let text = message.trim().to_ascii_lowercase();
525        if text.is_empty() || text.starts_with('/') {
526            return false;
527        }
528
529        if Self::is_simple_verification_request(&text) {
530            return false;
531        }
532
533        Self::message_contains_any(
534            &text,
535            &[
536                "plan and implement",
537                "carefully plan",
538                "build and test",
539                "implement then",
540                "step by step",
541                "break this down",
542                "end to end",
543                "scaffold",
544                "refactor",
545                "fix",
546                "debug",
547                "build",
548                "test",
549                "compare",
550                "review",
551                "audit",
552                "analyze",
553                "analyse",
554                "assess",
555                "evaluate",
556                "inspect",
557                "research",
558                "search",
559                "find",
560                "fetch",
561                "gather",
562                "collect",
563                "explore",
564                "plan",
565                "design",
566                "outline",
567                "document",
568                "write",
569                "draft",
570                "summarize",
571                "summarise",
572                "propose",
573                "investigate",
574                "troubleshoot",
575            ],
576        ) && text.split_whitespace().count() >= 6
577    }
578
579    fn should_offer_task_tool_for_request(analysis: &crate::context::RequestAnalysis) -> bool {
580        analysis.needs_tools && Self::should_auto_track_request(&analysis.request, None)
581    }
582
583    fn reflection_enabled_for(&self, metadata: &RequestMetadata) -> bool {
584        metadata
585            .reflection_enabled
586            .unwrap_or(self.pipeline_config.reflection.enabled)
587    }
588
589    #[inline(always)]
590    async fn maybe_apply_advanced_primitives_middleware(
591        &self,
592        request: &mut AgentRequest,
593        analysis: &crate::context::RequestAnalysis,
594    ) {
595        let intent = requirement_detection_input(request).trim().to_string();
596        if !gestura_core_tasks::ADVANCED_PRIMITIVES_ENABLED
597            || is_internal_requirement_breakdown_request(request)
598            || !analysis.needs_tools
599            || !Self::should_auto_track_request(&intent, request.metadata.task_id.as_deref())
600        {
601            return;
602        }
603
604        let enhancement = gestura_core_tasks::AdvancedPrimitives::run_enhanced_plan(
605            gestura_core_tasks::AdvancedPlanRequest {
606                user_intent: intent.clone(),
607                base_system_prompt: request.system_prompt.clone().unwrap_or_else(|| {
608                    gestura_core_pipeline::persona::default_system_prompt(&request.metadata)
609                }),
610                session_id: request.metadata.session_id.clone(),
611                task_id: request.metadata.task_id.clone(),
612                source: format!("{:?}", request.metadata.source),
613                complex_intent: true,
614                requires_verification: Self::prompt_requires_build_and_test(&intent),
615                metadata_hints: request.metadata.hints.clone(),
616            },
617        )
618        .await;
619
620        if enhancement.applied {
621            request.system_prompt = Some(enhancement.system_prompt);
622            request.metadata.hints.extend(enhancement.metadata_hints);
623        }
624    }
625
626    /// Normalize a raw request into a unified [`gestura_core_intent::Intent`] and
627    /// attach the result as metadata hints.
628    ///
629    /// When `advanced-primitives` is disabled at compile time the
630    /// [`gestura_core_intent::INTENT_NORMALIZATION_ENABLED`] constant is `false`
631    /// and this entire branch constant-folds away, preserving the original
632    /// pipeline behavior.
633    #[inline(always)]
634    fn maybe_attach_normalized_intent(request: &mut AgentRequest) {
635        if !gestura_core_intent::INTENT_NORMALIZATION_ENABLED {
636            return;
637        }
638
639        let modality =
640            gestura_core_intent::InputModality::from_request_source(&request.metadata.source);
641        let raw_input = gestura_core_intent::RawInput {
642            text: request.input.clone(),
643            modality,
644            session_id: request.metadata.session_id.clone(),
645            gesture_data: None,
646        };
647        let intent = gestura_core_intent::normalize_input_to_intent(raw_input);
648
649        tracing::debug!(
650            intent_id = %intent.id,
651            modality = %intent.modality.label(),
652            action = %intent.primary_action,
653            confidence = intent.confidence,
654            "Normalized input to unified intent"
655        );
656
657        request
658            .metadata
659            .hints
660            .insert("intent.id".to_string(), intent.id.clone());
661        request.metadata.hints.insert(
662            "intent.primary_action".to_string(),
663            intent.primary_action.clone(),
664        );
665        request.metadata.hints.insert(
666            "intent.modality".to_string(),
667            intent.modality.label().to_string(),
668        );
669        request.metadata.hints.insert(
670            "intent.confidence".to_string(),
671            format!("{:.2}", intent.confidence),
672        );
673        if !intent.context_hints.is_empty() {
674            request.metadata.hints.insert(
675                "intent.context_hints".to_string(),
676                intent.context_hints.join(","),
677            );
678        }
679    }
680
681    fn append_task_tool_for_auto_tracked_request(
682        analysis: &crate::context::RequestAnalysis,
683        candidate_names: &HashSet<&str>,
684        tools: &mut Vec<&'static ToolDefinition>,
685    ) {
686        if !candidate_names.contains("task")
687            || !Self::should_offer_task_tool_for_request(analysis)
688            || tools.iter().any(|tool| tool.name == "task")
689        {
690            return;
691        }
692
693        if let Some(task_tool) = crate::tools::registry::find_tool("task") {
694            tools.push(task_tool);
695        }
696    }
697
698    fn derive_agent_request_task_name(message: &str) -> String {
699        let trimmed = message.trim();
700        if trimmed.is_empty() {
701            return "Agent request".to_string();
702        }
703
704        let sentence = trimmed
705            .split(['\n', '.', '!', '?'])
706            .next()
707            .unwrap_or(trimmed)
708            .trim();
709        let candidate =
710            sentence.trim_matches(|ch: char| ch.is_ascii_punctuation() || ch.is_whitespace());
711        if candidate.is_empty() {
712            "Agent request".to_string()
713        } else {
714            candidate.chars().take(96).collect()
715        }
716    }
717
718    /// Generate a typed requirement breakdown using the shared core pipeline.
719    pub async fn generate_requirement_breakdown_specs(
720        cfg: AppConfig,
721        source: RequestSource,
722        session_id: Option<&str>,
723        requirements: &str,
724    ) -> Result<Vec<crate::tasks::RequirementBreakdownTaskSpec>, String> {
725        let pipeline = AgentPipeline::with_provider_optimized_config(cfg);
726        let mut request = AgentRequest::new(Self::build_requirement_breakdown_prompt(requirements))
727            .with_streaming(false)
728            .with_source(source)
729            .with_tools_enabled(false);
730
731        if let Some(session_id) = session_id {
732            request = request.with_session(session_id);
733        }
734        request.metadata.hints.insert(
735            INTERNAL_REQUIREMENT_BREAKDOWN_HINT_KEY.to_string(),
736            "true".to_string(),
737        );
738
739        let response = Box::pin(pipeline.process_blocking(request))
740            .await
741            .map_err(|error| format!("LLM error: {error}"))?;
742
743        crate::tasks::parse_requirement_breakdown_response(&response.content)
744            .map_err(|error| error.to_string())
745    }
746
747    /// Build the default tracked execution specs used when LLM planning is unavailable.
748    pub fn default_auto_tracked_execution_specs(
749        message: &str,
750    ) -> Vec<crate::tasks::RequirementBreakdownTaskSpec> {
751        Self::default_auto_tracked_execution_subtasks(message)
752            .into_iter()
753            .map(
754                |(name, description)| crate::tasks::RequirementBreakdownTaskSpec {
755                    name,
756                    description,
757                    priority: "high".to_string(),
758                    is_blocking: false,
759                    parent_name: None,
760                },
761            )
762            .collect()
763    }
764
    /// Render the planning prompt that asks the model to decompose free-form
    /// requirements into a JSON array of task specs (name / description /
    /// priority / is_blocking / parent_name). The model's reply is parsed by
    /// `crate::tasks::parse_requirement_breakdown_response`.
    fn build_requirement_breakdown_prompt(requirements: &str) -> String {
        format!(
            r#"You are a project planning assistant. Analyze the following requirements and break them down into a structured task list.

Requirements:
{}

Please respond with a JSON array of tasks. Each task should have:
- "name": A concise task name (max 60 chars)
- "description": A detailed description of what needs to be done
- "priority": "high", "medium", or "low"
- "is_blocking": true if other tasks depend on this, false otherwise
- "parent_name": null for root tasks, or the exact name of the parent task for subtasks

Order tasks by priority and logical execution order. Group related tasks under parent tasks.

Example format:
[
  {{"name": "Setup project structure", "description": "Initialize the project...", "priority": "high", "is_blocking": true, "parent_name": null}},
  {{"name": "Configure build system", "description": "Set up the build...", "priority": "high", "is_blocking": false, "parent_name": "Setup project structure"}}
]

Respond ONLY with the JSON array, no additional text."#,
            requirements
        )
    }
791
792    fn default_auto_tracked_execution_subtasks(message: &str) -> Vec<(String, String)> {
793        let request = message.trim();
794        let mentions_validation = Self::message_contains_any(
795            &request.to_ascii_lowercase(),
796            &[
797                "build",
798                "test",
799                "verify",
800                "validation",
801                "validate",
802                "check",
803                "run",
804                "compile",
805                "lint",
806                "smoke",
807            ],
808        );
809
810        match Self::infer_auto_tracked_request_shape(request) {
811            AutoTrackedRequestShape::CreateOrModify => vec![
812                (
813                    "Inspect the current state and constraints".to_string(),
814                    format!(
815                        "Inspect the current environment, relevant context, and constraints before acting on:\n\n{}",
816                        request
817                    ),
818                ),
819                (
820                    "Prepare the starting point or prerequisites".to_string(),
821                    format!(
822                        "Set up the starting point, prerequisites, or scaffolding needed to complete:\n\n{}",
823                        request
824                    ),
825                ),
826                (
827                    "Carry out the requested work".to_string(),
828                    format!(
829                        "Create, modify, draft, or otherwise perform the primary work needed for:\n\n{}",
830                        request
831                    ),
832                ),
833                (
834                    "Validate the result and summarize follow-up".to_string(),
835                    if mentions_validation {
836                        format!(
837                            "Run the appropriate verification steps, checks, or commands and summarize the final outcome for:\n\n{}",
838                            request
839                        )
840                    } else {
841                        format!(
842                            "Review the completed result, verify it in the most appropriate way, and summarize any follow-up for:\n\n{}",
843                            request
844                        )
845                    },
846                ),
847            ],
848            AutoTrackedRequestShape::InvestigateOrFix => vec![
849                (
850                    "Investigate the current issue or constraints".to_string(),
851                    format!(
852                        "Gather the evidence needed to understand the problem, failure, or constraints involved in:\n\n{}",
853                        request
854                    ),
855                ),
856                (
857                    "Apply the fix or adjustment".to_string(),
858                    format!(
859                        "Make the change, adjustment, or corrective action needed to address:\n\n{}",
860                        request
861                    ),
862                ),
863                (
864                    "Validate the fix and remaining risk".to_string(),
865                    format!(
866                        "Verify whether the issue is resolved and summarize any remaining risk or follow-up for:\n\n{}",
867                        request
868                    ),
869                ),
870            ],
871            AutoTrackedRequestShape::AnalyzeOrCompare => vec![
872                (
873                    "Inspect the relevant inputs and criteria".to_string(),
874                    format!(
875                        "Gather the materials, context, and comparison criteria needed to evaluate:\n\n{}",
876                        request
877                    ),
878                ),
879                (
880                    "Analyze the findings and identify gaps".to_string(),
881                    format!(
882                        "Analyze the relevant differences, patterns, tradeoffs, or gaps involved in:\n\n{}",
883                        request
884                    ),
885                ),
886                (
887                    "Summarize conclusions and recommended actions".to_string(),
888                    format!(
889                        "Deliver a clear summary of the conclusions, recommendations, or next steps for:\n\n{}",
890                        request
891                    ),
892                ),
893            ],
894            AutoTrackedRequestShape::ResearchOrFetch => vec![
895                (
896                    "Gather the relevant context and sources".to_string(),
897                    format!(
898                        "Collect the most relevant information, evidence, or source material needed for:\n\n{}",
899                        request
900                    ),
901                ),
902                (
903                    "Extract the information that matters".to_string(),
904                    format!(
905                        "Filter and organize the most useful details, signals, or facts for:\n\n{}",
906                        request
907                    ),
908                ),
909                (
910                    "Summarize findings and next steps".to_string(),
911                    format!(
912                        "Present the findings clearly and include any recommended next steps for:\n\n{}",
913                        request
914                    ),
915                ),
916            ],
917            AutoTrackedRequestShape::PlanOrDraft => vec![
918                (
919                    "Clarify the goal, audience, and constraints".to_string(),
920                    format!(
921                        "Identify the intent, audience, constraints, and success criteria behind:\n\n{}",
922                        request
923                    ),
924                ),
925                (
926                    "Draft the requested output".to_string(),
927                    format!(
928                        "Produce the requested plan, document, draft, or structured output for:\n\n{}",
929                        request
930                    ),
931                ),
932                (
933                    "Review and refine the result".to_string(),
934                    format!(
935                        "Review the draft for completeness, quality, and alignment with the request:\n\n{}",
936                        request
937                    ),
938                ),
939            ],
940            AutoTrackedRequestShape::GeneralExecution => vec![
941                (
942                    "Inspect the current state and gather context".to_string(),
943                    format!(
944                        "Inspect the current state, gather the needed context, and identify the concrete next steps for:\n\n{}",
945                        request
946                    ),
947                ),
948                (
949                    "Carry out the requested work".to_string(),
950                    format!(
951                        "Perform the primary work needed to complete:\n\n{}",
952                        request
953                    ),
954                ),
955                (
956                    "Verify the outcome and summarize next steps".to_string(),
957                    if mentions_validation {
958                        format!(
959                            "Run the appropriate checks or commands, verify the outcome, and summarize what remains for:\n\n{}",
960                            request
961                        )
962                    } else {
963                        format!(
964                            "Review the outcome, verify that it satisfies the request, and summarize next steps for:\n\n{}",
965                            request
966                        )
967                    },
968                ),
969            ],
970        }
971    }
972
973    fn build_auto_tracked_execution_handoff_message(
974        original_message: &str,
975        root_task_name: &str,
976        planned_subtasks: &[String],
977    ) -> String {
978        let mut handoff = String::new();
979        handoff.push_str(original_message.trim());
980        handoff.push_str("\n\n[Runtime execution handoff]\n");
981        handoff.push_str(&format!(
982            "A task plan already exists for this request under the tracked task \"{}\". Execute that plan now instead of creating a fresh plan from scratch.\n",
983            root_task_name
984        ));
985        if !planned_subtasks.is_empty() {
986            handoff.push_str("Planned tracked subtasks:\n");
987            for subtask in planned_subtasks {
988                handoff.push_str("- ");
989                handoff.push_str(subtask);
990                handoff.push('\n');
991            }
992        }
993        handoff.push_str(
994            "Update task statuses as you start and finish each concrete subtask. Keep research, planning, and other inspection-heavy subtasks `inprogress` until you are genuinely done gathering or synthesizing evidence for that phase; a single search, read, or fetch usually means the task has started, not finished. If you create new work, create a concrete subtask with a specific name. When that new work is follow-on execution discovered while finishing the current task, attach it to the tracked root plan rather than nesting it under the currently executing task unless it is a true blocking prerequisite. That keeps the current execution task completable once the handoff is recorded. Do not mark the tracked root task complete until every planned subtask is completed or explicitly cancelled for a real reason. Begin concrete work immediately. If the request is primarily analysis or research, gather evidence first and summarize the outcome clearly instead of forcing unnecessary edits or commands."
995        );
996        handoff
997    }
998
    /// Best-effort promotion of a tool-needing agent request into a tracked
    /// task with a shared execution plan, before the agentic loop starts.
    ///
    /// On success this mutates `request`: `metadata.task_id` is set to the new
    /// root task and `request.input` is rewritten into a handoff message that
    /// references the planned subtasks. All failures are logged and swallowed
    /// so the request still proceeds untracked.
    async fn maybe_initialize_tracked_request_task(
        &self,
        request: &mut AgentRequest,
        analysis_needs_tools: bool,
        task_tool_available: bool,
    ) {
        // Bail out unless every precondition for auto-tracking holds: not
        // already tracked, not an internal breakdown call, tools are needed
        // and available, and the heuristic accepts the input.
        if request.metadata.task_id.is_some()
            || is_internal_requirement_breakdown_request(request)
            || !analysis_needs_tools
            || !task_tool_available
            || !Self::should_auto_track_request(&request.input, None)
        {
            return;
        }

        // Tracking requires a session id; generate one if missing, and give up
        // quietly if it still cannot be resolved.
        Self::ensure_request_session_id(request);
        let Some(session_id) = request.metadata.session_id.as_deref() else {
            return;
        };

        let manager = crate::get_global_task_manager();
        let original_input = request.input.trim().to_string();
        // Preserve the untouched user input under a hint key for later
        // requirement detection, since `request.input` may be rewritten below.
        request
            .metadata
            .hints
            .entry(REQUIREMENT_DETECTION_INPUT_HINT_KEY.to_string())
            .or_insert_with(|| original_input.clone());

        let task_name = Self::derive_agent_request_task_name(&original_input);
        let requirement_input = requirement_detection_input(request).to_string();
        // Try the LLM-backed structured breakdown first; fall back to the
        // static shape-based subtask plan if it fails.
        let plan_specs = match Box::pin(Self::generate_requirement_breakdown_specs(
            self.config.clone(),
            request.metadata.source,
            Some(session_id),
            &original_input,
        ))
        .await
        {
            Ok(specs) => specs,
            Err(error) => {
                tracing::warn!(
                    session_id = %session_id,
                    error = %error,
                    "Failed to generate structured tracked plan; falling back to default tracked subtasks"
                );
                Self::default_auto_tracked_execution_specs(&original_input)
            }
        };

        match manager.initialize_auto_tracked_execution_plan(
            session_id,
            &task_name,
            &original_input,
            &plan_specs,
        ) {
            Ok(plan) => {
                // Attach the tracked root task and hand the plan off to the
                // execution model via the rewritten input message.
                request.metadata.task_id = Some(plan.root_task.id.clone());
                request.input = Self::build_auto_tracked_execution_handoff_message(
                    &requirement_input,
                    &plan.root_task.name,
                    &plan.planned_subtasks,
                );
                tracing::info!(
                    session_id = %session_id,
                    task_id = %plan.root_task.id,
                    task_name = %plan.root_task.name,
                    planned_subtasks = plan.generated_task_count,
                    initial_task_id = ?plan.initial_task_id,
                    "Initialized tracked root task and shared execution plan for agent request"
                );
            }
            Err(error) => {
                // Plan initialization is best-effort; log and continue untracked.
                tracing::warn!(
                    session_id = %session_id,
                    error = %error,
                    "Failed to initialize tracked root task for agent request"
                );
            }
        }
    }
1079
1080    /// Build a [`ContextManager`] pre-wired with the built-in tool registry.
1081    fn build_context_manager() -> ContextManager {
1082        ContextManager::new().with_tool_provider(Box::new(|| {
1083            crate::tools::registry::all_tools()
1084                .iter()
1085                .map(|t| (t.name.to_string(), t.description.to_string()))
1086                .collect()
1087        }))
1088    }
1089
1090    /// Create a new pipeline using the default runtime configuration merged with
1091    /// persisted user pipeline settings from [`AppConfig`].
1092    pub fn new(config: AppConfig) -> Self {
1093        let pipeline_config = PipelineConfig::default().with_user_settings(&config.pipeline);
1094        let arc_config = std::sync::Arc::new(config.clone());
1095        let tool_router = build_tool_router(&pipeline_config.tool_routing_strategy, arc_config);
1096        Self {
1097            config,
1098            context_manager: Self::build_context_manager(),
1099            analyzer: RequestAnalyzer::new(),
1100            pipeline_config,
1101            permission_manager: PermissionManager::new(),
1102            knowledge_store: None,
1103            knowledge_settings: None,
1104            tool_router,
1105            capabilities_cache: ModelCapabilitiesCache::new(),
1106        }
1107    }
1108
1109    /// Create a pipeline with custom configuration
1110    pub fn with_config(config: AppConfig, pipeline_config: PipelineConfig) -> Self {
1111        let arc_config = std::sync::Arc::new(config.clone());
1112        let tool_router = build_tool_router(&pipeline_config.tool_routing_strategy, arc_config);
1113        Self {
1114            config,
1115            context_manager: Self::build_context_manager(),
1116            analyzer: RequestAnalyzer::new(),
1117            pipeline_config,
1118            permission_manager: PermissionManager::new(),
1119            knowledge_store: None,
1120            knowledge_settings: None,
1121            tool_router,
1122            capabilities_cache: ModelCapabilitiesCache::new(),
1123        }
1124    }
1125
1126    /// Set the knowledge store and settings manager for this pipeline
1127    pub fn with_knowledge(
1128        mut self,
1129        store: &'static KnowledgeStore,
1130        settings: &'static KnowledgeSettingsManager,
1131    ) -> Self {
1132        self.knowledge_store = Some(store);
1133        self.knowledge_settings = Some(settings);
1134        self
1135    }
1136
1137    /// Create a HookEngine from the current configuration.
1138    ///
1139    /// Returns `None` if hooks are disabled or empty.
1140    fn create_hook_engine(&self) -> Option<HookEngine> {
1141        if !self.config.hooks.enabled || self.config.hooks.hooks.is_empty() {
1142            return None;
1143        }
1144        Some(HookEngine::new(self.config.hooks.clone()))
1145    }
1146
1147    /// Run a hook event, logging any failures but not propagating them.
1148    ///
1149    /// This is used for best-effort hooks (PostPipeline, PostTool) where failures
1150    /// should not affect the main flow.
1151    async fn run_hook_best_effort(&self, engine: &HookEngine, event: HookEvent, ctx: &HookContext) {
1152        match engine.run(event, ctx).await {
1153            Ok(records) => {
1154                for record in &records {
1155                    tracing::debug!(
1156                        hook = %record.name,
1157                        event = ?record.event,
1158                        exit_code = record.output.exit_code,
1159                        "Hook executed (best-effort)"
1160                    );
1161                }
1162            }
1163            Err(e) => {
1164                tracing::warn!(
1165                    event = ?event,
1166                    error = %e,
1167                    "Hook execution failed (best-effort, continuing)"
1168                );
1169            }
1170        }
1171    }
1172
1173    /// Ensure all enabled MCP servers from the application config are connected
1174    /// in the global client registry. Already-connected servers are skipped.
1175    async fn ensure_mcp_servers_connected(&self) {
1176        let registry = crate::mcp::get_mcp_client_registry();
1177        let connected = registry.connected_servers().await;
1178
1179        for entry in &self.config.mcp_servers {
1180            if !entry.enabled || connected.contains(&entry.name) {
1181                continue;
1182            }
1183            match registry.connect(entry).await {
1184                Ok(tools) => {
1185                    tracing::info!(
1186                        server = %entry.name,
1187                        tool_count = tools.len(),
1188                        "MCP server connected"
1189                    );
1190                }
1191                Err(e) => {
1192                    tracing::warn!(
1193                        server = %entry.name,
1194                        error = %e,
1195                        "Failed to connect MCP server (skipping)"
1196                    );
1197                }
1198            }
1199        }
1200    }
1201
    /// Streaming-friendly MCP preflight.
    ///
    /// Connects every enabled, not-yet-connected MCP server from the config
    /// (mirroring [`Self::ensure_mcp_servers_connected`]), but emits periodic
    /// `StreamChunk::Status` updates while connecting so the GUI never hits
    /// its "no events" idle timeout during slow/hung MCP servers. The
    /// `cancel_token` is honored both between servers and while a connect is
    /// in flight; failures are logged, surfaced as a status chunk, and the
    /// server is skipped.
    async fn ensure_mcp_servers_connected_streaming(
        &self,
        tx: &mpsc::Sender<StreamChunk>,
        cancel_token: &CancellationToken,
    ) {
        use tokio::time::{Duration, MissedTickBehavior};

        let registry = crate::mcp::get_mcp_client_registry();
        let connected = registry.connected_servers().await;

        for entry in &self.config.mcp_servers {
            // Abort the whole preflight as soon as the caller cancels.
            if cancel_token.is_cancelled() {
                return;
            }
            if !entry.enabled || connected.contains(&entry.name) {
                continue;
            }

            let base_msg = format!("Connecting to MCP server '{}'…", entry.name);
            // Send errors are ignored: a dropped receiver just means the
            // stream consumer went away.
            let _ = tx
                .send(StreamChunk::Status {
                    message: base_msg.clone(),
                })
                .await;

            // Keepalive status while we await connect(). This prevents the GUI's
            // 90s "no events" timeout even when a server has a long timeout.
            // NOTE(review): tokio's interval completes its first tick
            // immediately, so one extra keepalive is sent right after the
            // initial status — harmless, but confirm it is intended.
            let mut tick = tokio::time::interval(Duration::from_secs(20));
            tick.set_missed_tick_behavior(MissedTickBehavior::Delay);

            // Pin the connect future so it can be polled across select! turns.
            let connect_fut = registry.connect(entry);
            tokio::pin!(connect_fut);

            // Race connect() against the keepalive timer until the connection
            // attempt resolves (or the caller cancels).
            let result = loop {
                if cancel_token.is_cancelled() {
                    return;
                }

                tokio::select! {
                    res = &mut connect_fut => break res,
                    _ = tick.tick() => {
                        let _ = tx.send(StreamChunk::Status { message: base_msg.clone() }).await;
                    }
                }
            };

            match result {
                Ok(tools) => {
                    tracing::info!(
                        server = %entry.name,
                        tool_count = tools.len(),
                        "MCP server connected"
                    );
                    let _ = tx
                        .send(StreamChunk::Status {
                            message: format!(
                                "MCP server '{}' connected ({})",
                                entry.name,
                                tools.len()
                            ),
                        })
                        .await;
                }
                Err(e) => {
                    // Failure is non-fatal: report it and move on to the next
                    // configured server.
                    tracing::warn!(
                        server = %entry.name,
                        error = %e,
                        "Failed to connect MCP server (skipping)"
                    );
                    let _ = tx
                        .send(StreamChunk::Status {
                            message: format!("MCP server '{}' unavailable (skipping)", entry.name),
                        })
                        .await;
                }
            }
        }
    }
1284
1285    /// Create a checkpoint before a write tool execution.
1286    ///
1287    /// This is a best-effort operation: failures are logged but do not block tool execution.
1288    fn try_create_checkpoint_before_tool(&self, session_id: &str, tool_name: &str) {
1289        let label = format!("before:{}", tool_name);
1290
1291        // Use default stores - these are lightweight to construct
1292        let session_store = FileAgentSessionStore::new_default();
1293        let checkpoint_store = FileCheckpointStore::new_default();
1294        let manager =
1295            CheckpointManager::new(checkpoint_store, CheckpointRetentionPolicy::default());
1296
1297        // TaskManager needs a base directory
1298        let task_manager = TaskManager::new(AppConfig::data_dir());
1299
1300        match manager.create_session_checkpoint(
1301            session_id,
1302            &session_store,
1303            &task_manager,
1304            &self.config,
1305            Some(label),
1306        ) {
1307            Ok(meta) => {
1308                tracing::info!(
1309                    checkpoint_id = %meta.id,
1310                    session_id = session_id,
1311                    tool = tool_name,
1312                    "Created auto-checkpoint before write tool"
1313                );
1314            }
1315            Err(e) => {
1316                tracing::warn!(
1317                    session_id = session_id,
1318                    tool = tool_name,
1319                    error = %e,
1320                    "Failed to create auto-checkpoint (continuing with tool execution)"
1321                );
1322            }
1323        }
1324    }
1325
1326    /// Create a pipeline with configuration optimized for the current LLM provider
1327    ///
1328    /// This automatically sets the context token limit based on the provider's capabilities
1329    /// and applies user settings from AppConfig.pipeline.
1330    ///
1331    /// **Note:** For model-specific limits, prefer [`with_model_optimized_config`].
1332    pub fn with_provider_optimized_config(config: AppConfig) -> Self {
1333        let provider = config.llm.primary.as_str();
1334        let model_id = Self::extract_model_id(&config, provider);
1335        let capabilities_cache = ModelCapabilitiesCache::new();
1336
1337        // Use model-specific capabilities when we have a model ID
1338        let pipeline_config = if let Some(model) = model_id {
1339            PipelineConfig::for_model_with_cache(provider, model, &capabilities_cache)
1340                .with_user_settings(&config.pipeline)
1341        } else {
1342            PipelineConfig::for_provider(provider).with_user_settings(&config.pipeline)
1343        };
1344
1345        tracing::info!(
1346            provider = provider,
1347            model = model_id.unwrap_or("unknown"),
1348            max_context_tokens = pipeline_config.max_context_tokens,
1349            max_history_messages = pipeline_config.max_history_messages,
1350            auto_compact_threshold = pipeline_config.auto_compact_threshold,
1351            compaction_strategy = ?pipeline_config.compaction_strategy,
1352            "Created pipeline with model-optimized configuration and user settings"
1353        );
1354
1355        let arc_config = std::sync::Arc::new(config.clone());
1356        let tool_router = build_tool_router(&pipeline_config.tool_routing_strategy, arc_config);
1357        Self {
1358            config,
1359            context_manager: Self::build_context_manager(),
1360            analyzer: RequestAnalyzer::new(),
1361            pipeline_config,
1362            permission_manager: PermissionManager::new(),
1363            knowledge_store: None,
1364            knowledge_settings: None,
1365            tool_router,
1366            capabilities_cache,
1367        }
1368    }
1369
1370    /// Create a pipeline with a shared capabilities cache for dynamic limit discovery.
1371    ///
1372    /// This allows the pipeline to learn model limits from errors and API discovery,
1373    /// sharing that knowledge across pipeline instances.
1374    pub fn with_shared_capabilities_cache(
1375        config: AppConfig,
1376        capabilities_cache: ModelCapabilitiesCache,
1377    ) -> Self {
1378        let provider = config.llm.primary.as_str();
1379        let model_id = Self::extract_model_id(&config, provider);
1380
1381        let pipeline_config = if let Some(model) = model_id {
1382            PipelineConfig::for_model_with_cache(provider, model, &capabilities_cache)
1383                .with_user_settings(&config.pipeline)
1384        } else {
1385            PipelineConfig::for_provider(provider).with_user_settings(&config.pipeline)
1386        };
1387
1388        tracing::info!(
1389            provider = provider,
1390            model = model_id.unwrap_or("unknown"),
1391            max_context_tokens = pipeline_config.max_context_tokens,
1392            "Created pipeline with shared capabilities cache"
1393        );
1394
1395        let arc_config = std::sync::Arc::new(config.clone());
1396        let tool_router = build_tool_router(&pipeline_config.tool_routing_strategy, arc_config);
1397        Self {
1398            config,
1399            context_manager: Self::build_context_manager(),
1400            analyzer: RequestAnalyzer::new(),
1401            pipeline_config,
1402            permission_manager: PermissionManager::new(),
1403            knowledge_store: None,
1404            knowledge_settings: None,
1405            tool_router,
1406            capabilities_cache,
1407        }
1408    }
1409
1410    /// Extract the model ID from config for the given provider.
1411    fn extract_model_id<'a>(config: &'a AppConfig, provider: &str) -> Option<&'a str> {
1412        let model = match provider {
1413            "openai" => config.llm.openai.as_ref().map(|c| c.model.as_str()),
1414            "anthropic" => config.llm.anthropic.as_ref().map(|c| c.model.as_str()),
1415            "grok" => config.llm.grok.as_ref().map(|c| c.model.as_str()),
1416            "gemini" => config.llm.gemini.as_ref().map(|c| c.model.as_str()),
1417            "ollama" => config.llm.ollama.as_ref().map(|c| c.model.as_str()),
1418            _ => None,
1419        };
1420        tracing::debug!(
1421            provider = provider,
1422            model = ?model,
1423            has_openai_config = config.llm.openai.is_some(),
1424            "[extract_model_id] Extracted model ID from config"
1425        );
1426        model
1427    }
1428
    /// Get a reference to the capabilities cache for learning model limits.
    ///
    /// This is the same cache instance used when deriving the pipeline
    /// configuration (see [`Self::with_shared_capabilities_cache`]), so
    /// callers can record newly discovered limits through it.
    pub fn capabilities_cache(&self) -> &ModelCapabilitiesCache {
        &self.capabilities_cache
    }
1433
1434    fn effective_request_max_iterations(&self, request: &AgentRequest) -> Option<usize> {
1435        if let Some(override_limit) = request.max_iterations {
1436            return Some(override_limit);
1437        }
1438
1439        if !self.pipeline_config.iteration_budget_enabled {
1440            return None;
1441        }
1442
1443        if request.metadata.task_id.is_some() {
1444            Some(self.pipeline_config.tracked_task_max_iterations.max(1))
1445        } else {
1446            Some(self.pipeline_config.max_iterations.max(1))
1447        }
1448    }
1449
1450    /// Process a request with streaming response
1451    ///
1452    /// This is the main entry point for streaming LLM interactions.
1453    /// It handles context reduction, tool filtering, and the agentic loop.
1454    pub async fn process_streaming(
1455        &self,
1456        request: AgentRequest,
1457        tx: mpsc::Sender<StreamChunk>,
1458        cancel_token: CancellationToken,
1459    ) -> Result<AgentResponse, AppError> {
1460        // 0. If resuming from a paused state, reconstruct the request with the
1461        //    full conversational context so the model continues from where it
1462        //    left off.
1463        let request = if let Some(paused) = request.resume_from.clone() {
1464            tracing::info!(
1465                iteration = paused.iteration,
1466                partial_len = paused.partial_content.len(),
1467                tool_calls = paused.completed_tool_calls.len(),
1468                "Resuming from paused execution state"
1469            );
1470
1471            let _ = tx
1472                .send(StreamChunk::Status {
1473                    message: "Resuming paused session…".to_string(),
1474                })
1475                .await;
1476
1477            // Build resumed history:
1478            // 1. Start with the history that existed before the paused request.
1479            let mut resumed_history = paused.history.clone();
1480            // 2. Re-add the original user message.
1481            resumed_history.push(Message::user(&paused.original_input));
1482            // 3. Append partial assistant content (if any) so the model sees it.
1483            if !paused.partial_content.is_empty() {
1484                resumed_history.push(Message::assistant(&paused.partial_content));
1485            }
1486            // 4. Append tool call results so the model has full tool context.
1487            for tc in &paused.completed_tool_calls {
1488                let output = match &tc.result {
1489                    ToolResult::Success(s) => s.clone(),
1490                    ToolResult::Error(e) => format!("Error: {e}"),
1491                    ToolResult::Skipped(msg) => format!("Skipped: {msg}"),
1492                };
1493                resumed_history.push(Message::tool_result(&tc.id, &output));
1494            }
1495
1496            let resume_input = if paused.has_content() {
1497                "Please continue from where you left off. \
1498                 Your previous response was interrupted."
1499                    .to_string()
1500            } else {
1501                paused.original_input.clone()
1502            };
1503
1504            let mut resumed = AgentRequest::new(resume_input)
1505                .with_streaming(request.streaming)
1506                .with_history(resumed_history);
1507            resumed.max_iterations = request.max_iterations;
1508            resumed.metadata = request.metadata.clone();
1509            if let Some(sp) = paused.system_prompt {
1510                resumed = resumed.with_system_prompt(sp);
1511            } else if let Some(sp) = request.system_prompt.clone() {
1512                resumed = resumed.with_system_prompt(sp);
1513            }
1514            resumed
1515        } else {
1516            request
1517        };
1518
1519        // G1+G5: Auto-detect workspace_dir from the process working directory when
1520        // the caller did not supply one.  This ensures guardrails (AGENTS.md) and
1521        // the memory bank are always available in a standard project checkout.
1522        let mut request = request;
1523        if request.metadata.workspace_dir.is_none()
1524            && let Ok(cwd) = std::env::current_dir()
1525        {
1526            tracing::debug!(
1527                cwd = %cwd.display(),
1528                "workspace_dir not set; defaulting to CWD"
1529            );
1530            request.metadata.workspace_dir = Some(cwd);
1531        }
1532        Self::ensure_request_session_id(&mut request);
1533        Self::ensure_request_session_id(&mut request);
1534
1535        let telemetry = AgentRequestTelemetry::start(
1536            &request,
1537            RequestRunMode::Streaming,
1538            self.config.pipeline.agent_telemetry.enabled,
1539        )
1540        .await;
1541        let result = telemetry.in_request_scope(async {
1542
1543        // 1. Analyze the request
1544        let mut analysis = tracing::info_span!("agent.pipeline.analyze_request").in_scope(|| {
1545            let mut analysis = self.analyzer.analyze(&request.input);
1546
1547            // Heuristic: if the user is replying with an approval ("ok", "please proceed")
1548            // and the previous assistant turn proposed using a tool, promote this turn into
1549            // a tool-enabled follow-up so the agent can actually execute the intended tool.
1550            self.promote_approval_to_tool_followup(&request, &mut analysis);
1551            analysis
1552        });
1553        // Emit analysis telemetry after local request-shaping heuristics have run
1554        // so the trace reflects the final analyzer state for this request.
1555        telemetry.record_analysis(&analysis).await;
1556        tracing::debug!(
1557            "Request analysis: categories={:?}, needs_tools={}, confidence={}",
1558            analysis.categories,
1559            analysis.needs_tools,
1560            analysis.confidence
1561        );
1562        self.maybe_apply_advanced_primitives_middleware(&mut request, &analysis).await;
1563        Self::maybe_attach_normalized_intent(&mut request);
1564
1565        // 1b. Pre-flight LLM tool routing (only when strategy != Keyword).
1566        // The router merges its selection into analysis.suggested_tools, which
1567        // get_tools_for_analysis() checks before the category map.
1568        if let Some(router) = &self.tool_router
1569            && analysis.needs_tools
1570        {
1571            let all: Vec<&'static ToolDefinition> = all_tools().iter().collect();
1572            let routing = router
1573                .route(&request.input, &all, analysis.confidence)
1574                .instrument(tracing::info_span!("agent.pipeline.route_tools"))
1575                .await;
1576            if routing.has_selection() {
1577                tracing::debug!(
1578                    tools = ?routing.suggested_tools,
1579                    "Pre-flight LLM router selected tools (streaming)"
1580                );
1581                analysis.suggested_tools = routing.suggested_tools;
1582            }
1583        }
1584
1585        // 2. Filter tools based on categories (and allowed_tools if specified)
1586        let tools_enabled_for_request = request.metadata.tools_enabled.unwrap_or(true);
1587
1588        let relevant_tools = if self.pipeline_config.enable_tools
1589            && tools_enabled_for_request
1590            && analysis.needs_tools
1591        {
1592            self.get_tools_for_analysis(&analysis, &request.metadata.allowed_tools)
1593        } else {
1594            Vec::new()
1595        };
1596
1597        // Decide whether MCP tool *schemas* should be included for this request.
1598        //
1599        // We only want to touch MCP (connect, enumerate tools) when it is actually
1600        // relevant/allowed. Otherwise, slow/hung MCP servers can starve the streaming
1601        // UI of events and trip the GUI's idle timeout.
1602        let include_mcp_tool_schemas = self.pipeline_config.enable_tools
1603            && tools_enabled_for_request
1604            && analysis.needs_tools
1605            && (
1606                // The built-in MCP tool is part of the filtered tool set.
1607                relevant_tools.iter().any(|t| t.name == "mcp")
1608                    // Or the request explicitly whitelists MCP tools by name.
1609                    || request
1610                        .metadata
1611                        .allowed_tools
1612                        .iter()
1613                        .any(|t| t == "mcp" || t.starts_with("mcp__"))
1614            );
1615        self.maybe_initialize_tracked_request_task(
1616            &mut request,
1617            analysis.needs_tools,
1618            relevant_tools.iter().any(|tool| tool.name == "task"),
1619        )
1620        .await;
1621        // Record the final filtered tool set that will shape prompt/tool schema construction.
1622        telemetry
1623            .record_tool_selection(
1624                relevant_tools.len(),
1625                include_mcp_tool_schemas,
1626                analysis.needs_tools && relevant_tools.is_empty(),
1627            )
1628            .await;
1629        tracing::debug!(
1630            "Relevant tools: {:?}",
1631            relevant_tools.iter().map(|t| t.name).collect::<Vec<_>>()
1632        );
1633
1634        // Workspace sandboxing (used by tool execution)
1635        let workspace = request.metadata.workspace_dir.as_ref().and_then(|p| {
1636            SessionWorkspace::from_directory(
1637                request.metadata.session_id.as_deref().unwrap_or("unknown"),
1638                p.clone(),
1639            )
1640            .ok()
1641        });
1642
1643        // Fast-path: if the user is explicitly approving a previously proposed tool call
1644        // (e.g. "okay please proceed"), execute the intended tool directly from history.
1645        //
1646        // This prevents a common UX failure mode where the model describes tool usage,
1647        // the user approves, but the provider doesn't emit a structured tool call so the
1648        // app appears to "hang" or never produces an answer.
1649        if self.pipeline_config.enable_tools
1650            && tools_enabled_for_request
1651            && analysis.is_followup
1652            && Self::looks_like_approval(&request.input)
1653            && let Some(resp) = self
1654                .try_execute_confirmed_tool_from_history(
1655                    &request,
1656                    &analysis,
1657                    &relevant_tools,
1658                    workspace.as_ref(),
1659                    &tx,
1660                    &cancel_token,
1661                )
1662                .await?
1663        {
1664            return Ok(resp);
1665        }
1666
1667        // 2b. If MCP tools are relevant for this request, pre-connect MCP servers with
1668        // streaming keepalive status so we never silently block before the first LLM chunk.
1669        if include_mcp_tool_schemas {
1670            self.ensure_mcp_servers_connected_streaming(&tx, &cancel_token)
1671                .instrument(tracing::info_span!("agent.pipeline.connect_mcp"))
1672                .await;
1673        }
1674
1675        // 3. Resolve context
1676        let mut resolved_context = tracing::info_span!(
1677            "agent.pipeline.resolve_context",
1678            phase = "initial"
1679        )
1680        .in_scope(|| {
1681            self.context_manager.resolve_context(
1682                &request.input,
1683                &analysis,
1684                &request.history,
1685                request.metadata.workspace_dir.as_deref(),
1686            )
1687        });
1688
1689        // 3.1+3.2. Enrich context with memory bank and enabled knowledge items.
1690        self.enrich_resolved_context(
1691            &mut resolved_context,
1692            &request,
1693            request.metadata.workspace_dir.as_deref(),
1694            &request.input,
1695            &request.metadata,
1696        )
1697        .instrument(tracing::info_span!(
1698            "agent.pipeline.enrich_context",
1699            phase = "initial"
1700        ))
1701        .await;
1702        // Capture the enriched context before compaction/prompt construction.
1703        telemetry
1704            .record_context_resolved("initial", &resolved_context)
1705            .await;
1706
1707        // 3.5. Check for auto-compaction before building prompt
1708        // Build a preview prompt to estimate tokens
1709        let preview_prompt = tracing::info_span!("agent.pipeline.build_preview_prompt")
1710            .in_scope(|| self.build_prompt(&request, &resolved_context));
1711        if let Some(compaction_chunk) = self
1712            .check_and_apply_auto_compaction(&request.history, &preview_prompt, &request.metadata)
1713            .instrument(tracing::info_span!("agent.pipeline.auto_compaction"))
1714            .await
1715        {
1716            telemetry.record_compaction(&compaction_chunk).await;
1717            // Emit user-visible status **before** the compaction result chunk.
1718            let message = self.build_auto_compaction_status_message(&preview_prompt);
1719            let _ = tx.send(StreamChunk::Status { message }).await;
1720
1721            // Emit compaction notification to user
1722            let _ = tx.send(compaction_chunk).await;
1723
1724            // Re-resolve context after compaction and re-enrich (G4: enrichment
1725            // must run again so memory bank + knowledge survive compaction).
1726            resolved_context = tracing::info_span!(
1727                "agent.pipeline.resolve_context",
1728                phase = "post_compaction"
1729            )
1730            .in_scope(|| {
1731                self.context_manager.resolve_context(
1732                    &request.input,
1733                    &analysis,
1734                    &request.history,
1735                    request.metadata.workspace_dir.as_deref(),
1736                )
1737            });
1738            self.enrich_resolved_context(
1739                &mut resolved_context,
1740                &request,
1741                request.metadata.workspace_dir.as_deref(),
1742                &request.input,
1743                &request.metadata,
1744            )
1745            .instrument(tracing::info_span!(
1746                "agent.pipeline.enrich_context",
1747                phase = "post_compaction"
1748            ))
1749            .await;
1750            telemetry
1751                .record_context_resolved("post_compaction", &resolved_context)
1752                .await;
1753        }
1754
1755        // 4. Build the optimized prompt with token limit checking
1756        let (prompt, truncated) = tracing::info_span!("agent.pipeline.prepare_prompt")
1757            .in_scope(|| self.truncate_prompt_if_needed(&request, &mut resolved_context));
1758        telemetry
1759            .record_prompt_prepared(&prompt, truncated, &resolved_context)
1760            .await;
1761
1762        if truncated {
1763            tracing::info!("Prompt was truncated to fit token limit");
1764        }
1765
1766        // 4.5. Hard validation: reject if still over limit after truncation
1767        // This prevents API errors and provides clear feedback to the user
1768        self.validate_token_limit(&prompt)?;
1769
1770        // 4.6. Emit token usage update for user visibility
1771        let token_usage_chunk = self.create_token_usage_update(&prompt);
1772        send_token_usage_chunk_best_effort(&tx, token_usage_chunk).await;
1773
1774        // 4.7. Run PrePipeline hooks (if enabled)
1775        let hook_engine = self.create_hook_engine();
1776        if let Some(ref engine) = hook_engine {
1777            let hook_ctx = HookContext {
1778                workspace_dir: request.metadata.workspace_dir.clone(),
1779                session_id: request.metadata.session_id.clone(),
1780                pipeline_prompt: Some(prompt.clone()),
1781                ..Default::default()
1782            };
1783            if let Err(e) = engine.run(HookEvent::PrePipeline, &hook_ctx).await {
1784                tracing::warn!(error = %e, "PrePipeline hook failed (continuing)");
1785            }
1786        }
1787
1788        // 5. Execute the agentic loop with workspace sandboxing.
1789        // 6. If experiential reflection is enabled, evaluate the completed turn,
1790        //    optionally generate a structured reflection, attempt one bounded
1791        //    corrective retry (text revision or safe re-execution), and
1792        //    consolidate the result into memory before running PostPipeline hooks.
1793        let reflection_tx = tx.clone();
1794        let reflection_cancel_token = cancel_token.clone();
1795        let reflection_retry_prompt = prompt.clone();
1796        let reflection_retry_tools = relevant_tools.clone();
1797        let reflection_retry_context = resolved_context.clone();
1798        let effective_max_iterations = self.effective_request_max_iterations(&request);
1799        let reflection_quality_budget = effective_max_iterations.unwrap_or(0);
1800        let relevant_tool_count = relevant_tools.len();
1801        let requirement_detection_input = requirement_detection_input(&request);
1802        let requires_build_and_test = Self::prompt_requires_build_and_test(requirement_detection_input);
1803        let requires_mutating_file_tool_success =
1804            Self::request_requires_mutating_file_tool_success(requirement_detection_input);
1805        if requires_build_and_test {
1806            tracing::warn!(
1807                request_input_preview = %requirement_detection_input.chars().take(160).collect::<String>(),
1808                source = ?request.metadata.source,
1809                session_id = ?request.metadata.session_id,
1810                task_id = ?request.metadata.task_id,
1811                "Agent request seeded requires_build_and_test=true from requirement-detection input"
1812            );
1813        }
1814
1815        // Execute with context overflow recovery: if we get a ContextOverflow error,
1816        // learn the actual model limit, force compaction, and retry once.
1817        let mut response = match self
1818            .execute_agentic_loop_streaming(
1819                prompt.clone(),
1820                requires_build_and_test,
1821                requires_mutating_file_tool_success,
1822                relevant_tools.clone(),
1823                include_mcp_tool_schemas,
1824                resolved_context.clone(),
1825                tx.clone(),
1826                cancel_token.clone(),
1827                workspace.as_ref(),
1828                request.metadata.session_id.clone(),
1829                request.metadata.task_id.clone(),
1830                effective_max_iterations,
1831                request.metadata.permission_level,
1832                &telemetry,
1833            )
1834            .instrument(tracing::info_span!(
1835                "agent.pipeline.execute_agent_loop",
1836                mode = "streaming",
1837                tools = relevant_tool_count,
1838                max_iterations = ?effective_max_iterations
1839            ))
1840            .await
1841        {
1842            Ok(resp) => resp,
1843            Err(AppError::ContextOverflow(ref error_msg)) => {
1844                // Learn the actual limit from the error message and persist it in
1845                // the cache for future requests.  We keep the result here so we
1846                // can derive a correct prompt budget for this retry attempt.
1847                let provider = self.config.llm.primary.as_str();
1848                let model_id = Self::extract_model_id(&self.config, provider)
1849                    .unwrap_or("unknown");
1850
1851                let learned_caps = self.capabilities_cache.learn_from_error(
1852                    provider,
1853                    model_id,
1854                    error_msg,
1855                );
1856
1857                if let Some(ref caps) = learned_caps {
1858                    tracing::info!(
1859                        provider = provider,
1860                        model = model_id,
1861                        learned_context_length = caps.context_length,
1862                        learned_max_input_tokens = caps.max_input_tokens(),
1863                        "Learned model context limit from overflow error; \
1864                         will re-budget retry prompt to avoid immediate re-overflow"
1865                    );
1866                } else {
1867                    tracing::warn!(
1868                        provider = provider,
1869                        model = model_id,
1870                        error_msg = %error_msg,
1871                        "Could not parse context limit from overflow error; \
1872                         retry prompt will use the configured pipeline limits"
1873                    );
1874                }
1875
1876                // Compute the effective prompt budget for the retry.
1877                //
1878                // Using the learned limit (when available) prevents the retry from
1879                // building a prompt that still exceeds the model's *actual* context
1880                // window.  Before this fix, `truncate_prompt_if_needed` was called
1881                // here, which always reads from `self.pipeline_config` — a stale
1882                // value that may be larger than what the model really accepts.  If
1883                // the overflow happened because our configured limit was too high,
1884                // the retry would immediately overflow again.
1885                let retry_max_input_tokens = learned_caps
1886                    .as_ref()
1887                    .map(|c| c.max_input_tokens())
1888                    .unwrap_or_else(|| {
1889                        self.pipeline_config
1890                            .max_context_tokens
1891                            .saturating_sub(self.pipeline_config.max_output_tokens)
1892                    });
1893
1894                // Notify user about recovery attempt
1895                let _ = tx.send(StreamChunk::Status {
1896                    message: "Context overflow detected. Compacting conversation history and retrying..."
1897                        .to_string(),
1898                }).await;
1899
1900                // Force aggressive compaction on the history
1901                let compacted_history = self.force_context_compaction(
1902                    &request.history,
1903                    &request.metadata,
1904                ).await;
1905
1906                // Re-resolve context with compacted history
1907                let mut compacted_request = request.clone();
1908                compacted_request.history = compacted_history;
1909
1910                let compacted_analysis = self.analyzer.analyze(&compacted_request.input);
1911                let mut compacted_context = self.context_manager.resolve_context(
1912                    &compacted_request.input,
1913                    &compacted_analysis,
1914                    &compacted_request.history,
1915                    compacted_request.metadata.workspace_dir.as_deref(),
1916                );
1917                self.enrich_resolved_context(
1918                    &mut compacted_context,
1919                    &compacted_request,
1920                    compacted_request.metadata.workspace_dir.as_deref(),
1921                    &compacted_request.input,
1922                    &compacted_request.metadata,
1923                ).await;
1924
1925                // Rebuild prompt using the learned limit so the retry prompt is
1926                // guaranteed to fit within the model's real context window.
1927                let (compacted_prompt, _) = self.truncate_prompt_with_budget(
1928                    &compacted_request,
1929                    &mut compacted_context,
1930                    retry_max_input_tokens,
1931                );
1932
1933                // Retry with compacted context
1934                tracing::info!(
1935                    retry_max_input_tokens = retry_max_input_tokens,
1936                    learned_from_error = learned_caps.is_some(),
1937                    original_history_len = request.history.len(),
1938                    compacted_history_len = compacted_request.history.len(),
1939                    "Retrying agent loop with compacted context and re-budgeted prompt"
1940                );
1941
1942                self.execute_agentic_loop_streaming(
1943                    compacted_prompt,
1944                    requires_build_and_test,
1945                    requires_mutating_file_tool_success,
1946                    relevant_tools,
1947                    include_mcp_tool_schemas,
1948                    compacted_context,
1949                    tx,
1950                    cancel_token,
1951                    workspace.as_ref(),
1952                    compacted_request.metadata.session_id.clone(),
1953                    compacted_request.metadata.task_id.clone(),
1954                    effective_max_iterations,
1955                    compacted_request.metadata.permission_level,
1956                    &telemetry,
1957                )
1958                .instrument(tracing::info_span!(
1959                    "agent.pipeline.execute_agent_loop",
1960                    mode = "streaming_retry_after_compaction",
1961                    tools = relevant_tool_count,
1962                ))
1963                .await?
1964            }
1965            Err(e) => return Err(e),
1966        };
1967
1968        response.truncated = truncated;
1969
1970        if reflection_cancel_token.is_cancelled() {
1971            telemetry.mark_outcome(RequestOutcome::Cancelled);
1972            return Ok(response);
1973        }
1974
1975        if let Some(mut generated_reflection) = self
1976            .maybe_generate_reflection(
1977                &request.input,
1978                &response,
1979                &request.metadata,
1980                Some(&reflection_tx),
1981                &reflection_cancel_token,
1982                reflection_quality_budget,
1983            )
1984            .instrument(tracing::info_span!("agent.pipeline.reflection", mode = "streaming"))
1985            .await
1986        {
1987            telemetry
1988                .record_reflection_generated(
1989                    generated_reflection.initial_quality_score,
1990                    generated_reflection.reflection.promotion_confidence(),
1991                )
1992                .await;
1993            let retry = if self.should_attempt_reflection_reexecution(
1994                &response,
1995                &generated_reflection.reflection,
1996                !reflection_retry_tools.is_empty(),
1997            ) {
1998                let _ = reflection_tx
1999                    .send(StreamChunk::Status {
2000                        message: "Low-confidence answer detected; running one safe reflection-guided retry with sandboxed tool access...".to_string(),
2001                    })
2002                    .await;
2003                self.maybe_run_reflection_reexecution(
2004                    &response,
2005                    &generated_reflection.reflection,
2006                    generated_reflection.initial_quality_score,
2007                    reflection::ReflectionReexecutionContext {
2008                        base_prompt: &reflection_retry_prompt,
2009                        tools: reflection_retry_tools.clone(),
2010                        context: reflection_retry_context.clone(),
2011                        session_id: request.metadata.session_id.as_deref(),
2012                        workspace: workspace.as_ref(),
2013                    },
2014                )
2015                .await
2016            } else {
2017                self.maybe_run_reflection_retry(
2018                    &request.input,
2019                    &response,
2020                    &generated_reflection.reflection,
2021                    generated_reflection.initial_quality_score,
2022                    Some(&reflection_tx),
2023                )
2024                .await
2025            };
2026
2027            if let Some(retry) = retry.as_ref() {
2028                telemetry
2029                    .record_reflection_retry(
2030                        retry.improved,
2031                        retry.improvement_score,
2032                        retry.iterations,
2033                    )
2034                    .await;
2035                if let Some(usage) = retry.usage.as_ref() {
2036                    merge_token_usage(&mut response.usage, usage);
2037                }
2038                generated_reflection.reflection.improvement_score = Some(retry.improvement_score);
2039                response.tool_calls.extend(retry.tool_calls.clone());
2040                response.iterations += retry.iterations;
2041
2042                if retry.improved {
2043                    let status_message = match retry.mode {
2044                        reflection::ReflectionRetryMode::TextRevision => format!(
2045                            "Reflection-guided revision improved quality from {:.0}% to {:.0}%",
2046                            generated_reflection.initial_quality_score * 100.0,
2047                            retry.retry_quality_score * 100.0,
2048                        ),
2049                        reflection::ReflectionRetryMode::CorrectiveReexecution => format!(
2050                            "Reflection-guided safe retry improved quality from {:.0}% to {:.0}% using {} tool calls across {} iterations",
2051                            generated_reflection.initial_quality_score * 100.0,
2052                            retry.retry_quality_score * 100.0,
2053                            retry.tool_calls.len(),
2054                            retry.iterations,
2055                        ),
2056                    };
2057                    let _ = reflection_tx
2058                        .send(StreamChunk::Status {
2059                            message: status_message,
2060                        })
2061                        .await;
2062
2063                    let revised_block = format!(
2064                        "{}{}",
2065                        reflection::REFLECTION_RETRY_SEPARATOR,
2066                        retry.revised_content
2067                    );
2068                    response.content.push_str(&revised_block);
2069                    self.emit_reflection_retry_text(&reflection_tx, &revised_block)
2070                        .await;
2071                } else {
2072                    let status_message = match retry.mode {
2073                        reflection::ReflectionRetryMode::TextRevision => format!(
2074                            "Reflection-guided revision did not materially improve quality ({:.0}% → {:.0}%)",
2075                            generated_reflection.initial_quality_score * 100.0,
2076                            retry.retry_quality_score * 100.0,
2077                        ),
2078                        reflection::ReflectionRetryMode::CorrectiveReexecution => format!(
2079                            "Reflection-guided safe retry did not materially improve quality ({:.0}% → {:.0}%, {} tool calls)",
2080                            generated_reflection.initial_quality_score * 100.0,
2081                            retry.retry_quality_score * 100.0,
2082                            retry.tool_calls.len(),
2083                        ),
2084                    };
2085                    let _ = reflection_tx
2086                        .send(StreamChunk::Status {
2087                            message: status_message,
2088                        })
2089                        .await;
2090                }
2091            }
2092
2093            self.finalize_reflection(
2094                &generated_reflection.reflection,
2095                &request.metadata,
2096                workspace.as_ref(),
2097                Some(&reflection_tx),
2098                retry.as_ref(),
2099            )
2100            .await;
2101        }
2102
2103        // 7. Run PostPipeline hooks (best-effort) after the reflection phase.
2104        if let Some(ref engine) = hook_engine {
2105            let hook_ctx = HookContext {
2106                workspace_dir: request.metadata.workspace_dir.clone(),
2107                session_id: request.metadata.session_id.clone(),
2108                ..Default::default()
2109            };
2110            self.run_hook_best_effort(engine, HookEvent::PostPipeline, &hook_ctx)
2111                .await;
2112        }
2113
2114        if !reflection_cancel_token.is_cancelled() {
2115            let _ = reflection_tx
2116                .send(StreamChunk::Done(response.usage.clone()))
2117                .await;
2118        }
2119
2120        Ok(response)
2121        })
2122        .await;
2123
2124        match &result {
2125            Ok(response) => telemetry.finish(Some(response), None).await,
2126            Err(error) => {
2127                telemetry.mark_outcome(RequestOutcome::Failed);
2128                telemetry.finish(None, Some(error)).await;
2129            }
2130        }
2131
2132        result
2133    }
2134
2135    fn looks_like_approval(input: &str) -> bool {
2136        let s = input.trim().to_lowercase();
2137        matches!(
2138            s.as_str(),
2139            "ok" | "okay"
2140                | "ok."
2141                | "okay."
2142                | "yes"
2143                | "y"
2144                | "sure"
2145                | "please proceed"
2146                | "proceed"
2147                | "go ahead"
2148                | "do it"
2149                | "run it"
2150                | "continue"
2151        ) || s.contains("please proceed")
2152            || s.contains("go ahead")
2153            || s.contains("please do")
2154            || s.contains("yes, proceed")
2155    }
2156
2157    /// Attempt to execute a previously proposed tool call directly from the assistant's
2158    /// last message, when the current user turn is an approval/follow-up.
2159    ///
2160    /// This is a defensive fallback for a common failure mode where the model:
    /// 1) proposes a tool call,
    /// 2) asks for confirmation,
    /// 3) after user approval, fails to emit a structured tool call.
    ///
    /// We infer the intended tool from the previous assistant message and execute it.
    ///
    /// Returns `Ok(None)` when no plan can be recovered (no assistant turn in
    /// history, no recognizable planned tool call in it, or the inferred tool
    /// is not available this turn); otherwise executes the tool, streams one
    /// extra synthesis LLM call over `tx`, and returns the assembled response.
    async fn try_execute_confirmed_tool_from_history(
        &self,
        request: &AgentRequest,
        _analysis: &crate::context::RequestAnalysis,
        relevant_tools: &[&'static ToolDefinition],
        workspace: Option<&SessionWorkspace>,
        tx: &mpsc::Sender<StreamChunk>,
        cancel_token: &CancellationToken,
    ) -> Result<Option<AgentResponse>, AppError> {
        // Membership check against the tools selected for this turn.
        let has_tool = |name: &str| relevant_tools.iter().any(|t| t.name == name);

        // The plan to recover lives in the most recent assistant turn.
        let Some(prev_assistant) = request.history.iter().rev().find(|m| m.role == "assistant")
        else {
            return Ok(None);
        };

        let Some((tool_name, args, _answer_prefix)) =
            Self::extract_planned_tool_call_from_text(&prev_assistant.content)
        else {
            return Ok(None);
        };

        // Only run if the tool is actually available on this turn.
        if !has_tool(&tool_name) {
            return Ok(None);
        }

        // Execute immediately (still subject to the normal safety checks inside execute_tool).
        let tool_call_id = format!("confirmed_{tool_name}");

        // Channel sends are best-effort (`let _ =`): a dropped receiver must
        // not abort tool execution.
        let _ = tx
            .send(StreamChunk::Thinking(
                "Executing approved command...\n".to_string(),
            ))
            .await;
        let _ = tx
            .send(StreamChunk::ToolCallStart {
                id: tool_call_id.clone(),
                name: tool_name.clone(),
            })
            .await;
        let _ = tx.send(StreamChunk::ToolCallArgs(args.clone())).await;

        let start_time = Instant::now();
        let result = self
            .execute_tool(
                &tool_name,
                &args,
                workspace,
                request.metadata.session_id.as_deref(),
                Some(tx),
            )
            .await;
        let duration_ms = start_time.elapsed().as_millis() as u64;

        let _ = tx.send(StreamChunk::ToolCallEnd).await;

        // Emit structured tool result for frontend display
        let (success, output) = match &result {
            ToolResult::Success(out) => (true, out.trim_end().to_string()),
            ToolResult::Error(e) => (false, e.clone()),
            ToolResult::Skipped(msg) => (false, format!("Skipped: {}", msg)),
        };
        let _ = tx
            .send(StreamChunk::ToolCallResult {
                name: tool_name.clone(),
                success,
                output: output.clone(),
                duration_ms,
            })
            .await;

        let record = ToolCallRecord {
            id: tool_call_id,
            name: tool_name,
            arguments: args,
            result,
            duration_ms,
        };

        // Build a continuation prompt so the LLM can synthesize the tool output
        // into a helpful response for the user, instead of leaving raw tool output
        // as the final answer.
        let base_prompt = self.build_prompt(request, &crate::context::ResolvedContext::default());
        let continuation_prompt = self.build_tool_continuation_prompt(
            &base_prompt,
            "Executing the approved tool call.",
            std::slice::from_ref(&record),
        );

        // Stream one more LLM call for synthesis (no tool schemas — text only).
        let (inner_tx, mut inner_rx) = mpsc::channel::<StreamChunk>(STREAM_CHUNK_BUFFER_CAPACITY);
        let streaming_cfg = crate::streaming::streaming_config_from(&self.config);
        let enable_fallback = self.pipeline_config.enable_fallback;
        let inner_cancel = cancel_token.clone();

        // Drive the synthesis stream on its own task so chunks can be
        // forwarded to the caller as they arrive.
        let stream_handle = tokio::spawn(
            async move {
                if enable_fallback {
                    let _ = start_streaming_with_fallback(
                        &streaming_cfg,
                        &continuation_prompt,
                        None,
                        inner_tx,
                        inner_cancel,
                    )
                    .await;
                } else {
                    let _ = start_streaming(
                        &streaming_cfg,
                        &continuation_prompt,
                        None,
                        inner_tx,
                        inner_cancel,
                    )
                    .await;
                }
            }
            .instrument(tracing::Span::current()),
        );

        let mut synthesis_text = String::new();
        let mut synthesis_usage = None;

        // Forward the synthesis stream to the caller. `Done` is intercepted
        // (not forwarded) so a single final `Done` can be emitted below once
        // the spawned task has been joined.
        while let Some(chunk) = inner_rx.recv().await {
            match &chunk {
                StreamChunk::Text(text) => {
                    synthesis_text.push_str(text);
                    let _ = tx.send(chunk).await;
                }
                StreamChunk::Thinking(_) => {
                    let _ = tx.send(chunk).await;
                }
                StreamChunk::Status { .. } => {
                    send_status_chunk_best_effort(tx, chunk).await;
                }
                StreamChunk::TokenUsageUpdate { .. } => {
                    send_token_usage_chunk_best_effort(tx, chunk).await;
                }
                StreamChunk::Done(usage) => {
                    synthesis_usage = usage.clone();
                    break;
                }
                StreamChunk::Error(_) | StreamChunk::Cancelled | StreamChunk::Paused => {
                    let _ = tx.send(chunk).await;
                    break;
                }
                _ => {
                    // Forward status or other informational chunks
                    let _ = tx.send(chunk).await;
                }
            }
        }

        let _ = stream_handle.await;
        let _ = tx.send(StreamChunk::Done(synthesis_usage.clone())).await;

        // One executed tool call plus one synthesis pass.
        Ok(Some(AgentResponse {
            content: synthesis_text,
            thinking: None,
            tool_calls: vec![record],
            usage: synthesis_usage,
            context_used: crate::context::ResolvedContext::default(),
            truncated: false,
            iterations: 1,
        }))
    }
2333
2334    fn extract_shell_command_from_plan(text: &str) -> Option<String> {
2335        // Try common patterns first: run '...'/"..."/`...`
2336        if let Some(cmd) = Self::extract_quoted_after_keyword(text, "run") {
2337            return Some(cmd);
2338        }
2339        if let Some(cmd) = Self::extract_quoted_after_keyword(text, "execute") {
2340            return Some(cmd);
2341        }
2342
2343        // Fallback: try to grab the first token after "run".
2344        let lower = text.to_lowercase();
2345        let idx = lower.find("run ")?;
2346        let after = text[idx + 4..].trim_start();
2347        let token: String = after
2348            .chars()
2349            .take_while(|c| !c.is_whitespace() && !matches!(*c, '.' | ',' | ';' | ')' | '('))
2350            .collect();
2351        if token.is_empty() { None } else { Some(token) }
2352    }
2353
2354    fn extract_quoted_after_keyword(text: &str, keyword: &str) -> Option<String> {
2355        let lower = text.to_lowercase();
2356        let key = format!("{} ", keyword.to_lowercase());
2357        let start = lower.find(&key)?;
2358        let mut rest = text[start + key.len()..].trim_start();
2359        let quote = rest.chars().next()?;
2360        if quote != '\'' && quote != '"' && quote != '`' {
2361            return None;
2362        }
2363        rest = &rest[quote.len_utf8()..];
2364        let end = rest.find(quote)?;
2365        let cmd = rest[..end].trim().to_string();
2366        if cmd.is_empty() { None } else { Some(cmd) }
2367    }
2368
2369    fn extract_first_quoted(text: &str) -> Option<String> {
2370        for quote in ['\'', '"', '`'] {
2371            if let Some(start) = text.find(quote) {
2372                let rest = &text[start + quote.len_utf8()..];
2373                if let Some(end) = rest.find(quote) {
2374                    let s = rest[..end].trim();
2375                    if !s.is_empty() {
2376                        return Some(s.to_string());
2377                    }
2378                }
2379            }
2380        }
2381        None
2382    }
2383
2384    fn extract_first_url(text: &str) -> Option<String> {
2385        let lower = text.to_lowercase();
2386        let idx = lower.find("https://").or_else(|| lower.find("http://"))?;
2387        let after = &text[idx..];
2388        let url: String = after
2389            .chars()
2390            .take_while(|c| {
2391                !c.is_whitespace() && !matches!(*c, ')' | '(' | ']' | '[' | '"' | '\'' | '`' | ',')
2392            })
2393            .collect();
2394        if url.is_empty() { None } else { Some(url) }
2395    }
2396
2397    /// Infer a planned tool call from a prior assistant message.
2398    ///
2399    /// Returns: (tool_name, args_json, answer_prefix)
2400    fn extract_planned_tool_call_from_text(text: &str) -> Option<(String, String, String)> {
2401        let lower = text.to_lowercase();
2402
2403        // Shell
2404        if lower.contains("shell tool")
2405            || lower.contains("tool: shell")
2406            || lower.contains("use the shell")
2407            || lower.contains("run 'pwd'")
2408            || lower.contains("`pwd`")
2409        {
2410            let command = Self::extract_shell_command_from_plan(text)?;
2411            let args = serde_json::json!({"command": command}).to_string();
2412            return Some((
2413                "shell".to_string(),
2414                args,
2415                "Workspace directory root: ".to_string(),
2416            ));
2417        }
2418
2419        // File read
2420        if lower.contains("file tool")
2421            || lower.contains("read file")
2422            || lower.contains("read the file")
2423        {
2424            let path = Self::extract_quoted_after_keyword(text, "read")
2425                .or_else(|| Self::extract_first_quoted(text))?;
2426            let args = serde_json::json!({"operation": "read", "path": path}).to_string();
2427            return Some(("file".to_string(), args, "File contents: \n".to_string()));
2428        }
2429
2430        // Git status
2431        if lower.contains("git tool") || lower.contains("git status") {
2432            let args = serde_json::json!({"operation": "status"}).to_string();
2433            return Some(("git".to_string(), args, "Git status:\n".to_string()));
2434        }
2435
2436        // Web
2437        if lower.contains("web tool")
2438            || lower.contains("search the web")
2439            || lower.contains("web_search")
2440        {
2441            if lower.contains("fetch") || lower.contains("download") {
2442                let url = Self::extract_first_url(text)?;
2443                let args = serde_json::json!({"operation": "fetch", "url": url}).to_string();
2444                return Some(("web".to_string(), args, "Web fetch result:\n".to_string()));
2445            }
2446
2447            let query = Self::extract_quoted_after_keyword(text, "search")
2448                .or_else(|| Self::extract_first_quoted(text))?;
2449            let args = serde_json::json!({"operation": "search", "query": query}).to_string();
2450            return Some(("web".to_string(), args, "Web search results:\n".to_string()));
2451        }
2452
2453        // Code stats
2454        if lower.contains("code tool") || lower.contains("code stats") {
2455            let path = Self::extract_first_quoted(text).unwrap_or_else(|| ".".to_string());
2456            let args = serde_json::json!({"operation": "stats", "path": path}).to_string();
2457            return Some(("code".to_string(), args, "Code stats:\n".to_string()));
2458        }
2459
2460        None
2461    }
2462
    /// Process a request without streaming (blocking).
    ///
    /// Runs the pipeline stages in order — request analysis, optional
    /// pre-flight LLM tool routing, tool filtering, context resolution and
    /// enrichment, auto-compaction, prompt building with truncation and
    /// token-limit validation, hooks, the blocking agentic loop, and optional
    /// reflection/retry — and returns the complete `AgentResponse` at once,
    /// reporting progress via `tracing` logs instead of stream chunks.
    pub async fn process_blocking(&self, request: AgentRequest) -> Result<AgentResponse, AppError> {
        // G1+G5: Auto-detect workspace_dir from the process working directory when
        // the caller did not supply one.  This ensures guardrails (AGENTS.md) and
        // the memory bank are always available in a standard project checkout.
        let mut request = request;
        if request.metadata.workspace_dir.is_none()
            && let Ok(cwd) = std::env::current_dir()
        {
            tracing::debug!(
                cwd = %cwd.display(),
                "workspace_dir not set; defaulting to CWD (blocking path)"
            );
            request.metadata.workspace_dir = Some(cwd);
        }

        let telemetry = AgentRequestTelemetry::start(
            &request,
            RequestRunMode::Blocking,
            self.config.pipeline.agent_telemetry.enabled,
        )
        .await;
        let result = telemetry.in_request_scope(async {

        // 1. Analyze the request
        let mut analysis = tracing::info_span!("agent.pipeline.analyze_request").in_scope(|| {
            self.analyzer.analyze(&request.input)
        });
        telemetry.record_analysis(&analysis).await;
        self.maybe_apply_advanced_primitives_middleware(&mut request, &analysis).await;
        Self::maybe_attach_normalized_intent(&mut request);

        // 1b. Pre-flight LLM tool routing (only when strategy != Keyword).
        if let Some(router) = &self.tool_router
            && analysis.needs_tools
        {
            let all: Vec<&'static ToolDefinition> = all_tools().iter().collect();
            let routing = router
                .route(&request.input, &all, analysis.confidence)
                .instrument(tracing::info_span!("agent.pipeline.route_tools"))
                .await;
            if routing.has_selection() {
                tracing::debug!(
                    tools = ?routing.suggested_tools,
                    "Pre-flight LLM router selected tools (blocking)"
                );
                analysis.suggested_tools = routing.suggested_tools;
            }
        }

        // 2. Filter tools (and allowed_tools if specified)
        let tools_enabled_for_request = request.metadata.tools_enabled.unwrap_or(true);

        let relevant_tools = if self.pipeline_config.enable_tools
            && tools_enabled_for_request
            && analysis.needs_tools
        {
            self.get_tools_for_analysis(&analysis, &request.metadata.allowed_tools)
        } else {
            Vec::new()
        };

        // MCP tool schemas are attached only when the "mcp" tool itself (or an
        // "mcp__"-prefixed allowed tool) is in play for this request.
        let include_mcp_tool_schemas = self.pipeline_config.enable_tools
            && tools_enabled_for_request
            && analysis.needs_tools
            && (relevant_tools.iter().any(|t| t.name == "mcp")
                || request
                    .metadata
                    .allowed_tools
                    .iter()
                    .any(|t| t == "mcp" || t.starts_with("mcp__")));
        self.maybe_initialize_tracked_request_task(
            &mut request,
            analysis.needs_tools,
            relevant_tools.iter().any(|tool| tool.name == "task"),
        )
        .await;
        telemetry
            .record_tool_selection(
                relevant_tools.len(),
                include_mcp_tool_schemas,
                analysis.needs_tools && relevant_tools.is_empty(),
            )
            .await;

        // Blocking mode: connect MCP servers only when MCP is relevant/allowed.
        if include_mcp_tool_schemas {
            self.ensure_mcp_servers_connected()
                .instrument(tracing::info_span!("agent.pipeline.connect_mcp"))
                .await;
        }

        // 3. Resolve context
        let mut resolved_context = tracing::info_span!(
            "agent.pipeline.resolve_context",
            phase = "initial"
        )
        .in_scope(|| {
            self.context_manager.resolve_context(
                &request.input,
                &analysis,
                &request.history,
                request.metadata.workspace_dir.as_deref(),
            )
        });

        // 3.1+3.2. Enrich context with memory bank and enabled knowledge items.
        self.enrich_resolved_context(
            &mut resolved_context,
            &request,
            request.metadata.workspace_dir.as_deref(),
            &request.input,
            &request.metadata,
        )
        .instrument(tracing::info_span!(
            "agent.pipeline.enrich_context",
            phase = "initial"
        ))
        .await;
        telemetry
            .record_context_resolved("initial", &resolved_context)
            .await;

        // 3.5. Check for auto-compaction before building prompt
        // Build a preview prompt to estimate tokens
        let preview_prompt = tracing::info_span!("agent.pipeline.build_preview_prompt")
            .in_scope(|| self.build_prompt(&request, &resolved_context));
        if let Some(compaction_chunk) = self
            .check_and_apply_auto_compaction(&request.history, &preview_prompt, &request.metadata)
            .instrument(tracing::info_span!("agent.pipeline.auto_compaction"))
            .await
        {
            telemetry.record_compaction(&compaction_chunk).await;
            // Log compaction in blocking mode (no stream to emit to)
            match compaction_chunk {
                StreamChunk::ContextCompacted {
                    messages_before,
                    messages_after,
                    tokens_saved,
                    summary,
                } => {
                    tracing::info!(
                        messages_before = messages_before,
                        messages_after = messages_after,
                        tokens_saved = tokens_saved,
                        "Context auto-compacted in blocking mode: {}",
                        summary
                    );
                }
                StreamChunk::MemoryBankSaved {
                    file_path,
                    session_id,
                    summary,
                    messages_saved,
                } => {
                    tracing::info!(
                        file_path = %file_path,
                        session_id = %session_id,
                        messages_saved = messages_saved,
                        "Memory bank saved in blocking mode: {}",
                        summary
                    );
                }
                _ => {}
            }

            // Re-resolve context after compaction and re-enrich (G4: enrichment
            // must run again so memory bank + knowledge survive compaction).
            resolved_context = tracing::info_span!(
                "agent.pipeline.resolve_context",
                phase = "post_compaction"
            )
            .in_scope(|| {
                self.context_manager.resolve_context(
                    &request.input,
                    &analysis,
                    &request.history,
                    request.metadata.workspace_dir.as_deref(),
                )
            });
            self.enrich_resolved_context(
                &mut resolved_context,
                &request,
                request.metadata.workspace_dir.as_deref(),
                &request.input,
                &request.metadata,
            )
            .instrument(tracing::info_span!(
                "agent.pipeline.enrich_context",
                phase = "post_compaction"
            ))
            .await;
            telemetry
                .record_context_resolved("post_compaction", &resolved_context)
                .await;
        }

        // 4. Build prompt with token limit checking
        let (prompt, truncated) = tracing::info_span!("agent.pipeline.prepare_prompt")
            .in_scope(|| self.truncate_prompt_if_needed(&request, &mut resolved_context));
        telemetry
            .record_prompt_prepared(&prompt, truncated, &resolved_context)
            .await;

        if truncated {
            tracing::info!("Prompt was truncated to fit token limit");
        }

        // 4.5. Hard validation: reject if still over limit after truncation
        // This prevents API errors and provides clear feedback to the user
        self.validate_token_limit(&prompt)?;

        // 4.6. Log token usage in blocking mode
        if let StreamChunk::TokenUsageUpdate {
            estimated,
            limit,
            percentage,
            status,
            estimated_cost,
        } = self.create_token_usage_update(&prompt)
        {
            let status_str = match status {
                crate::streaming::TokenUsageStatus::Green => "🟢 Green",
                crate::streaming::TokenUsageStatus::Yellow => "🟡 Yellow",
                crate::streaming::TokenUsageStatus::Red => "🔴 Red",
            };
            tracing::info!(
                estimated_tokens = estimated,
                limit = limit,
                percentage = percentage,
                status = status_str,
                estimated_cost_usd = format!("${:.4}", estimated_cost),
                "Token usage in blocking mode: {} tokens / {} tokens ({}%) - Est. cost: ${:.4}",
                estimated,
                limit,
                percentage,
                estimated_cost
            );
        }

        // 4.7. Run PrePipeline hooks (if enabled)
        let hook_engine = self.create_hook_engine();
        if let Some(ref engine) = hook_engine {
            let hook_ctx = HookContext {
                workspace_dir: request.metadata.workspace_dir.clone(),
                session_id: request.metadata.session_id.clone(),
                pipeline_prompt: Some(prompt.clone()),
                ..Default::default()
            };
            // Hook failures are logged but never block the request.
            if let Err(e) = engine.run(HookEvent::PrePipeline, &hook_ctx).await {
                tracing::warn!(error = %e, "PrePipeline hook failed in blocking mode (continuing)");
            }
        }

        // 5. Execute blocking agentic loop with workspace sandboxing
        let workspace = request.metadata.workspace_dir.as_ref().and_then(|p| {
            SessionWorkspace::from_directory(
                request.metadata.session_id.as_deref().unwrap_or("unknown"),
                p.clone(),
            )
            .ok()
        });
        // Snapshot prompt/tools/context now so a reflection-driven retry can
        // re-run with the same inputs later.
        let reflection_retry_prompt = prompt.clone();
        let reflection_retry_tools = relevant_tools.clone();
        let reflection_retry_context = resolved_context.clone();
        let effective_max_iterations = self.effective_request_max_iterations(&request);
        let reflection_quality_budget = effective_max_iterations.unwrap_or(0);
        let relevant_tool_count = relevant_tools.len();
        let requirement_detection_input = requirement_detection_input(&request);
        let requires_build_and_test = Self::prompt_requires_build_and_test(requirement_detection_input);
        let requires_mutating_file_tool_success =
            Self::request_requires_mutating_file_tool_success(requirement_detection_input);
        if requires_build_and_test {
            tracing::warn!(
                request_input_preview = %requirement_detection_input.chars().take(160).collect::<String>(),
                source = ?request.metadata.source,
                session_id = ?request.metadata.session_id,
                task_id = ?request.metadata.task_id,
                "Agent request seeded requires_build_and_test=true from requirement-detection input"
            );
        }

        let mut response = self
            .execute_agentic_loop_blocking(
                prompt,
                requires_build_and_test,
                requires_mutating_file_tool_success,
                relevant_tools,
                include_mcp_tool_schemas,
                resolved_context,
                workspace.as_ref(),
                request.metadata.session_id.clone(),
                request.metadata.task_id.clone(),
                effective_max_iterations,
                &telemetry,
            )
            .instrument(tracing::info_span!(
                "agent.pipeline.execute_agent_loop",
                mode = "blocking",
                tools = relevant_tool_count,
                max_iterations = ?effective_max_iterations
            ))
            .await?;

        // Surface prompt truncation to the caller.
        response.truncated = truncated;

        if let Some(mut generated_reflection) = self
            .maybe_generate_reflection(
                &request.input,
                &response,
                &request.metadata,
                None,
                &crate::streaming::CancellationToken::new(),
                reflection_quality_budget,
            )
            .instrument(tracing::info_span!("agent.pipeline.reflection", mode = "blocking"))
            .await
        {
            telemetry
                .record_reflection_generated(
                    generated_reflection.initial_quality_score,
                    generated_reflection.reflection.promotion_confidence(),
                )
                .await;
            // Prefer a tool-capable re-execution when warranted; otherwise fall
            // back to a text-only reflection retry.
            let retry = if self.should_attempt_reflection_reexecution(
                &response,
                &generated_reflection.reflection,
                !reflection_retry_tools.is_empty(),
            ) {
                self.maybe_run_reflection_reexecution(
                    &response,
                    &generated_reflection.reflection,
                    generated_reflection.initial_quality_score,
                    reflection::ReflectionReexecutionContext {
                        base_prompt: &reflection_retry_prompt,
                        tools: reflection_retry_tools,
                        context: reflection_retry_context,
                        session_id: request.metadata.session_id.as_deref(),
                        workspace: workspace.as_ref(),
                    },
                )
                .await
            } else {
                self.maybe_run_reflection_retry(
                    &request.input,
                    &response,
                    &generated_reflection.reflection,
                    generated_reflection.initial_quality_score,
                    None,
                )
                .await
            };

            // Fold retry results (usage, tool calls, iterations, and — when
            // improved — the revised content) back into the response.
            if let Some(retry) = retry.as_ref() {
                telemetry
                    .record_reflection_retry(
                        retry.improved,
                        retry.improvement_score,
                        retry.iterations,
                    )
                    .await;
                if let Some(usage) = retry.usage.as_ref() {
                    merge_token_usage(&mut response.usage, usage);
                }
                generated_reflection.reflection.improvement_score = Some(retry.improvement_score);
                response.tool_calls.extend(retry.tool_calls.clone());
                response.iterations += retry.iterations;

                if retry.improved {
                    response.content = retry.revised_content.clone();
                    response.thinking = None;
                }
            }

            self.finalize_reflection(
                &generated_reflection.reflection,
                &request.metadata,
                workspace.as_ref(),
                None,
                retry.as_ref(),
            )
            .await;
        }

        // 5.1. Run PostPipeline hooks (best-effort)
        if let Some(ref engine) = hook_engine {
            let hook_ctx = HookContext {
                workspace_dir: request.metadata.workspace_dir.clone(),
                session_id: request.metadata.session_id.clone(),
                ..Default::default()
            };
            self.run_hook_best_effort(engine, HookEvent::PostPipeline, &hook_ctx)
                .await;
        }

        Ok(response)
        })
        .await;

        // Close out telemetry with the final outcome before returning.
        match &result {
            Ok(response) => telemetry.finish(Some(response), None).await,
            Err(error) => {
                telemetry.mark_outcome(RequestOutcome::Failed);
                telemetry.finish(None, Some(error)).await;
            }
        }

        result
    }
2872
2873    /// Get tools relevant to the analyzed request
2874    /// If allowed_tools is non-empty, treat them as the candidate pool and still
2875    /// narrow to the tools that are relevant for the analyzed request.
2876    fn get_tools_for_analysis(
2877        &self,
2878        analysis: &crate::context::RequestAnalysis,
2879        allowed_tools: &[String],
2880    ) -> Vec<&'static ToolDefinition> {
2881        use crate::context::ContextCategory;
2882
2883        let candidate_tools: Vec<_> = if allowed_tools.is_empty() {
2884            all_tools().iter().collect()
2885        } else {
2886            allowed_tools
2887                .iter()
2888                .filter_map(|tool_name| crate::tools::registry::find_tool(tool_name))
2889                .collect()
2890        };
2891        let candidate_names: HashSet<_> = candidate_tools.iter().map(|tool| tool.name).collect();
2892        let mut tools = Vec::new();
2893
2894        if !allowed_tools.is_empty() && self.pipeline_config.log_token_usage {
2895            tracing::debug!(
2896                allowed_tools = ?allowed_tools,
2897                candidate_tools = ?candidate_tools.iter().map(|t| t.name).collect::<Vec<_>>(),
2898                "Using session-specific tool configuration as candidate pool"
2899            );
2900        }
2901
2902        // If the pre-flight LLM router already chose tools, use that selection
2903        // directly (skipping the category map entirely for this request), but keep
2904        // the selection inside the candidate pool.
2905        if !analysis.suggested_tools.is_empty() {
2906            let mut resolved: Vec<_> = analysis
2907                .suggested_tools
2908                .iter()
2909                .filter(|name| candidate_names.contains(name.as_str()))
2910                .filter_map(|name| crate::tools::registry::find_tool(name))
2911                .collect();
2912
2913            Self::append_task_tool_for_auto_tracked_request(
2914                analysis,
2915                &candidate_names,
2916                &mut resolved,
2917            );
2918
2919            if !resolved.is_empty() {
2920                resolved.sort_by_key(|tool| tool.name);
2921                resolved.dedup_by_key(|tool| tool.name);
2922                if self.pipeline_config.log_token_usage {
2923                    tracing::debug!(
2924                        suggested = ?analysis.suggested_tools,
2925                        resolved_tools = ?resolved.iter().map(|t| t.name).collect::<Vec<_>>(),
2926                        "Using LLM router tool selection"
2927                    );
2928                }
2929                return resolved;
2930            }
2931        }
2932
2933        let mut push_if_allowed = |tool_name: &str| {
2934            if candidate_names.contains(tool_name)
2935                && let Some(tool) = crate::tools::registry::find_tool(tool_name)
2936            {
2937                tools.push(tool);
2938            }
2939        };
2940
2941        // Otherwise, filter the candidate pool by category.
2942        for category in &analysis.categories {
2943            match category {
2944                ContextCategory::FileSystem => push_if_allowed("file"),
2945                ContextCategory::Shell => push_if_allowed("shell"),
2946                ContextCategory::Git => push_if_allowed("git"),
2947                ContextCategory::Code => {
2948                    for tool_name in crate::tools::registry::code_tool_names() {
2949                        push_if_allowed(tool_name);
2950                    }
2951                }
2952                ContextCategory::Web => {
2953                    push_if_allowed("web");
2954                    // Also include web_search for search-related queries
2955                    push_if_allowed("web_search");
2956                }
2957                ContextCategory::Screen => {
2958                    push_if_allowed("screenshot");
2959                    push_if_allowed("screen_record");
2960                }
2961                ContextCategory::Agent => push_if_allowed("a2a"),
2962                ContextCategory::Mcp => push_if_allowed("mcp"),
2963                ContextCategory::A2a => push_if_allowed("a2a"),
2964                ContextCategory::Task => push_if_allowed("task"),
2965                ContextCategory::Tools => push_if_allowed("permissions"),
2966                ContextCategory::Voice
2967                | ContextCategory::Config
2968                | ContextCategory::Session
2969                | ContextCategory::General => {}
2970            }
2971        }
2972
2973        // If no specific tools found, or confidence is too low to trust the category match,
2974        // fall back to the entire candidate pool so the LLM can make the correct selection
2975        // without seeing disabled or irrelevant session tools.
2976        // confidence < 0.2 means only a single weak keyword fired — not reliable enough to
2977        // narrow the tool set, and risks silently excluding the right tool.
2978        if analysis.needs_tools && (tools.is_empty() || analysis.confidence < 0.2) {
2979            tools = candidate_tools;
2980
2981            if self.pipeline_config.log_token_usage {
2982                tracing::debug!(
2983                    confidence = analysis.confidence,
2984                    candidate_tools = ?tools.iter().map(|t| t.name).collect::<Vec<_>>(),
2985                    "Using candidate-pool fallback (no category match or confidence too low)"
2986                );
2987            }
2988        }
2989
2990        Self::append_task_tool_for_auto_tracked_request(analysis, &candidate_names, &mut tools);
2991
2992        if self.pipeline_config.log_token_usage {
2993            tracing::debug!(
2994                categories = ?analysis.categories,
2995                needs_tools = analysis.needs_tools,
2996                resolved_tools = ?tools.iter().map(|t| t.name).collect::<Vec<_>>(),
2997                "Category-based tool filtering"
2998            );
2999        }
3000
3001        // Deduplicate
3002        tools.sort_by_key(|t| t.name);
3003        tools.dedup_by_key(|t| t.name);
3004
3005        tools
3006    }
3007
3008    /// If the user responds with an approval ("ok", "yes", "please proceed") and the
3009    /// previous assistant message proposed using a tool, ensure this turn is treated as
3010    /// a tool-capable follow-up.
3011    ///
3012    /// This prevents a common failure mode where the model asked for confirmation and,
3013    /// after the user confirms, the follow-up message contains no tool keywords, so the
3014    /// analyzer disables tools and the agent can't complete the action.
3015    fn promote_approval_to_tool_followup(
3016        &self,
3017        request: &AgentRequest,
3018        analysis: &mut crate::context::RequestAnalysis,
3019    ) {
3020        use crate::context::ContextCategory;
3021
3022        let input = request.input.trim().to_lowercase();
3023        let looks_like_approval = matches!(
3024            input.as_str(),
3025            "ok" | "okay"
3026                | "ok."
3027                | "okay."
3028                | "yes"
3029                | "y"
3030                | "sure"
3031                | "please proceed"
3032                | "proceed"
3033                | "go ahead"
3034                | "do it"
3035                | "run it"
3036                | "continue"
3037        ) || input.contains("please proceed")
3038            || input.contains("go ahead")
3039            || input.contains("please do")
3040            || input.contains("yes, proceed");
3041
3042        if !looks_like_approval {
3043            return;
3044        }
3045
3046        // If tools are already enabled, nothing to do.
3047        if analysis.needs_tools {
3048            return;
3049        }
3050
3051        // Find the most recent assistant message.
3052        let Some(prev_assistant) = request.history.iter().rev().find(|m| m.role == "assistant")
3053        else {
3054            return;
3055        };
3056
3057        let prev = prev_assistant.content.to_lowercase();
3058
3059        // Try to infer which tool the assistant intended to use.
3060        let tool_name = if prev.contains("shell tool")
3061            || (prev.contains("run") && prev.contains("pwd"))
3062            || prev.contains("`pwd`")
3063        {
3064            Some("shell")
3065        } else if prev.contains("git tool") || prev.contains("git status") {
3066            Some("git")
3067        } else if prev.contains("file tool") || prev.contains("read file") {
3068            Some("file")
3069        } else if prev.contains("web tool") || prev.contains("search the web") {
3070            Some("web")
3071        } else if prev.contains("code tool") || prev.contains("code stats") {
3072            Some("code")
3073        } else {
3074            None
3075        };
3076
3077        let Some(tool_name) = tool_name else {
3078            return;
3079        };
3080
3081        analysis.needs_tools = true;
3082        analysis.is_followup = true;
3083        analysis.confidence = analysis.confidence.max(0.85);
3084        analysis.suggested_tools.push(tool_name.to_string());
3085
3086        // Ensure the tool category is present so tool filtering will include it.
3087        match tool_name {
3088            "shell" => {
3089                analysis.categories.insert(ContextCategory::Shell);
3090                analysis.categories.insert(ContextCategory::Tools);
3091            }
3092            "git" => {
3093                analysis.categories.insert(ContextCategory::Git);
3094                analysis.categories.insert(ContextCategory::Tools);
3095            }
3096            "file" => {
3097                analysis.categories.insert(ContextCategory::FileSystem);
3098                analysis.categories.insert(ContextCategory::Tools);
3099            }
3100            "web" => {
3101                analysis.categories.insert(ContextCategory::Web);
3102                analysis.categories.insert(ContextCategory::Tools);
3103            }
3104            "code" => {
3105                analysis.categories.insert(ContextCategory::Code);
3106                analysis.categories.insert(ContextCategory::Tools);
3107            }
3108            _ => {
3109                analysis.categories.insert(ContextCategory::Tools);
3110            }
3111        }
3112    }
3113
3114    /// Enrich a resolved context with memory bank entries and enabled knowledge items.
3115    ///
3116    /// G4: Extracted into a shared helper so it is called both on the initial resolve
3117    /// *and* after any auto-compaction re-resolve, ensuring memory bank and knowledge
3118    /// survive context compaction on both the streaming and blocking code paths.
3119    async fn enrich_resolved_context(
3120        &self,
3121        resolved_context: &mut crate::context::ResolvedContext,
3122        request: &AgentRequest,
3123        workspace_dir: Option<&std::path::Path>,
3124        query: &str,
3125        metadata: &RequestMetadata,
3126    ) {
3127        // 3.1 Short-term working memory — session-local and loaded first.
3128        if let Some(short_term_sections) =
3129            self.load_session_working_memory(metadata.session_id.as_deref(), query, 4)
3130            && !short_term_sections.is_empty()
3131        {
3132            tracing::debug!(
3133                memory_sections = short_term_sections.len(),
3134                "Added session working memory to request"
3135            );
3136            resolved_context.memory_sections.extend(short_term_sections);
3137        }
3138
3139        if let Some(workspace_dir) = workspace_dir
3140            && let Some(shared_coordination) = self
3141                .load_shared_coordination_memory(workspace_dir, metadata, 3)
3142                .await
3143            && !shared_coordination.is_empty()
3144        {
3145            tracing::debug!(
3146                shared_coordination_len = shared_coordination.len(),
3147                "Added shared coordination memory to request"
3148            );
3149            resolved_context.memory_sections.extend(shared_coordination);
3150        }
3151
3152        // 3.2 Long-term memory bank — only available when workspace_dir is known.
3153        //     When experiential reflection is enabled, we also inject a small
3154        //     number of relevant past reflections after the regular memory-bank
3155        //     lookup so the model can learn from prior failures without turning
3156        //     every prompt into a reflection dump.
3157        if let Some(workspace_dir) = workspace_dir
3158            && let Some(memory_context) = self
3159                .search_and_load_memory_bank(workspace_dir, metadata, query, 3)
3160                .await
3161        {
3162            tracing::debug!(
3163                memory_context_len = memory_context.len(),
3164                "Added memory bank context to request"
3165            );
3166            resolved_context.memory_sections.extend(memory_context);
3167
3168            if self.reflection_enabled_for(metadata)
3169                && let Some(reflection_sections) = self
3170                    .load_relevant_reflections(workspace_dir, metadata, query)
3171                    .await
3172                && !reflection_sections.is_empty()
3173            {
3174                tracing::debug!(
3175                    reflection_context_len = reflection_sections.len(),
3176                    "Added past reflections to request"
3177                );
3178                resolved_context.memory_sections.extend(reflection_sections);
3179            }
3180        }
3181
3182        // 3.3 Knowledge items — only available when the pipeline was wired with
3183        //     `with_knowledge()` *and* the session has items enabled.
3184        let knowledge_budget_tokens =
3185            self.remaining_knowledge_budget_tokens(request, resolved_context);
3186        if let Some(knowledge_context) = self.load_enabled_knowledge(
3187            metadata.session_id.as_deref(),
3188            query,
3189            knowledge_budget_tokens,
3190        ) {
3191            tracing::debug!(
3192                knowledge_context_len = knowledge_context.len(),
3193                knowledge_budget_tokens = knowledge_budget_tokens,
3194                "Added enabled knowledge to request"
3195            );
3196            resolved_context.knowledge.push(knowledge_context);
3197        } else {
3198            tracing::debug!(
3199                knowledge_budget_tokens = knowledge_budget_tokens,
3200                "No enabled knowledge added after applying prompt budget"
3201            );
3202        }
3203    }
3204}
3205
3206fn merge_token_usage(
3207    usage: &mut Option<crate::llm_provider::TokenUsage>,
3208    additional: &crate::llm_provider::TokenUsage,
3209) {
3210    if let Some(existing) = usage.as_mut() {
3211        existing.input_tokens += additional.input_tokens;
3212        existing.output_tokens += additional.output_tokens;
3213        existing.total_tokens += additional.total_tokens;
3214        existing.estimated_cost_usd =
3215            match (existing.estimated_cost_usd, additional.estimated_cost_usd) {
3216                (Some(lhs), Some(rhs)) => Some(lhs + rhs),
3217                (Some(lhs), None) => Some(lhs),
3218                (None, Some(rhs)) => Some(rhs),
3219                (None, None) => None,
3220            };
3221        if existing.model.is_none() {
3222            existing.model = additional.model.clone();
3223        }
3224        if existing.provider.is_none() {
3225            existing.provider = additional.provider.clone();
3226        }
3227    } else {
3228        *usage = Some(additional.clone());
3229    }
3230}
3231
3232#[cfg(test)]
3233mod tests;