1mod agent_loop;
23mod compaction;
24mod prompt;
25mod reflection;
26mod request_telemetry;
27mod tool_dispatch;
28mod tool_router;
29pub mod types;
30pub(crate) use reflection::sync_task_reflection_outcomes;
31
32use std::collections::HashSet;
33use std::path::Path;
34use std::time::Instant;
35use tokio::sync::mpsc;
36pub use tool_router::{RoutingResult, ToolRouter, build_tool_router};
37use tracing::Instrument as _;
38
39use crate::agent_sessions::{AgentSessionStore, FileAgentSessionStore};
40use crate::checkpoints::{CheckpointManager, CheckpointRetentionPolicy, FileCheckpointStore};
41use crate::config::AppConfig;
42use crate::context::{ContextManager, RequestAnalyzer};
43use crate::error::AppError;
44use crate::hooks::{HookContext, HookEngine, HookEvent};
45use crate::knowledge::{KnowledgeSettingsManager, KnowledgeStore};
46use crate::llm_provider::{AgentContext, select_provider};
47use crate::session_workspace::SessionWorkspace;
48use crate::streaming::{
49 CancellationToken, StreamChunk, start_streaming, start_streaming_with_fallback,
50};
51use crate::tasks::TaskManager;
52use crate::tool_confirmation::TOOL_CONFIRMATIONS;
53use crate::tools::PermissionManager;
54use crate::tools::registry::{ToolDefinition, all_tools};
55use gestura_core_llm::model_capabilities::ModelCapabilitiesCache;
56
57use request_telemetry::{AgentRequestTelemetry, RequestOutcome, RequestRunMode};
58use tool_dispatch::{FinalizePendingToolCallCtx, PendingToolCall};
59pub use types::*;
60
/// Bounded capacity of the stream-chunk channel between pipeline and consumer.
pub(super) const STREAM_CHUNK_BUFFER_CAPACITY: usize = 256;
// Metadata hint key carrying the original user input used for requirement detection.
const REQUIREMENT_DETECTION_INPUT_HINT_KEY: &str = "requirement_detection_input";
// Metadata hint key marking a request as an internal requirement-breakdown call.
const INTERNAL_REQUIREMENT_BREAKDOWN_HINT_KEY: &str = "internal.requirement_breakdown";
// How long the best-effort senders below wait before dropping a non-critical chunk.
const NONCRITICAL_STREAM_CHUNK_SEND_TIMEOUT: std::time::Duration =
    std::time::Duration::from_millis(100);
66
67pub(super) async fn send_status_chunk_best_effort(
68 tx: &mpsc::Sender<StreamChunk>,
69 chunk: StreamChunk,
70) {
71 debug_assert!(matches!(chunk, StreamChunk::Status { .. }));
72
73 match tokio::time::timeout(NONCRITICAL_STREAM_CHUNK_SEND_TIMEOUT, tx.send(chunk)).await {
74 Ok(Ok(())) | Ok(Err(_)) => {}
75 Err(_) => {
76 tracing::debug!(
77 timeout_ms = NONCRITICAL_STREAM_CHUNK_SEND_TIMEOUT.as_millis(),
78 "Dropping transient status chunk because the stream receiver is not draining fast enough"
79 );
80 }
81 }
82}
83
84pub(super) async fn send_token_usage_chunk_best_effort(
85 tx: &mpsc::Sender<StreamChunk>,
86 chunk: StreamChunk,
87) {
88 debug_assert!(matches!(chunk, StreamChunk::TokenUsageUpdate { .. }));
89
90 match tokio::time::timeout(NONCRITICAL_STREAM_CHUNK_SEND_TIMEOUT, tx.send(chunk)).await {
91 Ok(Ok(())) | Ok(Err(_)) => {}
92 Err(_) => {
93 tracing::debug!(
94 timeout_ms = NONCRITICAL_STREAM_CHUNK_SEND_TIMEOUT.as_millis(),
95 "Dropping transient token-usage chunk because the stream receiver is not draining fast enough"
96 );
97 }
98 }
99}
100
// Gestures whose normalised intent confidence falls below this threshold are
// skipped rather than routed to the pipeline.
#[cfg(feature = "ring-integration")]
const MIN_GESTURE_CONFIDENCE: f32 = 0.7;

// Capacity of the bounded queue between gesture ingestion and the pipeline
// worker task; overflow drops gestures instead of building a backlog.
#[cfg(feature = "ring-integration")]
const GESTURE_QUEUE_CAPACITY: usize = 32;
121
#[cfg(feature = "ring-integration")]
/// Consumes the ring backend's gesture broadcast stream, normalises each
/// gesture into an intent, and routes qualifying intents through the agent
/// pipeline on a dedicated worker task. Runs until the gesture stream closes.
/// Success or failure of each routed gesture is signalled to the wearer via
/// haptic feedback on `observer`.
pub async fn process_ring_stream(
    backend: std::sync::Arc<dyn gestura_core_ring::RingBackend>,
    observer: std::sync::Arc<dyn crate::orchestrator::OrchestratorObserver>,
    config: AppConfig,
) {
    // Work item: a normalised intent plus the observer to notify with haptics.
    type Work = (
        gestura_core_intent::Intent,
        std::sync::Arc<dyn crate::orchestrator::OrchestratorObserver>,
    );

    let pipeline = std::sync::Arc::new(AgentPipeline::new(config));

    // Bounded queue decouples gesture ingestion from (slower) pipeline runs.
    let (work_tx, mut work_rx) = tokio::sync::mpsc::channel::<Work>(GESTURE_QUEUE_CAPACITY);

    let pipeline_worker = pipeline.clone();
    // Worker task: drains the queue, runs each intent through the pipeline,
    // and reports outcome to the wearer via haptics.
    tokio::spawn(async move {
        while let Some((intent, obs)) = work_rx.recv().await {
            let request = AgentRequest::new(intent.primary_action.clone()).with_streaming(false);
            match pipeline_worker.process_blocking(request).await {
                Ok(_) => {
                    obs.on_haptic_feedback(gestura_core_haptics::HapticPattern::Confirm, 1.0, 200)
                        .await;
                }
                Err(e) => {
                    tracing::warn!(
                        error = %e,
                        primary_action = %intent.primary_action,
                        "Ring gesture failed to route through agentic pipeline"
                    );
                    obs.on_haptic_feedback(gestura_core_haptics::HapticPattern::Error, 0.5, 150)
                        .await;
                }
            }
        }
    });

    let mut rx = backend.subscribe_to_gestures().await;
    loop {
        match rx.recv().await {
            Ok(gesture) => {
                let raw_input = gestura_core_intent::RawInput {
                    text: gesture.gesture_type.clone(),
                    modality: gestura_core_intent::InputModality::Gesture,
                    session_id: None,
                    gesture_data: Some(gestura_core_intent::GestureData {
                        gesture_type: gesture.gesture_type,
                        acceleration: gesture.acceleration,
                        gyroscope: gesture.gyroscope,
                        confidence: gesture.confidence,
                    }),
                };

                let intent = gestura_core_intent::normalize_input_to_intent(raw_input);

                // Drop gestures the normaliser is unsure about or cannot name.
                if intent.confidence < MIN_GESTURE_CONFIDENCE
                    || intent.primary_action == "unknown_gesture"
                {
                    tracing::debug!(
                        intent_id = %intent.id,
                        primary_action = %intent.primary_action,
                        confidence = intent.confidence,
                        "Skipping low-confidence or unrecognised gesture"
                    );
                    continue;
                }

                tracing::debug!(
                    intent_id = %intent.id,
                    primary_action = %intent.primary_action,
                    confidence = intent.confidence,
                    "Ring gesture normalised to intent; enqueuing for pipeline"
                );

                // Non-blocking enqueue: if the worker is backed up, shed load
                // rather than stall gesture ingestion.
                if work_tx.try_send((intent, observer.clone())).is_err() {
                    tracing::warn!(
                        capacity = GESTURE_QUEUE_CAPACITY,
                        "Gesture pipeline queue full; dropping gesture to prevent backlog"
                    );
                }
            }
            Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
                tracing::warn!(
                    "Ring processing stream lagged, {} gestures dropped",
                    skipped
                );
                continue;
            }
            Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                tracing::info!("Ring processing stream closed. Stopping task.");
                break;
            }
        }
    }
}
250
251fn tools_slice_for_provider(
259 provider_name: &str,
260 model_id: Option<&str>,
261 schemas: &crate::tools::schemas::ProviderToolSchemas,
262) -> Vec<serde_json::Value> {
263 match provider_name {
264 "anthropic" => schemas.anthropic.clone(),
265 "gemini" => schemas.gemini.clone(),
266 "openai"
267 if model_id.is_some_and(|model| {
268 matches!(
269 crate::llm_provider::openai::openai_api_for_model(model),
270 crate::llm_provider::openai::OpenAiApi::Responses
271 )
272 }) =>
273 {
274 schemas.openai_responses.clone()
275 }
276 _ => schemas.openai.clone(),
277 }
278}
279
280fn requirement_detection_input(request: &AgentRequest) -> &str {
281 request
282 .metadata
283 .hints
284 .get(REQUIREMENT_DETECTION_INPUT_HINT_KEY)
285 .map(String::as_str)
286 .filter(|value| !value.trim().is_empty())
287 .unwrap_or(&request.input)
288}
289
290fn is_internal_requirement_breakdown_request(request: &AgentRequest) -> bool {
291 request
292 .metadata
293 .hints
294 .get(INTERNAL_REQUIREMENT_BREAKDOWN_HINT_KEY)
295 .is_some_and(|value| value == "true")
296}
297
/// Orchestrates a single agent request end-to-end: request analysis, context
/// assembly, provider/tool selection, streaming, and tracked-task bookkeeping.
pub struct AgentPipeline {
    // Full application configuration this pipeline was constructed with.
    config: AppConfig,
    // Assembles conversational context (includes a tool-name/description provider).
    context_manager: ContextManager,
    // Classifies incoming requests (e.g. whether tools are needed).
    analyzer: RequestAnalyzer,
    // Pipeline-level tuning derived from user settings.
    pipeline_config: PipelineConfig,
    // Gatekeeper for tool execution permissions.
    permission_manager: PermissionManager,
    // Optional process-wide knowledge store; set via `with_knowledge`.
    knowledge_store: Option<&'static KnowledgeStore>,
    // Optional knowledge settings manager paired with the store.
    knowledge_settings: Option<&'static KnowledgeSettingsManager>,
    // Router built from `tool_routing_strategy`; None when routing is disabled.
    tool_router: Option<Box<dyn tool_router::ToolRouter>>,
    // Cached model capability lookups shared across requests.
    capabilities_cache: ModelCapabilitiesCache,
}
326
/// Coarse classification of an auto-tracked request, used to pick a default
/// subtask template when no LLM-generated breakdown is available.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AutoTrackedRequestShape {
    // "create"/"implement"/"refactor"-style requests that produce or edit artifacts.
    CreateOrModify,
    // "fix"/"debug"/"diagnose"-style requests centred on repairing an issue.
    InvestigateOrFix,
    // "compare"/"review"/"analyze"-style evaluation work.
    AnalyzeOrCompare,
    // "research"/"find"/"fetch"-style information gathering.
    ResearchOrFetch,
    // "plan"/"design"/"document"-style planning or drafting output.
    PlanOrDraft,
    // Fallback when no specific signal matches.
    GeneralExecution,
}
336
337impl AgentPipeline {
338 fn message_contains_any(text: &str, signals: &[&str]) -> bool {
339 signals.iter().any(|signal| text.contains(signal))
340 }
341
342 fn is_simple_verification_request(message: &str) -> bool {
343 let text = message.trim().to_ascii_lowercase();
344 if text.is_empty() {
345 return false;
346 }
347
348 let has_verification_signals = Self::message_contains_any(
349 &text,
350 &[
351 "build and test",
352 "run the build",
353 "run build",
354 "run the tests",
355 "run tests",
356 "run the test",
357 "run test",
358 "cargo build",
359 "cargo check",
360 "cargo test",
361 "verify",
362 "validation",
363 "validate",
364 "compile",
365 "lint",
366 "smoke test",
367 ],
368 );
369 if !has_verification_signals {
370 return false;
371 }
372
373 let has_non_verification_signals = Self::message_contains_any(
374 &text,
375 &[
376 "fix",
377 "debug",
378 "troubleshoot",
379 "resolve",
380 "repair",
381 "investigate",
382 "diagnose",
383 "create",
384 "implement",
385 "set up",
386 "setup",
387 "scaffold",
388 "write",
389 "draft",
390 "update",
391 "change",
392 "modify",
393 "refactor",
394 "migrate",
395 "compare",
396 "review",
397 "audit",
398 "analyze",
399 "analyse",
400 "evaluate",
401 "inspect",
402 "research",
403 "search",
404 "find",
405 "fetch",
406 "gather",
407 "collect",
408 "explore",
409 "plan",
410 "design",
411 "outline",
412 "document",
413 "summarize",
414 "summarise",
415 "propose",
416 ],
417 );
418
419 has_verification_signals && !has_non_verification_signals
420 }
421
422 fn infer_auto_tracked_request_shape(message: &str) -> AutoTrackedRequestShape {
423 let text = message.trim().to_ascii_lowercase();
424
425 if Self::message_contains_any(
426 &text,
427 &[
428 "fix",
429 "debug",
430 "troubleshoot",
431 "resolve",
432 "repair",
433 "investigate why",
434 "diagnose",
435 ],
436 ) {
437 return AutoTrackedRequestShape::InvestigateOrFix;
438 }
439
440 if Self::message_contains_any(
441 &text,
442 &[
443 "compare", "review", "audit", "assess", "analyze", "analyse", "evaluate", "inspect",
444 ],
445 ) {
446 return AutoTrackedRequestShape::AnalyzeOrCompare;
447 }
448
449 if Self::message_contains_any(
450 &text,
451 &[
452 "research", "find", "look up", "lookup", "search", "fetch", "gather", "collect",
453 "explore",
454 ],
455 ) {
456 return AutoTrackedRequestShape::ResearchOrFetch;
457 }
458
459 if Self::message_contains_any(
460 &text,
461 &[
462 "create",
463 "implement",
464 "build",
465 "set up",
466 "setup",
467 "scaffold",
468 "write",
469 "draft",
470 "compose",
471 "update",
472 "change",
473 "modify",
474 "refactor",
475 "migrate",
476 ],
477 ) {
478 return AutoTrackedRequestShape::CreateOrModify;
479 }
480
481 if Self::message_contains_any(
482 &text,
483 &[
484 "plan",
485 "design",
486 "outline",
487 "document",
488 "summarize",
489 "summarise",
490 "propose",
491 ],
492 ) {
493 return AutoTrackedRequestShape::PlanOrDraft;
494 }
495
496 AutoTrackedRequestShape::GeneralExecution
497 }
498
499 fn ensure_request_session_id(request: &mut AgentRequest) {
500 let missing_session_id = request
501 .metadata
502 .session_id
503 .as_deref()
504 .is_none_or(|session_id| session_id.trim().is_empty());
505
506 if !missing_session_id {
507 return;
508 }
509
510 let generated = format!("agent-run-{}", uuid::Uuid::new_v4());
511 tracing::warn!(
512 generated_session_id = %generated,
513 source = ?request.metadata.source,
514 "Agent request missing session_id; synthesizing unique session id for isolated workspace/task tracking"
515 );
516 request.metadata.session_id = Some(generated);
517 }
518
519 fn should_auto_track_request(message: &str, explicit_task_id: Option<&str>) -> bool {
520 if explicit_task_id.is_some() {
521 return false;
522 }
523
524 let text = message.trim().to_ascii_lowercase();
525 if text.is_empty() || text.starts_with('/') {
526 return false;
527 }
528
529 if Self::is_simple_verification_request(&text) {
530 return false;
531 }
532
533 Self::message_contains_any(
534 &text,
535 &[
536 "plan and implement",
537 "carefully plan",
538 "build and test",
539 "implement then",
540 "step by step",
541 "break this down",
542 "end to end",
543 "scaffold",
544 "refactor",
545 "fix",
546 "debug",
547 "build",
548 "test",
549 "compare",
550 "review",
551 "audit",
552 "analyze",
553 "analyse",
554 "assess",
555 "evaluate",
556 "inspect",
557 "research",
558 "search",
559 "find",
560 "fetch",
561 "gather",
562 "collect",
563 "explore",
564 "plan",
565 "design",
566 "outline",
567 "document",
568 "write",
569 "draft",
570 "summarize",
571 "summarise",
572 "propose",
573 "investigate",
574 "troubleshoot",
575 ],
576 ) && text.split_whitespace().count() >= 6
577 }
578
579 fn should_offer_task_tool_for_request(analysis: &crate::context::RequestAnalysis) -> bool {
580 analysis.needs_tools && Self::should_auto_track_request(&analysis.request, None)
581 }
582
583 fn reflection_enabled_for(&self, metadata: &RequestMetadata) -> bool {
584 metadata
585 .reflection_enabled
586 .unwrap_or(self.pipeline_config.reflection.enabled)
587 }
588
589 #[inline(always)]
590 async fn maybe_apply_advanced_primitives_middleware(
591 &self,
592 request: &mut AgentRequest,
593 analysis: &crate::context::RequestAnalysis,
594 ) {
595 let intent = requirement_detection_input(request).trim().to_string();
596 if !gestura_core_tasks::ADVANCED_PRIMITIVES_ENABLED
597 || is_internal_requirement_breakdown_request(request)
598 || !analysis.needs_tools
599 || !Self::should_auto_track_request(&intent, request.metadata.task_id.as_deref())
600 {
601 return;
602 }
603
604 let enhancement = gestura_core_tasks::AdvancedPrimitives::run_enhanced_plan(
605 gestura_core_tasks::AdvancedPlanRequest {
606 user_intent: intent.clone(),
607 base_system_prompt: request.system_prompt.clone().unwrap_or_else(|| {
608 gestura_core_pipeline::persona::default_system_prompt(&request.metadata)
609 }),
610 session_id: request.metadata.session_id.clone(),
611 task_id: request.metadata.task_id.clone(),
612 source: format!("{:?}", request.metadata.source),
613 complex_intent: true,
614 requires_verification: Self::prompt_requires_build_and_test(&intent),
615 metadata_hints: request.metadata.hints.clone(),
616 },
617 )
618 .await;
619
620 if enhancement.applied {
621 request.system_prompt = Some(enhancement.system_prompt);
622 request.metadata.hints.extend(enhancement.metadata_hints);
623 }
624 }
625
626 #[inline(always)]
634 fn maybe_attach_normalized_intent(request: &mut AgentRequest) {
635 if !gestura_core_intent::INTENT_NORMALIZATION_ENABLED {
636 return;
637 }
638
639 let modality =
640 gestura_core_intent::InputModality::from_request_source(&request.metadata.source);
641 let raw_input = gestura_core_intent::RawInput {
642 text: request.input.clone(),
643 modality,
644 session_id: request.metadata.session_id.clone(),
645 gesture_data: None,
646 };
647 let intent = gestura_core_intent::normalize_input_to_intent(raw_input);
648
649 tracing::debug!(
650 intent_id = %intent.id,
651 modality = %intent.modality.label(),
652 action = %intent.primary_action,
653 confidence = intent.confidence,
654 "Normalized input to unified intent"
655 );
656
657 request
658 .metadata
659 .hints
660 .insert("intent.id".to_string(), intent.id.clone());
661 request.metadata.hints.insert(
662 "intent.primary_action".to_string(),
663 intent.primary_action.clone(),
664 );
665 request.metadata.hints.insert(
666 "intent.modality".to_string(),
667 intent.modality.label().to_string(),
668 );
669 request.metadata.hints.insert(
670 "intent.confidence".to_string(),
671 format!("{:.2}", intent.confidence),
672 );
673 if !intent.context_hints.is_empty() {
674 request.metadata.hints.insert(
675 "intent.context_hints".to_string(),
676 intent.context_hints.join(","),
677 );
678 }
679 }
680
681 fn append_task_tool_for_auto_tracked_request(
682 analysis: &crate::context::RequestAnalysis,
683 candidate_names: &HashSet<&str>,
684 tools: &mut Vec<&'static ToolDefinition>,
685 ) {
686 if !candidate_names.contains("task")
687 || !Self::should_offer_task_tool_for_request(analysis)
688 || tools.iter().any(|tool| tool.name == "task")
689 {
690 return;
691 }
692
693 if let Some(task_tool) = crate::tools::registry::find_tool("task") {
694 tools.push(task_tool);
695 }
696 }
697
698 fn derive_agent_request_task_name(message: &str) -> String {
699 let trimmed = message.trim();
700 if trimmed.is_empty() {
701 return "Agent request".to_string();
702 }
703
704 let sentence = trimmed
705 .split(['\n', '.', '!', '?'])
706 .next()
707 .unwrap_or(trimmed)
708 .trim();
709 let candidate =
710 sentence.trim_matches(|ch: char| ch.is_ascii_punctuation() || ch.is_whitespace());
711 if candidate.is_empty() {
712 "Agent request".to_string()
713 } else {
714 candidate.chars().take(96).collect()
715 }
716 }
717
718 pub async fn generate_requirement_breakdown_specs(
720 cfg: AppConfig,
721 source: RequestSource,
722 session_id: Option<&str>,
723 requirements: &str,
724 ) -> Result<Vec<crate::tasks::RequirementBreakdownTaskSpec>, String> {
725 let pipeline = AgentPipeline::with_provider_optimized_config(cfg);
726 let mut request = AgentRequest::new(Self::build_requirement_breakdown_prompt(requirements))
727 .with_streaming(false)
728 .with_source(source)
729 .with_tools_enabled(false);
730
731 if let Some(session_id) = session_id {
732 request = request.with_session(session_id);
733 }
734 request.metadata.hints.insert(
735 INTERNAL_REQUIREMENT_BREAKDOWN_HINT_KEY.to_string(),
736 "true".to_string(),
737 );
738
739 let response = Box::pin(pipeline.process_blocking(request))
740 .await
741 .map_err(|error| format!("LLM error: {error}"))?;
742
743 crate::tasks::parse_requirement_breakdown_response(&response.content)
744 .map_err(|error| error.to_string())
745 }
746
747 pub fn default_auto_tracked_execution_specs(
749 message: &str,
750 ) -> Vec<crate::tasks::RequirementBreakdownTaskSpec> {
751 Self::default_auto_tracked_execution_subtasks(message)
752 .into_iter()
753 .map(
754 |(name, description)| crate::tasks::RequirementBreakdownTaskSpec {
755 name,
756 description,
757 priority: "high".to_string(),
758 is_blocking: false,
759 parent_name: None,
760 },
761 )
762 .collect()
763 }
764
    /// Builds the planning prompt that asks the model to break `requirements`
    /// into a JSON array of task specs (name/description/priority/is_blocking/
    /// parent_name), whose output is parsed by
    /// `crate::tasks::parse_requirement_breakdown_response`.
    fn build_requirement_breakdown_prompt(requirements: &str) -> String {
        format!(
            r#"You are a project planning assistant. Analyze the following requirements and break them down into a structured task list.

Requirements:
{}

Please respond with a JSON array of tasks. Each task should have:
- "name": A concise task name (max 60 chars)
- "description": A detailed description of what needs to be done
- "priority": "high", "medium", or "low"
- "is_blocking": true if other tasks depend on this, false otherwise
- "parent_name": null for root tasks, or the exact name of the parent task for subtasks

Order tasks by priority and logical execution order. Group related tasks under parent tasks.

Example format:
[
  {{"name": "Setup project structure", "description": "Initialize the project...", "priority": "high", "is_blocking": true, "parent_name": null}},
  {{"name": "Configure build system", "description": "Set up the build...", "priority": "high", "is_blocking": false, "parent_name": "Setup project structure"}}
]

Respond ONLY with the JSON array, no additional text."#,
            requirements
        )
    }
791
    /// Static fallback plan used when an LLM-generated breakdown is not
    /// available: returns `(name, description)` subtask pairs shaped by the
    /// inferred request type. The final verification step's wording depends on
    /// whether the request itself mentions build/test/check work.
    fn default_auto_tracked_execution_subtasks(message: &str) -> Vec<(String, String)> {
        let request = message.trim();
        // Whether the request explicitly mentions verification-style work;
        // used below to tailor the wording of the final subtask.
        let mentions_validation = Self::message_contains_any(
            &request.to_ascii_lowercase(),
            &[
                "build",
                "test",
                "verify",
                "validation",
                "validate",
                "check",
                "run",
                "compile",
                "lint",
                "smoke",
            ],
        );

        match Self::infer_auto_tracked_request_shape(request) {
            // Creation/modification: inspect → prepare → execute → validate.
            AutoTrackedRequestShape::CreateOrModify => vec![
                (
                    "Inspect the current state and constraints".to_string(),
                    format!(
                        "Inspect the current environment, relevant context, and constraints before acting on:\n\n{}",
                        request
                    ),
                ),
                (
                    "Prepare the starting point or prerequisites".to_string(),
                    format!(
                        "Set up the starting point, prerequisites, or scaffolding needed to complete:\n\n{}",
                        request
                    ),
                ),
                (
                    "Carry out the requested work".to_string(),
                    format!(
                        "Create, modify, draft, or otherwise perform the primary work needed for:\n\n{}",
                        request
                    ),
                ),
                (
                    "Validate the result and summarize follow-up".to_string(),
                    if mentions_validation {
                        format!(
                            "Run the appropriate verification steps, checks, or commands and summarize the final outcome for:\n\n{}",
                            request
                        )
                    } else {
                        format!(
                            "Review the completed result, verify it in the most appropriate way, and summarize any follow-up for:\n\n{}",
                            request
                        )
                    },
                ),
            ],
            // Investigation/repair: gather evidence → apply fix → validate.
            AutoTrackedRequestShape::InvestigateOrFix => vec![
                (
                    "Investigate the current issue or constraints".to_string(),
                    format!(
                        "Gather the evidence needed to understand the problem, failure, or constraints involved in:\n\n{}",
                        request
                    ),
                ),
                (
                    "Apply the fix or adjustment".to_string(),
                    format!(
                        "Make the change, adjustment, or corrective action needed to address:\n\n{}",
                        request
                    ),
                ),
                (
                    "Validate the fix and remaining risk".to_string(),
                    format!(
                        "Verify whether the issue is resolved and summarize any remaining risk or follow-up for:\n\n{}",
                        request
                    ),
                ),
            ],
            // Evaluation work: collect inputs → analyze → summarize.
            AutoTrackedRequestShape::AnalyzeOrCompare => vec![
                (
                    "Inspect the relevant inputs and criteria".to_string(),
                    format!(
                        "Gather the materials, context, and comparison criteria needed to evaluate:\n\n{}",
                        request
                    ),
                ),
                (
                    "Analyze the findings and identify gaps".to_string(),
                    format!(
                        "Analyze the relevant differences, patterns, tradeoffs, or gaps involved in:\n\n{}",
                        request
                    ),
                ),
                (
                    "Summarize conclusions and recommended actions".to_string(),
                    format!(
                        "Deliver a clear summary of the conclusions, recommendations, or next steps for:\n\n{}",
                        request
                    ),
                ),
            ],
            // Information gathering: collect → distill → summarize.
            AutoTrackedRequestShape::ResearchOrFetch => vec![
                (
                    "Gather the relevant context and sources".to_string(),
                    format!(
                        "Collect the most relevant information, evidence, or source material needed for:\n\n{}",
                        request
                    ),
                ),
                (
                    "Extract the information that matters".to_string(),
                    format!(
                        "Filter and organize the most useful details, signals, or facts for:\n\n{}",
                        request
                    ),
                ),
                (
                    "Summarize findings and next steps".to_string(),
                    format!(
                        "Present the findings clearly and include any recommended next steps for:\n\n{}",
                        request
                    ),
                ),
            ],
            // Planning/drafting: clarify → draft → refine.
            AutoTrackedRequestShape::PlanOrDraft => vec![
                (
                    "Clarify the goal, audience, and constraints".to_string(),
                    format!(
                        "Identify the intent, audience, constraints, and success criteria behind:\n\n{}",
                        request
                    ),
                ),
                (
                    "Draft the requested output".to_string(),
                    format!(
                        "Produce the requested plan, document, draft, or structured output for:\n\n{}",
                        request
                    ),
                ),
                (
                    "Review and refine the result".to_string(),
                    format!(
                        "Review the draft for completeness, quality, and alignment with the request:\n\n{}",
                        request
                    ),
                ),
            ],
            // Generic fallback: inspect → execute → verify.
            AutoTrackedRequestShape::GeneralExecution => vec![
                (
                    "Inspect the current state and gather context".to_string(),
                    format!(
                        "Inspect the current state, gather the needed context, and identify the concrete next steps for:\n\n{}",
                        request
                    ),
                ),
                (
                    "Carry out the requested work".to_string(),
                    format!(
                        "Perform the primary work needed to complete:\n\n{}",
                        request
                    ),
                ),
                (
                    "Verify the outcome and summarize next steps".to_string(),
                    if mentions_validation {
                        format!(
                            "Run the appropriate checks or commands, verify the outcome, and summarize what remains for:\n\n{}",
                            request
                        )
                    } else {
                        format!(
                            "Review the outcome, verify that it satisfies the request, and summarize next steps for:\n\n{}",
                            request
                        )
                    },
                ),
            ],
        }
    }
972
973 fn build_auto_tracked_execution_handoff_message(
974 original_message: &str,
975 root_task_name: &str,
976 planned_subtasks: &[String],
977 ) -> String {
978 let mut handoff = String::new();
979 handoff.push_str(original_message.trim());
980 handoff.push_str("\n\n[Runtime execution handoff]\n");
981 handoff.push_str(&format!(
982 "A task plan already exists for this request under the tracked task \"{}\". Execute that plan now instead of creating a fresh plan from scratch.\n",
983 root_task_name
984 ));
985 if !planned_subtasks.is_empty() {
986 handoff.push_str("Planned tracked subtasks:\n");
987 for subtask in planned_subtasks {
988 handoff.push_str("- ");
989 handoff.push_str(subtask);
990 handoff.push('\n');
991 }
992 }
993 handoff.push_str(
994 "Update task statuses as you start and finish each concrete subtask. Keep research, planning, and other inspection-heavy subtasks `inprogress` until you are genuinely done gathering or synthesizing evidence for that phase; a single search, read, or fetch usually means the task has started, not finished. If you create new work, create a concrete subtask with a specific name. When that new work is follow-on execution discovered while finishing the current task, attach it to the tracked root plan rather than nesting it under the currently executing task unless it is a true blocking prerequisite. That keeps the current execution task completable once the handoff is recorded. Do not mark the tracked root task complete until every planned subtask is completed or explicitly cancelled for a real reason. Begin concrete work immediately. If the request is primarily analysis or research, gather evidence first and summarize the outcome clearly instead of forcing unnecessary edits or commands."
995 );
996 handoff
997 }
998
    /// For qualifying tool-using requests with no explicit task id, creates a
    /// tracked root task plus an execution plan (LLM-generated when possible,
    /// default template otherwise) and rewrites the request input into a
    /// plan-execution handoff message. Best-effort: failures are logged and
    /// the request proceeds untracked.
    async fn maybe_initialize_tracked_request_task(
        &self,
        request: &mut AgentRequest,
        analysis_needs_tools: bool,
        task_tool_available: bool,
    ) {
        // Skip when a task already exists, this is an internal breakdown call,
        // tools are not in play, or the request does not look auto-trackable.
        if request.metadata.task_id.is_some()
            || is_internal_requirement_breakdown_request(request)
            || !analysis_needs_tools
            || !task_tool_available
            || !Self::should_auto_track_request(&request.input, None)
        {
            return;
        }

        Self::ensure_request_session_id(request);
        let Some(session_id) = request.metadata.session_id.as_deref() else {
            return;
        };

        let manager = crate::get_global_task_manager();
        let original_input = request.input.trim().to_string();
        // Preserve the untouched user input so later requirement detection is
        // not confused by the handoff rewrite performed below.
        request
            .metadata
            .hints
            .entry(REQUIREMENT_DETECTION_INPUT_HINT_KEY.to_string())
            .or_insert_with(|| original_input.clone());

        let task_name = Self::derive_agent_request_task_name(&original_input);
        let requirement_input = requirement_detection_input(request).to_string();
        // Prefer an LLM-generated breakdown; fall back to the static template on error.
        let plan_specs = match Box::pin(Self::generate_requirement_breakdown_specs(
            self.config.clone(),
            request.metadata.source,
            Some(session_id),
            &original_input,
        ))
        .await
        {
            Ok(specs) => specs,
            Err(error) => {
                tracing::warn!(
                    session_id = %session_id,
                    error = %error,
                    "Failed to generate structured tracked plan; falling back to default tracked subtasks"
                );
                Self::default_auto_tracked_execution_specs(&original_input)
            }
        };

        match manager.initialize_auto_tracked_execution_plan(
            session_id,
            &task_name,
            &original_input,
            &plan_specs,
        ) {
            Ok(plan) => {
                // Record the new root task and hand the plan off to the model.
                request.metadata.task_id = Some(plan.root_task.id.clone());
                request.input = Self::build_auto_tracked_execution_handoff_message(
                    &requirement_input,
                    &plan.root_task.name,
                    &plan.planned_subtasks,
                );
                tracing::info!(
                    session_id = %session_id,
                    task_id = %plan.root_task.id,
                    task_name = %plan.root_task.name,
                    planned_subtasks = plan.generated_task_count,
                    initial_task_id = ?plan.initial_task_id,
                    "Initialized tracked root task and shared execution plan for agent request"
                );
            }
            Err(error) => {
                tracing::warn!(
                    session_id = %session_id,
                    error = %error,
                    "Failed to initialize tracked root task for agent request"
                );
            }
        }
    }
1079
1080 fn build_context_manager() -> ContextManager {
1082 ContextManager::new().with_tool_provider(Box::new(|| {
1083 crate::tools::registry::all_tools()
1084 .iter()
1085 .map(|t| (t.name.to_string(), t.description.to_string()))
1086 .collect()
1087 }))
1088 }
1089
1090 pub fn new(config: AppConfig) -> Self {
1093 let pipeline_config = PipelineConfig::default().with_user_settings(&config.pipeline);
1094 let arc_config = std::sync::Arc::new(config.clone());
1095 let tool_router = build_tool_router(&pipeline_config.tool_routing_strategy, arc_config);
1096 Self {
1097 config,
1098 context_manager: Self::build_context_manager(),
1099 analyzer: RequestAnalyzer::new(),
1100 pipeline_config,
1101 permission_manager: PermissionManager::new(),
1102 knowledge_store: None,
1103 knowledge_settings: None,
1104 tool_router,
1105 capabilities_cache: ModelCapabilitiesCache::new(),
1106 }
1107 }
1108
1109 pub fn with_config(config: AppConfig, pipeline_config: PipelineConfig) -> Self {
1111 let arc_config = std::sync::Arc::new(config.clone());
1112 let tool_router = build_tool_router(&pipeline_config.tool_routing_strategy, arc_config);
1113 Self {
1114 config,
1115 context_manager: Self::build_context_manager(),
1116 analyzer: RequestAnalyzer::new(),
1117 pipeline_config,
1118 permission_manager: PermissionManager::new(),
1119 knowledge_store: None,
1120 knowledge_settings: None,
1121 tool_router,
1122 capabilities_cache: ModelCapabilitiesCache::new(),
1123 }
1124 }
1125
1126 pub fn with_knowledge(
1128 mut self,
1129 store: &'static KnowledgeStore,
1130 settings: &'static KnowledgeSettingsManager,
1131 ) -> Self {
1132 self.knowledge_store = Some(store);
1133 self.knowledge_settings = Some(settings);
1134 self
1135 }
1136
1137 fn create_hook_engine(&self) -> Option<HookEngine> {
1141 if !self.config.hooks.enabled || self.config.hooks.hooks.is_empty() {
1142 return None;
1143 }
1144 Some(HookEngine::new(self.config.hooks.clone()))
1145 }
1146
1147 async fn run_hook_best_effort(&self, engine: &HookEngine, event: HookEvent, ctx: &HookContext) {
1152 match engine.run(event, ctx).await {
1153 Ok(records) => {
1154 for record in &records {
1155 tracing::debug!(
1156 hook = %record.name,
1157 event = ?record.event,
1158 exit_code = record.output.exit_code,
1159 "Hook executed (best-effort)"
1160 );
1161 }
1162 }
1163 Err(e) => {
1164 tracing::warn!(
1165 event = ?event,
1166 error = %e,
1167 "Hook execution failed (best-effort, continuing)"
1168 );
1169 }
1170 }
1171 }
1172
1173 async fn ensure_mcp_servers_connected(&self) {
1176 let registry = crate::mcp::get_mcp_client_registry();
1177 let connected = registry.connected_servers().await;
1178
1179 for entry in &self.config.mcp_servers {
1180 if !entry.enabled || connected.contains(&entry.name) {
1181 continue;
1182 }
1183 match registry.connect(entry).await {
1184 Ok(tools) => {
1185 tracing::info!(
1186 server = %entry.name,
1187 tool_count = tools.len(),
1188 "MCP server connected"
1189 );
1190 }
1191 Err(e) => {
1192 tracing::warn!(
1193 server = %entry.name,
1194 error = %e,
1195 "Failed to connect MCP server (skipping)"
1196 );
1197 }
1198 }
1199 }
1200 }
1201
    /// Streaming variant of [`Self::ensure_mcp_servers_connected`]: connects
    /// any enabled-but-unconnected MCP servers while emitting `Status` chunks
    /// so the client sees progress, and returns early when the request's
    /// cancellation token fires.
    async fn ensure_mcp_servers_connected_streaming(
        &self,
        tx: &mpsc::Sender<StreamChunk>,
        cancel_token: &CancellationToken,
    ) {
        use tokio::time::{Duration, MissedTickBehavior};

        let registry = crate::mcp::get_mcp_client_registry();
        let connected = registry.connected_servers().await;

        for entry in &self.config.mcp_servers {
            // Check cancellation between servers so a cancelled request does
            // not keep opening new connections.
            if cancel_token.is_cancelled() {
                return;
            }
            if !entry.enabled || connected.contains(&entry.name) {
                continue;
            }

            let base_msg = format!("Connecting to MCP server '{}'…", entry.name);
            // Best-effort status send; a closed receiver is not fatal here.
            let _ = tx
                .send(StreamChunk::Status {
                    message: base_msg.clone(),
                })
                .await;

            // Re-emit the status every 20s while a slow connect is in flight
            // so the client knows the pipeline is still alive.
            let mut tick = tokio::time::interval(Duration::from_secs(20));
            tick.set_missed_tick_behavior(MissedTickBehavior::Delay);

            let connect_fut = registry.connect(entry);
            tokio::pin!(connect_fut);

            // Race the pinned connect future against the keep-alive ticker
            // until the connect resolves or the request is cancelled.
            let result = loop {
                if cancel_token.is_cancelled() {
                    return;
                }

                tokio::select! {
                    res = &mut connect_fut => break res,
                    _ = tick.tick() => {
                        let _ = tx.send(StreamChunk::Status { message: base_msg.clone() }).await;
                    }
                }
            };

            match result {
                Ok(tools) => {
                    tracing::info!(
                        server = %entry.name,
                        tool_count = tools.len(),
                        "MCP server connected"
                    );
                    let _ = tx
                        .send(StreamChunk::Status {
                            message: format!(
                                "MCP server '{}' connected ({})",
                                entry.name,
                                tools.len()
                            ),
                        })
                        .await;
                }
                Err(e) => {
                    tracing::warn!(
                        server = %entry.name,
                        error = %e,
                        "Failed to connect MCP server (skipping)"
                    );
                    let _ = tx
                        .send(StreamChunk::Status {
                            message: format!("MCP server '{}' unavailable (skipping)", entry.name),
                        })
                        .await;
                }
            }
        }
    }
1284
1285 fn try_create_checkpoint_before_tool(&self, session_id: &str, tool_name: &str) {
1289 let label = format!("before:{}", tool_name);
1290
1291 let session_store = FileAgentSessionStore::new_default();
1293 let checkpoint_store = FileCheckpointStore::new_default();
1294 let manager =
1295 CheckpointManager::new(checkpoint_store, CheckpointRetentionPolicy::default());
1296
1297 let task_manager = TaskManager::new(AppConfig::data_dir());
1299
1300 match manager.create_session_checkpoint(
1301 session_id,
1302 &session_store,
1303 &task_manager,
1304 &self.config,
1305 Some(label),
1306 ) {
1307 Ok(meta) => {
1308 tracing::info!(
1309 checkpoint_id = %meta.id,
1310 session_id = session_id,
1311 tool = tool_name,
1312 "Created auto-checkpoint before write tool"
1313 );
1314 }
1315 Err(e) => {
1316 tracing::warn!(
1317 session_id = session_id,
1318 tool = tool_name,
1319 error = %e,
1320 "Failed to create auto-checkpoint (continuing with tool execution)"
1321 );
1322 }
1323 }
1324 }
1325
1326 pub fn with_provider_optimized_config(config: AppConfig) -> Self {
1333 let provider = config.llm.primary.as_str();
1334 let model_id = Self::extract_model_id(&config, provider);
1335 let capabilities_cache = ModelCapabilitiesCache::new();
1336
1337 let pipeline_config = if let Some(model) = model_id {
1339 PipelineConfig::for_model_with_cache(provider, model, &capabilities_cache)
1340 .with_user_settings(&config.pipeline)
1341 } else {
1342 PipelineConfig::for_provider(provider).with_user_settings(&config.pipeline)
1343 };
1344
1345 tracing::info!(
1346 provider = provider,
1347 model = model_id.unwrap_or("unknown"),
1348 max_context_tokens = pipeline_config.max_context_tokens,
1349 max_history_messages = pipeline_config.max_history_messages,
1350 auto_compact_threshold = pipeline_config.auto_compact_threshold,
1351 compaction_strategy = ?pipeline_config.compaction_strategy,
1352 "Created pipeline with model-optimized configuration and user settings"
1353 );
1354
1355 let arc_config = std::sync::Arc::new(config.clone());
1356 let tool_router = build_tool_router(&pipeline_config.tool_routing_strategy, arc_config);
1357 Self {
1358 config,
1359 context_manager: Self::build_context_manager(),
1360 analyzer: RequestAnalyzer::new(),
1361 pipeline_config,
1362 permission_manager: PermissionManager::new(),
1363 knowledge_store: None,
1364 knowledge_settings: None,
1365 tool_router,
1366 capabilities_cache,
1367 }
1368 }
1369
1370 pub fn with_shared_capabilities_cache(
1375 config: AppConfig,
1376 capabilities_cache: ModelCapabilitiesCache,
1377 ) -> Self {
1378 let provider = config.llm.primary.as_str();
1379 let model_id = Self::extract_model_id(&config, provider);
1380
1381 let pipeline_config = if let Some(model) = model_id {
1382 PipelineConfig::for_model_with_cache(provider, model, &capabilities_cache)
1383 .with_user_settings(&config.pipeline)
1384 } else {
1385 PipelineConfig::for_provider(provider).with_user_settings(&config.pipeline)
1386 };
1387
1388 tracing::info!(
1389 provider = provider,
1390 model = model_id.unwrap_or("unknown"),
1391 max_context_tokens = pipeline_config.max_context_tokens,
1392 "Created pipeline with shared capabilities cache"
1393 );
1394
1395 let arc_config = std::sync::Arc::new(config.clone());
1396 let tool_router = build_tool_router(&pipeline_config.tool_routing_strategy, arc_config);
1397 Self {
1398 config,
1399 context_manager: Self::build_context_manager(),
1400 analyzer: RequestAnalyzer::new(),
1401 pipeline_config,
1402 permission_manager: PermissionManager::new(),
1403 knowledge_store: None,
1404 knowledge_settings: None,
1405 tool_router,
1406 capabilities_cache,
1407 }
1408 }
1409
1410 fn extract_model_id<'a>(config: &'a AppConfig, provider: &str) -> Option<&'a str> {
1412 let model = match provider {
1413 "openai" => config.llm.openai.as_ref().map(|c| c.model.as_str()),
1414 "anthropic" => config.llm.anthropic.as_ref().map(|c| c.model.as_str()),
1415 "grok" => config.llm.grok.as_ref().map(|c| c.model.as_str()),
1416 "gemini" => config.llm.gemini.as_ref().map(|c| c.model.as_str()),
1417 "ollama" => config.llm.ollama.as_ref().map(|c| c.model.as_str()),
1418 _ => None,
1419 };
1420 tracing::debug!(
1421 provider = provider,
1422 model = ?model,
1423 has_openai_config = config.llm.openai.is_some(),
1424 "[extract_model_id] Extracted model ID from config"
1425 );
1426 model
1427 }
1428
    /// Returns the pipeline's model-capabilities cache (used elsewhere to
    /// learn context limits from provider overflow errors).
    pub fn capabilities_cache(&self) -> &ModelCapabilitiesCache {
        &self.capabilities_cache
    }
1433
1434 fn effective_request_max_iterations(&self, request: &AgentRequest) -> Option<usize> {
1435 if let Some(override_limit) = request.max_iterations {
1436 return Some(override_limit);
1437 }
1438
1439 if !self.pipeline_config.iteration_budget_enabled {
1440 return None;
1441 }
1442
1443 if request.metadata.task_id.is_some() {
1444 Some(self.pipeline_config.tracked_task_max_iterations.max(1))
1445 } else {
1446 Some(self.pipeline_config.max_iterations.max(1))
1447 }
1448 }
1449
1450 pub async fn process_streaming(
1455 &self,
1456 request: AgentRequest,
1457 tx: mpsc::Sender<StreamChunk>,
1458 cancel_token: CancellationToken,
1459 ) -> Result<AgentResponse, AppError> {
1460 let request = if let Some(paused) = request.resume_from.clone() {
1464 tracing::info!(
1465 iteration = paused.iteration,
1466 partial_len = paused.partial_content.len(),
1467 tool_calls = paused.completed_tool_calls.len(),
1468 "Resuming from paused execution state"
1469 );
1470
1471 let _ = tx
1472 .send(StreamChunk::Status {
1473 message: "Resuming paused session…".to_string(),
1474 })
1475 .await;
1476
1477 let mut resumed_history = paused.history.clone();
1480 resumed_history.push(Message::user(&paused.original_input));
1482 if !paused.partial_content.is_empty() {
1484 resumed_history.push(Message::assistant(&paused.partial_content));
1485 }
1486 for tc in &paused.completed_tool_calls {
1488 let output = match &tc.result {
1489 ToolResult::Success(s) => s.clone(),
1490 ToolResult::Error(e) => format!("Error: {e}"),
1491 ToolResult::Skipped(msg) => format!("Skipped: {msg}"),
1492 };
1493 resumed_history.push(Message::tool_result(&tc.id, &output));
1494 }
1495
1496 let resume_input = if paused.has_content() {
1497 "Please continue from where you left off. \
1498 Your previous response was interrupted."
1499 .to_string()
1500 } else {
1501 paused.original_input.clone()
1502 };
1503
1504 let mut resumed = AgentRequest::new(resume_input)
1505 .with_streaming(request.streaming)
1506 .with_history(resumed_history);
1507 resumed.max_iterations = request.max_iterations;
1508 resumed.metadata = request.metadata.clone();
1509 if let Some(sp) = paused.system_prompt {
1510 resumed = resumed.with_system_prompt(sp);
1511 } else if let Some(sp) = request.system_prompt.clone() {
1512 resumed = resumed.with_system_prompt(sp);
1513 }
1514 resumed
1515 } else {
1516 request
1517 };
1518
1519 let mut request = request;
1523 if request.metadata.workspace_dir.is_none()
1524 && let Ok(cwd) = std::env::current_dir()
1525 {
1526 tracing::debug!(
1527 cwd = %cwd.display(),
1528 "workspace_dir not set; defaulting to CWD"
1529 );
1530 request.metadata.workspace_dir = Some(cwd);
1531 }
1532 Self::ensure_request_session_id(&mut request);
1533 Self::ensure_request_session_id(&mut request);
1534
1535 let telemetry = AgentRequestTelemetry::start(
1536 &request,
1537 RequestRunMode::Streaming,
1538 self.config.pipeline.agent_telemetry.enabled,
1539 )
1540 .await;
1541 let result = telemetry.in_request_scope(async {
1542
1543 let mut analysis = tracing::info_span!("agent.pipeline.analyze_request").in_scope(|| {
1545 let mut analysis = self.analyzer.analyze(&request.input);
1546
1547 self.promote_approval_to_tool_followup(&request, &mut analysis);
1551 analysis
1552 });
1553 telemetry.record_analysis(&analysis).await;
1556 tracing::debug!(
1557 "Request analysis: categories={:?}, needs_tools={}, confidence={}",
1558 analysis.categories,
1559 analysis.needs_tools,
1560 analysis.confidence
1561 );
1562 self.maybe_apply_advanced_primitives_middleware(&mut request, &analysis).await;
1563 Self::maybe_attach_normalized_intent(&mut request);
1564
1565 if let Some(router) = &self.tool_router
1569 && analysis.needs_tools
1570 {
1571 let all: Vec<&'static ToolDefinition> = all_tools().iter().collect();
1572 let routing = router
1573 .route(&request.input, &all, analysis.confidence)
1574 .instrument(tracing::info_span!("agent.pipeline.route_tools"))
1575 .await;
1576 if routing.has_selection() {
1577 tracing::debug!(
1578 tools = ?routing.suggested_tools,
1579 "Pre-flight LLM router selected tools (streaming)"
1580 );
1581 analysis.suggested_tools = routing.suggested_tools;
1582 }
1583 }
1584
1585 let tools_enabled_for_request = request.metadata.tools_enabled.unwrap_or(true);
1587
1588 let relevant_tools = if self.pipeline_config.enable_tools
1589 && tools_enabled_for_request
1590 && analysis.needs_tools
1591 {
1592 self.get_tools_for_analysis(&analysis, &request.metadata.allowed_tools)
1593 } else {
1594 Vec::new()
1595 };
1596
1597 let include_mcp_tool_schemas = self.pipeline_config.enable_tools
1603 && tools_enabled_for_request
1604 && analysis.needs_tools
1605 && (
1606 relevant_tools.iter().any(|t| t.name == "mcp")
1608 || request
1610 .metadata
1611 .allowed_tools
1612 .iter()
1613 .any(|t| t == "mcp" || t.starts_with("mcp__"))
1614 );
1615 self.maybe_initialize_tracked_request_task(
1616 &mut request,
1617 analysis.needs_tools,
1618 relevant_tools.iter().any(|tool| tool.name == "task"),
1619 )
1620 .await;
1621 telemetry
1623 .record_tool_selection(
1624 relevant_tools.len(),
1625 include_mcp_tool_schemas,
1626 analysis.needs_tools && relevant_tools.is_empty(),
1627 )
1628 .await;
1629 tracing::debug!(
1630 "Relevant tools: {:?}",
1631 relevant_tools.iter().map(|t| t.name).collect::<Vec<_>>()
1632 );
1633
1634 let workspace = request.metadata.workspace_dir.as_ref().and_then(|p| {
1636 SessionWorkspace::from_directory(
1637 request.metadata.session_id.as_deref().unwrap_or("unknown"),
1638 p.clone(),
1639 )
1640 .ok()
1641 });
1642
1643 if self.pipeline_config.enable_tools
1650 && tools_enabled_for_request
1651 && analysis.is_followup
1652 && Self::looks_like_approval(&request.input)
1653 && let Some(resp) = self
1654 .try_execute_confirmed_tool_from_history(
1655 &request,
1656 &analysis,
1657 &relevant_tools,
1658 workspace.as_ref(),
1659 &tx,
1660 &cancel_token,
1661 )
1662 .await?
1663 {
1664 return Ok(resp);
1665 }
1666
1667 if include_mcp_tool_schemas {
1670 self.ensure_mcp_servers_connected_streaming(&tx, &cancel_token)
1671 .instrument(tracing::info_span!("agent.pipeline.connect_mcp"))
1672 .await;
1673 }
1674
1675 let mut resolved_context = tracing::info_span!(
1677 "agent.pipeline.resolve_context",
1678 phase = "initial"
1679 )
1680 .in_scope(|| {
1681 self.context_manager.resolve_context(
1682 &request.input,
1683 &analysis,
1684 &request.history,
1685 request.metadata.workspace_dir.as_deref(),
1686 )
1687 });
1688
1689 self.enrich_resolved_context(
1691 &mut resolved_context,
1692 &request,
1693 request.metadata.workspace_dir.as_deref(),
1694 &request.input,
1695 &request.metadata,
1696 )
1697 .instrument(tracing::info_span!(
1698 "agent.pipeline.enrich_context",
1699 phase = "initial"
1700 ))
1701 .await;
1702 telemetry
1704 .record_context_resolved("initial", &resolved_context)
1705 .await;
1706
1707 let preview_prompt = tracing::info_span!("agent.pipeline.build_preview_prompt")
1710 .in_scope(|| self.build_prompt(&request, &resolved_context));
1711 if let Some(compaction_chunk) = self
1712 .check_and_apply_auto_compaction(&request.history, &preview_prompt, &request.metadata)
1713 .instrument(tracing::info_span!("agent.pipeline.auto_compaction"))
1714 .await
1715 {
1716 telemetry.record_compaction(&compaction_chunk).await;
1717 let message = self.build_auto_compaction_status_message(&preview_prompt);
1719 let _ = tx.send(StreamChunk::Status { message }).await;
1720
1721 let _ = tx.send(compaction_chunk).await;
1723
1724 resolved_context = tracing::info_span!(
1727 "agent.pipeline.resolve_context",
1728 phase = "post_compaction"
1729 )
1730 .in_scope(|| {
1731 self.context_manager.resolve_context(
1732 &request.input,
1733 &analysis,
1734 &request.history,
1735 request.metadata.workspace_dir.as_deref(),
1736 )
1737 });
1738 self.enrich_resolved_context(
1739 &mut resolved_context,
1740 &request,
1741 request.metadata.workspace_dir.as_deref(),
1742 &request.input,
1743 &request.metadata,
1744 )
1745 .instrument(tracing::info_span!(
1746 "agent.pipeline.enrich_context",
1747 phase = "post_compaction"
1748 ))
1749 .await;
1750 telemetry
1751 .record_context_resolved("post_compaction", &resolved_context)
1752 .await;
1753 }
1754
1755 let (prompt, truncated) = tracing::info_span!("agent.pipeline.prepare_prompt")
1757 .in_scope(|| self.truncate_prompt_if_needed(&request, &mut resolved_context));
1758 telemetry
1759 .record_prompt_prepared(&prompt, truncated, &resolved_context)
1760 .await;
1761
1762 if truncated {
1763 tracing::info!("Prompt was truncated to fit token limit");
1764 }
1765
1766 self.validate_token_limit(&prompt)?;
1769
1770 let token_usage_chunk = self.create_token_usage_update(&prompt);
1772 send_token_usage_chunk_best_effort(&tx, token_usage_chunk).await;
1773
1774 let hook_engine = self.create_hook_engine();
1776 if let Some(ref engine) = hook_engine {
1777 let hook_ctx = HookContext {
1778 workspace_dir: request.metadata.workspace_dir.clone(),
1779 session_id: request.metadata.session_id.clone(),
1780 pipeline_prompt: Some(prompt.clone()),
1781 ..Default::default()
1782 };
1783 if let Err(e) = engine.run(HookEvent::PrePipeline, &hook_ctx).await {
1784 tracing::warn!(error = %e, "PrePipeline hook failed (continuing)");
1785 }
1786 }
1787
1788 let reflection_tx = tx.clone();
1794 let reflection_cancel_token = cancel_token.clone();
1795 let reflection_retry_prompt = prompt.clone();
1796 let reflection_retry_tools = relevant_tools.clone();
1797 let reflection_retry_context = resolved_context.clone();
1798 let effective_max_iterations = self.effective_request_max_iterations(&request);
1799 let reflection_quality_budget = effective_max_iterations.unwrap_or(0);
1800 let relevant_tool_count = relevant_tools.len();
1801 let requirement_detection_input = requirement_detection_input(&request);
1802 let requires_build_and_test = Self::prompt_requires_build_and_test(requirement_detection_input);
1803 let requires_mutating_file_tool_success =
1804 Self::request_requires_mutating_file_tool_success(requirement_detection_input);
1805 if requires_build_and_test {
1806 tracing::warn!(
1807 request_input_preview = %requirement_detection_input.chars().take(160).collect::<String>(),
1808 source = ?request.metadata.source,
1809 session_id = ?request.metadata.session_id,
1810 task_id = ?request.metadata.task_id,
1811 "Agent request seeded requires_build_and_test=true from requirement-detection input"
1812 );
1813 }
1814
1815 let mut response = match self
1818 .execute_agentic_loop_streaming(
1819 prompt.clone(),
1820 requires_build_and_test,
1821 requires_mutating_file_tool_success,
1822 relevant_tools.clone(),
1823 include_mcp_tool_schemas,
1824 resolved_context.clone(),
1825 tx.clone(),
1826 cancel_token.clone(),
1827 workspace.as_ref(),
1828 request.metadata.session_id.clone(),
1829 request.metadata.task_id.clone(),
1830 effective_max_iterations,
1831 request.metadata.permission_level,
1832 &telemetry,
1833 )
1834 .instrument(tracing::info_span!(
1835 "agent.pipeline.execute_agent_loop",
1836 mode = "streaming",
1837 tools = relevant_tool_count,
1838 max_iterations = ?effective_max_iterations
1839 ))
1840 .await
1841 {
1842 Ok(resp) => resp,
1843 Err(AppError::ContextOverflow(ref error_msg)) => {
1844 let provider = self.config.llm.primary.as_str();
1848 let model_id = Self::extract_model_id(&self.config, provider)
1849 .unwrap_or("unknown");
1850
1851 let learned_caps = self.capabilities_cache.learn_from_error(
1852 provider,
1853 model_id,
1854 error_msg,
1855 );
1856
1857 if let Some(ref caps) = learned_caps {
1858 tracing::info!(
1859 provider = provider,
1860 model = model_id,
1861 learned_context_length = caps.context_length,
1862 learned_max_input_tokens = caps.max_input_tokens(),
1863 "Learned model context limit from overflow error; \
1864 will re-budget retry prompt to avoid immediate re-overflow"
1865 );
1866 } else {
1867 tracing::warn!(
1868 provider = provider,
1869 model = model_id,
1870 error_msg = %error_msg,
1871 "Could not parse context limit from overflow error; \
1872 retry prompt will use the configured pipeline limits"
1873 );
1874 }
1875
1876 let retry_max_input_tokens = learned_caps
1886 .as_ref()
1887 .map(|c| c.max_input_tokens())
1888 .unwrap_or_else(|| {
1889 self.pipeline_config
1890 .max_context_tokens
1891 .saturating_sub(self.pipeline_config.max_output_tokens)
1892 });
1893
1894 let _ = tx.send(StreamChunk::Status {
1896 message: "Context overflow detected. Compacting conversation history and retrying..."
1897 .to_string(),
1898 }).await;
1899
1900 let compacted_history = self.force_context_compaction(
1902 &request.history,
1903 &request.metadata,
1904 ).await;
1905
1906 let mut compacted_request = request.clone();
1908 compacted_request.history = compacted_history;
1909
1910 let compacted_analysis = self.analyzer.analyze(&compacted_request.input);
1911 let mut compacted_context = self.context_manager.resolve_context(
1912 &compacted_request.input,
1913 &compacted_analysis,
1914 &compacted_request.history,
1915 compacted_request.metadata.workspace_dir.as_deref(),
1916 );
1917 self.enrich_resolved_context(
1918 &mut compacted_context,
1919 &compacted_request,
1920 compacted_request.metadata.workspace_dir.as_deref(),
1921 &compacted_request.input,
1922 &compacted_request.metadata,
1923 ).await;
1924
1925 let (compacted_prompt, _) = self.truncate_prompt_with_budget(
1928 &compacted_request,
1929 &mut compacted_context,
1930 retry_max_input_tokens,
1931 );
1932
1933 tracing::info!(
1935 retry_max_input_tokens = retry_max_input_tokens,
1936 learned_from_error = learned_caps.is_some(),
1937 original_history_len = request.history.len(),
1938 compacted_history_len = compacted_request.history.len(),
1939 "Retrying agent loop with compacted context and re-budgeted prompt"
1940 );
1941
1942 self.execute_agentic_loop_streaming(
1943 compacted_prompt,
1944 requires_build_and_test,
1945 requires_mutating_file_tool_success,
1946 relevant_tools,
1947 include_mcp_tool_schemas,
1948 compacted_context,
1949 tx,
1950 cancel_token,
1951 workspace.as_ref(),
1952 compacted_request.metadata.session_id.clone(),
1953 compacted_request.metadata.task_id.clone(),
1954 effective_max_iterations,
1955 compacted_request.metadata.permission_level,
1956 &telemetry,
1957 )
1958 .instrument(tracing::info_span!(
1959 "agent.pipeline.execute_agent_loop",
1960 mode = "streaming_retry_after_compaction",
1961 tools = relevant_tool_count,
1962 ))
1963 .await?
1964 }
1965 Err(e) => return Err(e),
1966 };
1967
1968 response.truncated = truncated;
1969
1970 if reflection_cancel_token.is_cancelled() {
1971 telemetry.mark_outcome(RequestOutcome::Cancelled);
1972 return Ok(response);
1973 }
1974
1975 if let Some(mut generated_reflection) = self
1976 .maybe_generate_reflection(
1977 &request.input,
1978 &response,
1979 &request.metadata,
1980 Some(&reflection_tx),
1981 &reflection_cancel_token,
1982 reflection_quality_budget,
1983 )
1984 .instrument(tracing::info_span!("agent.pipeline.reflection", mode = "streaming"))
1985 .await
1986 {
1987 telemetry
1988 .record_reflection_generated(
1989 generated_reflection.initial_quality_score,
1990 generated_reflection.reflection.promotion_confidence(),
1991 )
1992 .await;
1993 let retry = if self.should_attempt_reflection_reexecution(
1994 &response,
1995 &generated_reflection.reflection,
1996 !reflection_retry_tools.is_empty(),
1997 ) {
1998 let _ = reflection_tx
1999 .send(StreamChunk::Status {
2000 message: "Low-confidence answer detected; running one safe reflection-guided retry with sandboxed tool access...".to_string(),
2001 })
2002 .await;
2003 self.maybe_run_reflection_reexecution(
2004 &response,
2005 &generated_reflection.reflection,
2006 generated_reflection.initial_quality_score,
2007 reflection::ReflectionReexecutionContext {
2008 base_prompt: &reflection_retry_prompt,
2009 tools: reflection_retry_tools.clone(),
2010 context: reflection_retry_context.clone(),
2011 session_id: request.metadata.session_id.as_deref(),
2012 workspace: workspace.as_ref(),
2013 },
2014 )
2015 .await
2016 } else {
2017 self.maybe_run_reflection_retry(
2018 &request.input,
2019 &response,
2020 &generated_reflection.reflection,
2021 generated_reflection.initial_quality_score,
2022 Some(&reflection_tx),
2023 )
2024 .await
2025 };
2026
2027 if let Some(retry) = retry.as_ref() {
2028 telemetry
2029 .record_reflection_retry(
2030 retry.improved,
2031 retry.improvement_score,
2032 retry.iterations,
2033 )
2034 .await;
2035 if let Some(usage) = retry.usage.as_ref() {
2036 merge_token_usage(&mut response.usage, usage);
2037 }
2038 generated_reflection.reflection.improvement_score = Some(retry.improvement_score);
2039 response.tool_calls.extend(retry.tool_calls.clone());
2040 response.iterations += retry.iterations;
2041
2042 if retry.improved {
2043 let status_message = match retry.mode {
2044 reflection::ReflectionRetryMode::TextRevision => format!(
2045 "Reflection-guided revision improved quality from {:.0}% to {:.0}%",
2046 generated_reflection.initial_quality_score * 100.0,
2047 retry.retry_quality_score * 100.0,
2048 ),
2049 reflection::ReflectionRetryMode::CorrectiveReexecution => format!(
2050 "Reflection-guided safe retry improved quality from {:.0}% to {:.0}% using {} tool calls across {} iterations",
2051 generated_reflection.initial_quality_score * 100.0,
2052 retry.retry_quality_score * 100.0,
2053 retry.tool_calls.len(),
2054 retry.iterations,
2055 ),
2056 };
2057 let _ = reflection_tx
2058 .send(StreamChunk::Status {
2059 message: status_message,
2060 })
2061 .await;
2062
2063 let revised_block = format!(
2064 "{}{}",
2065 reflection::REFLECTION_RETRY_SEPARATOR,
2066 retry.revised_content
2067 );
2068 response.content.push_str(&revised_block);
2069 self.emit_reflection_retry_text(&reflection_tx, &revised_block)
2070 .await;
2071 } else {
2072 let status_message = match retry.mode {
2073 reflection::ReflectionRetryMode::TextRevision => format!(
2074 "Reflection-guided revision did not materially improve quality ({:.0}% → {:.0}%)",
2075 generated_reflection.initial_quality_score * 100.0,
2076 retry.retry_quality_score * 100.0,
2077 ),
2078 reflection::ReflectionRetryMode::CorrectiveReexecution => format!(
2079 "Reflection-guided safe retry did not materially improve quality ({:.0}% → {:.0}%, {} tool calls)",
2080 generated_reflection.initial_quality_score * 100.0,
2081 retry.retry_quality_score * 100.0,
2082 retry.tool_calls.len(),
2083 ),
2084 };
2085 let _ = reflection_tx
2086 .send(StreamChunk::Status {
2087 message: status_message,
2088 })
2089 .await;
2090 }
2091 }
2092
2093 self.finalize_reflection(
2094 &generated_reflection.reflection,
2095 &request.metadata,
2096 workspace.as_ref(),
2097 Some(&reflection_tx),
2098 retry.as_ref(),
2099 )
2100 .await;
2101 }
2102
2103 if let Some(ref engine) = hook_engine {
2105 let hook_ctx = HookContext {
2106 workspace_dir: request.metadata.workspace_dir.clone(),
2107 session_id: request.metadata.session_id.clone(),
2108 ..Default::default()
2109 };
2110 self.run_hook_best_effort(engine, HookEvent::PostPipeline, &hook_ctx)
2111 .await;
2112 }
2113
2114 if !reflection_cancel_token.is_cancelled() {
2115 let _ = reflection_tx
2116 .send(StreamChunk::Done(response.usage.clone()))
2117 .await;
2118 }
2119
2120 Ok(response)
2121 })
2122 .await;
2123
2124 match &result {
2125 Ok(response) => telemetry.finish(Some(response), None).await,
2126 Err(error) => {
2127 telemetry.mark_outcome(RequestOutcome::Failed);
2128 telemetry.finish(None, Some(error)).await;
2129 }
2130 }
2131
2132 result
2133 }
2134
2135 fn looks_like_approval(input: &str) -> bool {
2136 let s = input.trim().to_lowercase();
2137 matches!(
2138 s.as_str(),
2139 "ok" | "okay"
2140 | "ok."
2141 | "okay."
2142 | "yes"
2143 | "y"
2144 | "sure"
2145 | "please proceed"
2146 | "proceed"
2147 | "go ahead"
2148 | "do it"
2149 | "run it"
2150 | "continue"
2151 ) || s.contains("please proceed")
2152 || s.contains("go ahead")
2153 || s.contains("please do")
2154 || s.contains("yes, proceed")
2155 }
2156
    /// Fast path for a user approval ("yes", "go ahead", …): if the most
    /// recent assistant turn described a planned tool call, execute that
    /// tool directly and stream a short LLM synthesis of its output,
    /// instead of re-running the full agentic loop.
    ///
    /// Returns `Ok(None)` when no executable planned call can be recovered
    /// from history (the caller falls through to the normal pipeline).
    async fn try_execute_confirmed_tool_from_history(
        &self,
        request: &AgentRequest,
        _analysis: &crate::context::RequestAnalysis,
        relevant_tools: &[&'static ToolDefinition],
        workspace: Option<&SessionWorkspace>,
        tx: &mpsc::Sender<StreamChunk>,
        cancel_token: &CancellationToken,
    ) -> Result<Option<AgentResponse>, AppError> {
        let has_tool = |name: &str| relevant_tools.iter().any(|t| t.name == name);

        // The plan, if any, lives in the most recent assistant message.
        let Some(prev_assistant) = request.history.iter().rev().find(|m| m.role == "assistant")
        else {
            return Ok(None);
        };

        let Some((tool_name, args, _answer_prefix)) =
            Self::extract_planned_tool_call_from_text(&prev_assistant.content)
        else {
            return Ok(None);
        };

        // Only run tools that were actually selected for this request.
        if !has_tool(&tool_name) {
            return Ok(None);
        }

        let tool_call_id = format!("confirmed_{tool_name}");

        // Mirror the normal tool-call chunk sequence (Thinking → Start →
        // Args → End → Result) so the client renders this path like a
        // loop-driven call. All sends are best-effort.
        let _ = tx
            .send(StreamChunk::Thinking(
                "Executing approved command...\n".to_string(),
            ))
            .await;
        let _ = tx
            .send(StreamChunk::ToolCallStart {
                id: tool_call_id.clone(),
                name: tool_name.clone(),
            })
            .await;
        let _ = tx.send(StreamChunk::ToolCallArgs(args.clone())).await;

        let start_time = Instant::now();
        let result = self
            .execute_tool(
                &tool_name,
                &args,
                workspace,
                request.metadata.session_id.as_deref(),
                Some(tx),
            )
            .await;
        let duration_ms = start_time.elapsed().as_millis() as u64;

        let _ = tx.send(StreamChunk::ToolCallEnd).await;

        let (success, output) = match &result {
            ToolResult::Success(out) => (true, out.trim_end().to_string()),
            ToolResult::Error(e) => (false, e.clone()),
            ToolResult::Skipped(msg) => (false, format!("Skipped: {}", msg)),
        };
        let _ = tx
            .send(StreamChunk::ToolCallResult {
                name: tool_name.clone(),
                success,
                output: output.clone(),
                duration_ms,
            })
            .await;

        let record = ToolCallRecord {
            id: tool_call_id,
            name: tool_name,
            arguments: args,
            result,
            duration_ms,
        };

        // Build a continuation prompt (base prompt + the recorded call) and
        // ask the LLM to synthesize the tool output for the user.
        let base_prompt = self.build_prompt(request, &crate::context::ResolvedContext::default());
        let continuation_prompt = self.build_tool_continuation_prompt(
            &base_prompt,
            "Executing the approved tool call.",
            std::slice::from_ref(&record),
        );

        let (inner_tx, mut inner_rx) = mpsc::channel::<StreamChunk>(STREAM_CHUNK_BUFFER_CAPACITY);
        let streaming_cfg = crate::streaming::streaming_config_from(&self.config);
        let enable_fallback = self.pipeline_config.enable_fallback;
        let inner_cancel = cancel_token.clone();

        // Run the synthesis stream on its own task, propagating the current
        // tracing span.
        let stream_handle = tokio::spawn(
            async move {
                if enable_fallback {
                    let _ = start_streaming_with_fallback(
                        &streaming_cfg,
                        &continuation_prompt,
                        None,
                        inner_tx,
                        inner_cancel,
                    )
                    .await;
                } else {
                    let _ = start_streaming(
                        &streaming_cfg,
                        &continuation_prompt,
                        None,
                        inner_tx,
                        inner_cancel,
                    )
                    .await;
                }
            }
            .instrument(tracing::Span::current()),
        );

        let mut synthesis_text = String::new();
        let mut synthesis_usage = None;

        // Forward inner chunks to the caller, accumulating text for the
        // final response. `Done` is captured (not forwarded) so exactly one
        // terminal chunk is emitted below; errors/cancel/pause end the loop.
        while let Some(chunk) = inner_rx.recv().await {
            match &chunk {
                StreamChunk::Text(text) => {
                    synthesis_text.push_str(text);
                    let _ = tx.send(chunk).await;
                }
                StreamChunk::Thinking(_) => {
                    let _ = tx.send(chunk).await;
                }
                StreamChunk::Status { .. } => {
                    send_status_chunk_best_effort(tx, chunk).await;
                }
                StreamChunk::TokenUsageUpdate { .. } => {
                    send_token_usage_chunk_best_effort(tx, chunk).await;
                }
                StreamChunk::Done(usage) => {
                    synthesis_usage = usage.clone();
                    break;
                }
                StreamChunk::Error(_) | StreamChunk::Cancelled | StreamChunk::Paused => {
                    let _ = tx.send(chunk).await;
                    break;
                }
                _ => {
                    let _ = tx.send(chunk).await;
                }
            }
        }

        let _ = stream_handle.await;
        // Emit our own terminal chunk (the inner `Done` was swallowed above).
        let _ = tx.send(StreamChunk::Done(synthesis_usage.clone())).await;

        Ok(Some(AgentResponse {
            content: synthesis_text,
            thinking: None,
            tool_calls: vec![record],
            usage: synthesis_usage,
            context_used: crate::context::ResolvedContext::default(),
            truncated: false,
            iterations: 1,
        }))
    }
2333
2334 fn extract_shell_command_from_plan(text: &str) -> Option<String> {
2335 if let Some(cmd) = Self::extract_quoted_after_keyword(text, "run") {
2337 return Some(cmd);
2338 }
2339 if let Some(cmd) = Self::extract_quoted_after_keyword(text, "execute") {
2340 return Some(cmd);
2341 }
2342
2343 let lower = text.to_lowercase();
2345 let idx = lower.find("run ")?;
2346 let after = text[idx + 4..].trim_start();
2347 let token: String = after
2348 .chars()
2349 .take_while(|c| !c.is_whitespace() && !matches!(*c, '.' | ',' | ';' | ')' | '('))
2350 .collect();
2351 if token.is_empty() { None } else { Some(token) }
2352 }
2353
2354 fn extract_quoted_after_keyword(text: &str, keyword: &str) -> Option<String> {
2355 let lower = text.to_lowercase();
2356 let key = format!("{} ", keyword.to_lowercase());
2357 let start = lower.find(&key)?;
2358 let mut rest = text[start + key.len()..].trim_start();
2359 let quote = rest.chars().next()?;
2360 if quote != '\'' && quote != '"' && quote != '`' {
2361 return None;
2362 }
2363 rest = &rest[quote.len_utf8()..];
2364 let end = rest.find(quote)?;
2365 let cmd = rest[..end].trim().to_string();
2366 if cmd.is_empty() { None } else { Some(cmd) }
2367 }
2368
2369 fn extract_first_quoted(text: &str) -> Option<String> {
2370 for quote in ['\'', '"', '`'] {
2371 if let Some(start) = text.find(quote) {
2372 let rest = &text[start + quote.len_utf8()..];
2373 if let Some(end) = rest.find(quote) {
2374 let s = rest[..end].trim();
2375 if !s.is_empty() {
2376 return Some(s.to_string());
2377 }
2378 }
2379 }
2380 }
2381 None
2382 }
2383
2384 fn extract_first_url(text: &str) -> Option<String> {
2385 let lower = text.to_lowercase();
2386 let idx = lower.find("https://").or_else(|| lower.find("http://"))?;
2387 let after = &text[idx..];
2388 let url: String = after
2389 .chars()
2390 .take_while(|c| {
2391 !c.is_whitespace() && !matches!(*c, ')' | '(' | ']' | '[' | '"' | '\'' | '`' | ',')
2392 })
2393 .collect();
2394 if url.is_empty() { None } else { Some(url) }
2395 }
2396
2397 fn extract_planned_tool_call_from_text(text: &str) -> Option<(String, String, String)> {
2401 let lower = text.to_lowercase();
2402
2403 if lower.contains("shell tool")
2405 || lower.contains("tool: shell")
2406 || lower.contains("use the shell")
2407 || lower.contains("run 'pwd'")
2408 || lower.contains("`pwd`")
2409 {
2410 let command = Self::extract_shell_command_from_plan(text)?;
2411 let args = serde_json::json!({"command": command}).to_string();
2412 return Some((
2413 "shell".to_string(),
2414 args,
2415 "Workspace directory root: ".to_string(),
2416 ));
2417 }
2418
2419 if lower.contains("file tool")
2421 || lower.contains("read file")
2422 || lower.contains("read the file")
2423 {
2424 let path = Self::extract_quoted_after_keyword(text, "read")
2425 .or_else(|| Self::extract_first_quoted(text))?;
2426 let args = serde_json::json!({"operation": "read", "path": path}).to_string();
2427 return Some(("file".to_string(), args, "File contents: \n".to_string()));
2428 }
2429
2430 if lower.contains("git tool") || lower.contains("git status") {
2432 let args = serde_json::json!({"operation": "status"}).to_string();
2433 return Some(("git".to_string(), args, "Git status:\n".to_string()));
2434 }
2435
2436 if lower.contains("web tool")
2438 || lower.contains("search the web")
2439 || lower.contains("web_search")
2440 {
2441 if lower.contains("fetch") || lower.contains("download") {
2442 let url = Self::extract_first_url(text)?;
2443 let args = serde_json::json!({"operation": "fetch", "url": url}).to_string();
2444 return Some(("web".to_string(), args, "Web fetch result:\n".to_string()));
2445 }
2446
2447 let query = Self::extract_quoted_after_keyword(text, "search")
2448 .or_else(|| Self::extract_first_quoted(text))?;
2449 let args = serde_json::json!({"operation": "search", "query": query}).to_string();
2450 return Some(("web".to_string(), args, "Web search results:\n".to_string()));
2451 }
2452
2453 if lower.contains("code tool") || lower.contains("code stats") {
2455 let path = Self::extract_first_quoted(text).unwrap_or_else(|| ".".to_string());
2456 let args = serde_json::json!({"operation": "stats", "path": path}).to_string();
2457 return Some(("code".to_string(), args, "Code stats:\n".to_string()));
2458 }
2459
2460 None
2461 }
2462
    /// Runs the full agent pipeline for `request` without streaming and
    /// returns the final [`AgentResponse`].
    ///
    /// Stages: request analysis, optional LLM tool routing, tool selection,
    /// context resolution and enrichment, auto-compaction (with a second
    /// context pass when it fires), prompt preparation/truncation,
    /// pre/post-pipeline hooks, the agentic execution loop, and optional
    /// reflection with retry/re-execution. Telemetry is recorded per stage
    /// and finalized with the request outcome.
    ///
    /// # Errors
    /// Returns [`AppError`] when the prompt exceeds the token limit or the
    /// agentic loop itself fails.
    pub async fn process_blocking(&self, request: AgentRequest) -> Result<AgentResponse, AppError> {
        let mut request = request;
        // Default the workspace to the current directory so tools that need a
        // filesystem root still work when the caller did not set one.
        if request.metadata.workspace_dir.is_none()
            && let Ok(cwd) = std::env::current_dir()
        {
            tracing::debug!(
                cwd = %cwd.display(),
                "workspace_dir not set; defaulting to CWD (blocking path)"
            );
            request.metadata.workspace_dir = Some(cwd);
        }

        let telemetry = AgentRequestTelemetry::start(
            &request,
            RequestRunMode::Blocking,
            self.config.pipeline.agent_telemetry.enabled,
        )
        .await;
        let result = telemetry.in_request_scope(async {
            // Phase 1: analyze the raw input, then let middleware and intent
            // normalization adjust the request before anything else runs.
            let mut analysis = tracing::info_span!("agent.pipeline.analyze_request").in_scope(|| {
                self.analyzer.analyze(&request.input)
            });
            telemetry.record_analysis(&analysis).await;
            self.maybe_apply_advanced_primitives_middleware(&mut request, &analysis).await;
            Self::maybe_attach_normalized_intent(&mut request);

            // Phase 2 (optional): pre-flight LLM router may override the
            // analyzer's suggested tools when it makes a selection.
            if let Some(router) = &self.tool_router
                && analysis.needs_tools
            {
                let all: Vec<&'static ToolDefinition> = all_tools().iter().collect();
                let routing = router
                    .route(&request.input, &all, analysis.confidence)
                    .instrument(tracing::info_span!("agent.pipeline.route_tools"))
                    .await;
                if routing.has_selection() {
                    tracing::debug!(
                        tools = ?routing.suggested_tools,
                        "Pre-flight LLM router selected tools (blocking)"
                    );
                    analysis.suggested_tools = routing.suggested_tools;
                }
            }

            // Per-request override; tools default to enabled when unset.
            let tools_enabled_for_request = request.metadata.tools_enabled.unwrap_or(true);

            // Phase 3: resolve the tool set only when all three gates agree
            // (pipeline config, per-request flag, analyzer verdict).
            let relevant_tools = if self.pipeline_config.enable_tools
                && tools_enabled_for_request
                && analysis.needs_tools
            {
                self.get_tools_for_analysis(&analysis, &request.metadata.allowed_tools)
            } else {
                Vec::new()
            };

            // MCP schemas are included when the "mcp" tool was selected or the
            // session explicitly allows mcp / namespaced "mcp__*" tools.
            let include_mcp_tool_schemas = self.pipeline_config.enable_tools
                && tools_enabled_for_request
                && analysis.needs_tools
                && (relevant_tools.iter().any(|t| t.name == "mcp")
                    || request
                        .metadata
                        .allowed_tools
                        .iter()
                        .any(|t| t == "mcp" || t.starts_with("mcp__")));
            self.maybe_initialize_tracked_request_task(
                &mut request,
                analysis.needs_tools,
                relevant_tools.iter().any(|tool| tool.name == "task"),
            )
            .await;
            telemetry
                .record_tool_selection(
                    relevant_tools.len(),
                    include_mcp_tool_schemas,
                    analysis.needs_tools && relevant_tools.is_empty(),
                )
                .await;

            if include_mcp_tool_schemas {
                self.ensure_mcp_servers_connected()
                    .instrument(tracing::info_span!("agent.pipeline.connect_mcp"))
                    .await;
            }

            // Phase 4: resolve and enrich the context (memory, reflections,
            // knowledge) that will back the prompt.
            let mut resolved_context = tracing::info_span!(
                "agent.pipeline.resolve_context",
                phase = "initial"
            )
            .in_scope(|| {
                self.context_manager.resolve_context(
                    &request.input,
                    &analysis,
                    &request.history,
                    request.metadata.workspace_dir.as_deref(),
                )
            });

            self.enrich_resolved_context(
                &mut resolved_context,
                &request,
                request.metadata.workspace_dir.as_deref(),
                &request.input,
                &request.metadata,
            )
            .instrument(tracing::info_span!(
                "agent.pipeline.enrich_context",
                phase = "initial"
            ))
            .await;
            telemetry
                .record_context_resolved("initial", &resolved_context)
                .await;

            // Phase 5: build a preview prompt to size the request; if history
            // is auto-compacted, the context must be resolved again because
            // compaction mutates what the prompt is built from.
            let preview_prompt = tracing::info_span!("agent.pipeline.build_preview_prompt")
                .in_scope(|| self.build_prompt(&request, &resolved_context));
            if let Some(compaction_chunk) = self
                .check_and_apply_auto_compaction(&request.history, &preview_prompt, &request.metadata)
                .instrument(tracing::info_span!("agent.pipeline.auto_compaction"))
                .await
            {
                telemetry.record_compaction(&compaction_chunk).await;
                // Log the compaction outcome; other chunk variants are ignored.
                match compaction_chunk {
                    StreamChunk::ContextCompacted {
                        messages_before,
                        messages_after,
                        tokens_saved,
                        summary,
                    } => {
                        tracing::info!(
                            messages_before = messages_before,
                            messages_after = messages_after,
                            tokens_saved = tokens_saved,
                            "Context auto-compacted in blocking mode: {}",
                            summary
                        );
                    }
                    StreamChunk::MemoryBankSaved {
                        file_path,
                        session_id,
                        summary,
                        messages_saved,
                    } => {
                        tracing::info!(
                            file_path = %file_path,
                            session_id = %session_id,
                            messages_saved = messages_saved,
                            "Memory bank saved in blocking mode: {}",
                            summary
                        );
                    }
                    _ => {}
                }

                // Second context pass over the now-compacted history.
                resolved_context = tracing::info_span!(
                    "agent.pipeline.resolve_context",
                    phase = "post_compaction"
                )
                .in_scope(|| {
                    self.context_manager.resolve_context(
                        &request.input,
                        &analysis,
                        &request.history,
                        request.metadata.workspace_dir.as_deref(),
                    )
                });
                self.enrich_resolved_context(
                    &mut resolved_context,
                    &request,
                    request.metadata.workspace_dir.as_deref(),
                    &request.input,
                    &request.metadata,
                )
                .instrument(tracing::info_span!(
                    "agent.pipeline.enrich_context",
                    phase = "post_compaction"
                ))
                .await;
                telemetry
                    .record_context_resolved("post_compaction", &resolved_context)
                    .await;
            }

            // Phase 6: final prompt, truncated to the token budget if needed,
            // then hard-validated against the limit (this can fail the request).
            let (prompt, truncated) = tracing::info_span!("agent.pipeline.prepare_prompt")
                .in_scope(|| self.truncate_prompt_if_needed(&request, &mut resolved_context));
            telemetry
                .record_prompt_prepared(&prompt, truncated, &resolved_context)
                .await;

            if truncated {
                tracing::info!("Prompt was truncated to fit token limit");
            }

            self.validate_token_limit(&prompt)?;

            // Log estimated token usage / cost for observability.
            if let StreamChunk::TokenUsageUpdate {
                estimated,
                limit,
                percentage,
                status,
                estimated_cost,
            } = self.create_token_usage_update(&prompt)
            {
                let status_str = match status {
                    crate::streaming::TokenUsageStatus::Green => "🟢 Green",
                    crate::streaming::TokenUsageStatus::Yellow => "🟡 Yellow",
                    crate::streaming::TokenUsageStatus::Red => "🔴 Red",
                };
                tracing::info!(
                    estimated_tokens = estimated,
                    limit = limit,
                    percentage = percentage,
                    status = status_str,
                    estimated_cost_usd = format!("${:.4}", estimated_cost),
                    "Token usage in blocking mode: {} tokens / {} tokens ({}%) - Est. cost: ${:.4}",
                    estimated,
                    limit,
                    percentage,
                    estimated_cost
                );
            }

            // PrePipeline hook failures are logged but never abort the request.
            let hook_engine = self.create_hook_engine();
            if let Some(ref engine) = hook_engine {
                let hook_ctx = HookContext {
                    workspace_dir: request.metadata.workspace_dir.clone(),
                    session_id: request.metadata.session_id.clone(),
                    pipeline_prompt: Some(prompt.clone()),
                    ..Default::default()
                };
                if let Err(e) = engine.run(HookEvent::PrePipeline, &hook_ctx).await {
                    tracing::warn!(error = %e, "PrePipeline hook failed in blocking mode (continuing)");
                }
            }

            // Session workspace is best-effort; failures degrade to None.
            let workspace = request.metadata.workspace_dir.as_ref().and_then(|p| {
                SessionWorkspace::from_directory(
                    request.metadata.session_id.as_deref().unwrap_or("unknown"),
                    p.clone(),
                )
                .ok()
            });
            // Snapshots for the reflection re-execution path, taken before the
            // loop consumes prompt/tools/context by value.
            let reflection_retry_prompt = prompt.clone();
            let reflection_retry_tools = relevant_tools.clone();
            let reflection_retry_context = resolved_context.clone();
            let effective_max_iterations = self.effective_request_max_iterations(&request);
            let reflection_quality_budget = effective_max_iterations.unwrap_or(0);
            let relevant_tool_count = relevant_tools.len();
            // Requirement detection seeds the loop's build/test and mutating-
            // file-success gates from the (possibly normalized) input.
            let requirement_detection_input = requirement_detection_input(&request);
            let requires_build_and_test = Self::prompt_requires_build_and_test(requirement_detection_input);
            let requires_mutating_file_tool_success =
                Self::request_requires_mutating_file_tool_success(requirement_detection_input);
            if requires_build_and_test {
                tracing::warn!(
                    request_input_preview = %requirement_detection_input.chars().take(160).collect::<String>(),
                    source = ?request.metadata.source,
                    session_id = ?request.metadata.session_id,
                    task_id = ?request.metadata.task_id,
                    "Agent request seeded requires_build_and_test=true from requirement-detection input"
                );
            }

            // Phase 7: run the agentic loop; this is the only fallible await
            // besides token-limit validation above.
            let mut response = self
                .execute_agentic_loop_blocking(
                    prompt,
                    requires_build_and_test,
                    requires_mutating_file_tool_success,
                    relevant_tools,
                    include_mcp_tool_schemas,
                    resolved_context,
                    workspace.as_ref(),
                    request.metadata.session_id.clone(),
                    request.metadata.task_id.clone(),
                    effective_max_iterations,
                    &telemetry,
                )
                .instrument(tracing::info_span!(
                    "agent.pipeline.execute_agent_loop",
                    mode = "blocking",
                    tools = relevant_tool_count,
                    max_iterations = ?effective_max_iterations
                ))
                .await?;

            response.truncated = truncated;

            // Phase 8 (optional): generate a reflection over the response;
            // then either re-execute with tools or do a plain retry, folding
            // any improvement back into the response.
            if let Some(mut generated_reflection) = self
                .maybe_generate_reflection(
                    &request.input,
                    &response,
                    &request.metadata,
                    None,
                    &crate::streaming::CancellationToken::new(),
                    reflection_quality_budget,
                )
                .instrument(tracing::info_span!("agent.pipeline.reflection", mode = "blocking"))
                .await
            {
                telemetry
                    .record_reflection_generated(
                        generated_reflection.initial_quality_score,
                        generated_reflection.reflection.promotion_confidence(),
                    )
                    .await;
                let retry = if self.should_attempt_reflection_reexecution(
                    &response,
                    &generated_reflection.reflection,
                    !reflection_retry_tools.is_empty(),
                ) {
                    self.maybe_run_reflection_reexecution(
                        &response,
                        &generated_reflection.reflection,
                        generated_reflection.initial_quality_score,
                        reflection::ReflectionReexecutionContext {
                            base_prompt: &reflection_retry_prompt,
                            tools: reflection_retry_tools,
                            context: reflection_retry_context,
                            session_id: request.metadata.session_id.as_deref(),
                            workspace: workspace.as_ref(),
                        },
                    )
                    .await
                } else {
                    self.maybe_run_reflection_retry(
                        &request.input,
                        &response,
                        &generated_reflection.reflection,
                        generated_reflection.initial_quality_score,
                        None,
                    )
                    .await
                };

                if let Some(retry) = retry.as_ref() {
                    telemetry
                        .record_reflection_retry(
                            retry.improved,
                            retry.improvement_score,
                            retry.iterations,
                        )
                        .await;
                    // Merge retry cost/iterations/tool-calls into the response
                    // regardless of improvement; content only when improved.
                    if let Some(usage) = retry.usage.as_ref() {
                        merge_token_usage(&mut response.usage, usage);
                    }
                    generated_reflection.reflection.improvement_score = Some(retry.improvement_score);
                    response.tool_calls.extend(retry.tool_calls.clone());
                    response.iterations += retry.iterations;

                    if retry.improved {
                        response.content = retry.revised_content.clone();
                        response.thinking = None;
                    }
                }

                self.finalize_reflection(
                    &generated_reflection.reflection,
                    &request.metadata,
                    workspace.as_ref(),
                    None,
                    retry.as_ref(),
                )
                .await;
            }

            // PostPipeline hook is best-effort (never fails the request).
            if let Some(ref engine) = hook_engine {
                let hook_ctx = HookContext {
                    workspace_dir: request.metadata.workspace_dir.clone(),
                    session_id: request.metadata.session_id.clone(),
                    ..Default::default()
                };
                self.run_hook_best_effort(engine, HookEvent::PostPipeline, &hook_ctx)
                    .await;
            }

            Ok(response)
        })
        .await;

        // Finalize telemetry with the request outcome before returning.
        match &result {
            Ok(response) => telemetry.finish(Some(response), None).await,
            Err(error) => {
                telemetry.mark_outcome(RequestOutcome::Failed);
                telemetry.finish(None, Some(error)).await;
            }
        }

        result
    }
2872
    /// Selects the tool definitions to expose for this request.
    ///
    /// The candidate pool is every registered tool, or only `allowed_tools`
    /// when the session restricts them. Selection preference:
    /// 1. router suggestions (`analysis.suggested_tools`) filtered to the pool;
    /// 2. category-based filtering from `analysis.categories`;
    /// 3. the whole candidate pool, when tools are needed but nothing matched
    ///    or analyzer confidence is very low.
    /// The result is sorted by name and deduplicated.
    fn get_tools_for_analysis(
        &self,
        analysis: &crate::context::RequestAnalysis,
        allowed_tools: &[String],
    ) -> Vec<&'static ToolDefinition> {
        use crate::context::ContextCategory;

        // Build the candidate pool; unknown allowed-tool names are dropped.
        let candidate_tools: Vec<_> = if allowed_tools.is_empty() {
            all_tools().iter().collect()
        } else {
            allowed_tools
                .iter()
                .filter_map(|tool_name| crate::tools::registry::find_tool(tool_name))
                .collect()
        };
        let candidate_names: HashSet<_> = candidate_tools.iter().map(|tool| tool.name).collect();
        let mut tools = Vec::new();

        if !allowed_tools.is_empty() && self.pipeline_config.log_token_usage {
            tracing::debug!(
                allowed_tools = ?allowed_tools,
                candidate_tools = ?candidate_tools.iter().map(|t| t.name).collect::<Vec<_>>(),
                "Using session-specific tool configuration as candidate pool"
            );
        }

        // Preference 1: router suggestions, restricted to the candidate pool.
        // Only used when at least one suggestion resolves; otherwise fall
        // through to category-based selection below.
        if !analysis.suggested_tools.is_empty() {
            let mut resolved: Vec<_> = analysis
                .suggested_tools
                .iter()
                .filter(|name| candidate_names.contains(name.as_str()))
                .filter_map(|name| crate::tools::registry::find_tool(name))
                .collect();

            Self::append_task_tool_for_auto_tracked_request(
                analysis,
                &candidate_names,
                &mut resolved,
            );

            if !resolved.is_empty() {
                // Sort first so dedup (consecutive-only) removes all dupes.
                resolved.sort_by_key(|tool| tool.name);
                resolved.dedup_by_key(|tool| tool.name);
                if self.pipeline_config.log_token_usage {
                    tracing::debug!(
                        suggested = ?analysis.suggested_tools,
                        resolved_tools = ?resolved.iter().map(|t| t.name).collect::<Vec<_>>(),
                        "Using LLM router tool selection"
                    );
                }
                return resolved;
            }
        }

        // Push helper that respects the candidate pool.
        let mut push_if_allowed = |tool_name: &str| {
            if candidate_names.contains(tool_name)
                && let Some(tool) = crate::tools::registry::find_tool(tool_name)
            {
                tools.push(tool);
            }
        };

        // Preference 2: map analyzer categories onto concrete tools.
        for category in &analysis.categories {
            match category {
                ContextCategory::FileSystem => push_if_allowed("file"),
                ContextCategory::Shell => push_if_allowed("shell"),
                ContextCategory::Git => push_if_allowed("git"),
                ContextCategory::Code => {
                    for tool_name in crate::tools::registry::code_tool_names() {
                        push_if_allowed(tool_name);
                    }
                }
                ContextCategory::Web => {
                    push_if_allowed("web");
                    push_if_allowed("web_search");
                }
                ContextCategory::Screen => {
                    push_if_allowed("screenshot");
                    push_if_allowed("screen_record");
                }
                ContextCategory::Agent => push_if_allowed("a2a"),
                ContextCategory::Mcp => push_if_allowed("mcp"),
                ContextCategory::A2a => push_if_allowed("a2a"),
                ContextCategory::Task => push_if_allowed("task"),
                ContextCategory::Tools => push_if_allowed("permissions"),
                // Categories with no dedicated tool mapping.
                ContextCategory::Voice
                | ContextCategory::Config
                | ContextCategory::Session
                | ContextCategory::General => {}
            }
        }

        // Preference 3: tools are needed but no category matched (or the
        // analyzer is too unsure) — expose the whole candidate pool.
        if analysis.needs_tools && (tools.is_empty() || analysis.confidence < 0.2) {
            tools = candidate_tools;

            if self.pipeline_config.log_token_usage {
                tracing::debug!(
                    confidence = analysis.confidence,
                    candidate_tools = ?tools.iter().map(|t| t.name).collect::<Vec<_>>(),
                    "Using candidate-pool fallback (no category match or confidence too low)"
                );
            }
        }

        Self::append_task_tool_for_auto_tracked_request(analysis, &candidate_names, &mut tools);

        if self.pipeline_config.log_token_usage {
            tracing::debug!(
                categories = ?analysis.categories,
                needs_tools = analysis.needs_tools,
                resolved_tools = ?tools.iter().map(|t| t.name).collect::<Vec<_>>(),
                "Category-based tool filtering"
            );
        }

        // Sort before dedup: dedup_by_key only removes consecutive duplicates.
        tools.sort_by_key(|t| t.name);
        tools.dedup_by_key(|t| t.name);

        tools
    }
3007
3008 fn promote_approval_to_tool_followup(
3016 &self,
3017 request: &AgentRequest,
3018 analysis: &mut crate::context::RequestAnalysis,
3019 ) {
3020 use crate::context::ContextCategory;
3021
3022 let input = request.input.trim().to_lowercase();
3023 let looks_like_approval = matches!(
3024 input.as_str(),
3025 "ok" | "okay"
3026 | "ok."
3027 | "okay."
3028 | "yes"
3029 | "y"
3030 | "sure"
3031 | "please proceed"
3032 | "proceed"
3033 | "go ahead"
3034 | "do it"
3035 | "run it"
3036 | "continue"
3037 ) || input.contains("please proceed")
3038 || input.contains("go ahead")
3039 || input.contains("please do")
3040 || input.contains("yes, proceed");
3041
3042 if !looks_like_approval {
3043 return;
3044 }
3045
3046 if analysis.needs_tools {
3048 return;
3049 }
3050
3051 let Some(prev_assistant) = request.history.iter().rev().find(|m| m.role == "assistant")
3053 else {
3054 return;
3055 };
3056
3057 let prev = prev_assistant.content.to_lowercase();
3058
3059 let tool_name = if prev.contains("shell tool")
3061 || (prev.contains("run") && prev.contains("pwd"))
3062 || prev.contains("`pwd`")
3063 {
3064 Some("shell")
3065 } else if prev.contains("git tool") || prev.contains("git status") {
3066 Some("git")
3067 } else if prev.contains("file tool") || prev.contains("read file") {
3068 Some("file")
3069 } else if prev.contains("web tool") || prev.contains("search the web") {
3070 Some("web")
3071 } else if prev.contains("code tool") || prev.contains("code stats") {
3072 Some("code")
3073 } else {
3074 None
3075 };
3076
3077 let Some(tool_name) = tool_name else {
3078 return;
3079 };
3080
3081 analysis.needs_tools = true;
3082 analysis.is_followup = true;
3083 analysis.confidence = analysis.confidence.max(0.85);
3084 analysis.suggested_tools.push(tool_name.to_string());
3085
3086 match tool_name {
3088 "shell" => {
3089 analysis.categories.insert(ContextCategory::Shell);
3090 analysis.categories.insert(ContextCategory::Tools);
3091 }
3092 "git" => {
3093 analysis.categories.insert(ContextCategory::Git);
3094 analysis.categories.insert(ContextCategory::Tools);
3095 }
3096 "file" => {
3097 analysis.categories.insert(ContextCategory::FileSystem);
3098 analysis.categories.insert(ContextCategory::Tools);
3099 }
3100 "web" => {
3101 analysis.categories.insert(ContextCategory::Web);
3102 analysis.categories.insert(ContextCategory::Tools);
3103 }
3104 "code" => {
3105 analysis.categories.insert(ContextCategory::Code);
3106 analysis.categories.insert(ContextCategory::Tools);
3107 }
3108 _ => {
3109 analysis.categories.insert(ContextCategory::Tools);
3110 }
3111 }
3112 }
3113
3114 async fn enrich_resolved_context(
3120 &self,
3121 resolved_context: &mut crate::context::ResolvedContext,
3122 request: &AgentRequest,
3123 workspace_dir: Option<&std::path::Path>,
3124 query: &str,
3125 metadata: &RequestMetadata,
3126 ) {
3127 if let Some(short_term_sections) =
3129 self.load_session_working_memory(metadata.session_id.as_deref(), query, 4)
3130 && !short_term_sections.is_empty()
3131 {
3132 tracing::debug!(
3133 memory_sections = short_term_sections.len(),
3134 "Added session working memory to request"
3135 );
3136 resolved_context.memory_sections.extend(short_term_sections);
3137 }
3138
3139 if let Some(workspace_dir) = workspace_dir
3140 && let Some(shared_coordination) = self
3141 .load_shared_coordination_memory(workspace_dir, metadata, 3)
3142 .await
3143 && !shared_coordination.is_empty()
3144 {
3145 tracing::debug!(
3146 shared_coordination_len = shared_coordination.len(),
3147 "Added shared coordination memory to request"
3148 );
3149 resolved_context.memory_sections.extend(shared_coordination);
3150 }
3151
3152 if let Some(workspace_dir) = workspace_dir
3158 && let Some(memory_context) = self
3159 .search_and_load_memory_bank(workspace_dir, metadata, query, 3)
3160 .await
3161 {
3162 tracing::debug!(
3163 memory_context_len = memory_context.len(),
3164 "Added memory bank context to request"
3165 );
3166 resolved_context.memory_sections.extend(memory_context);
3167
3168 if self.reflection_enabled_for(metadata)
3169 && let Some(reflection_sections) = self
3170 .load_relevant_reflections(workspace_dir, metadata, query)
3171 .await
3172 && !reflection_sections.is_empty()
3173 {
3174 tracing::debug!(
3175 reflection_context_len = reflection_sections.len(),
3176 "Added past reflections to request"
3177 );
3178 resolved_context.memory_sections.extend(reflection_sections);
3179 }
3180 }
3181
3182 let knowledge_budget_tokens =
3185 self.remaining_knowledge_budget_tokens(request, resolved_context);
3186 if let Some(knowledge_context) = self.load_enabled_knowledge(
3187 metadata.session_id.as_deref(),
3188 query,
3189 knowledge_budget_tokens,
3190 ) {
3191 tracing::debug!(
3192 knowledge_context_len = knowledge_context.len(),
3193 knowledge_budget_tokens = knowledge_budget_tokens,
3194 "Added enabled knowledge to request"
3195 );
3196 resolved_context.knowledge.push(knowledge_context);
3197 } else {
3198 tracing::debug!(
3199 knowledge_budget_tokens = knowledge_budget_tokens,
3200 "No enabled knowledge added after applying prompt budget"
3201 );
3202 }
3203 }
3204}
3205
3206fn merge_token_usage(
3207 usage: &mut Option<crate::llm_provider::TokenUsage>,
3208 additional: &crate::llm_provider::TokenUsage,
3209) {
3210 if let Some(existing) = usage.as_mut() {
3211 existing.input_tokens += additional.input_tokens;
3212 existing.output_tokens += additional.output_tokens;
3213 existing.total_tokens += additional.total_tokens;
3214 existing.estimated_cost_usd =
3215 match (existing.estimated_cost_usd, additional.estimated_cost_usd) {
3216 (Some(lhs), Some(rhs)) => Some(lhs + rhs),
3217 (Some(lhs), None) => Some(lhs),
3218 (None, Some(rhs)) => Some(rhs),
3219 (None, None) => None,
3220 };
3221 if existing.model.is_none() {
3222 existing.model = additional.model.clone();
3223 }
3224 if existing.provider.is_none() {
3225 existing.provider = additional.provider.clone();
3226 }
3227 } else {
3228 *usage = Some(additional.clone());
3229 }
3230}
3231
3232#[cfg(test)]
3233mod tests;