1use super::persistence::{
2 load_persisted_environment_by_id_async, persist_environment_to_disk_async,
3};
4use super::*;
5use gestura_core_agents::AgentExecutionMode;
6use gestura_core_tools::git::GitTools;
7use std::fs;
8use std::path::{Path, PathBuf};
9
10impl<M: OrchestratorAgentManager> AgentOrchestrator<M> {
11 pub(crate) async fn prepare_environment(
12 &self,
13 task: &DelegatedTask,
14 ) -> Result<EnvironmentRecord, String> {
15 let spec = self.build_environment_spec(task)?;
16 let now = Utc::now();
17 let mut record = EnvironmentRecord {
18 id: spec.id.clone(),
19 prepared_path: spec.prepared_path.clone(),
20 spec,
21 state: EnvironmentState::Provisioning,
22 health: EnvironmentHealth::Unknown,
23 lease: None,
24 cleanup_result: None,
25 recovery_status: RecoveryStatus::NotRequired,
26 recovery_action: None,
27 failure: None,
28 created_at: now,
29 updated_at: now,
30 last_verified_at: None,
31 metadata: None,
32 };
33
34 let preparation_result = match record.spec.execution_mode {
35 AgentExecutionMode::SharedWorkspace | AgentExecutionMode::Remote => {
36 self.prepare_shared_environment(&mut record)
37 }
38 AgentExecutionMode::IsolatedWorkspace => self.prepare_isolated_environment(&mut record),
39 AgentExecutionMode::GitWorktree => self.prepare_git_worktree_environment(&mut record),
40 };
41
42 if let Err((kind, message)) = preparation_result {
43 record.state = EnvironmentState::Failed;
44 record.health = EnvironmentHealth::Unknown;
45 record.recovery_status = RecoveryStatus::Pending;
46 record.recovery_action = Some(RecoveryAction::MarkTaskBlocked);
47 record.failure = Some(EnvironmentFailure {
48 kind,
49 message,
50 command: None,
51 stderr: None,
52 occurred_at: Utc::now(),
53 });
54 }
55
56 record.updated_at = Utc::now();
57 record.last_verified_at = Some(Utc::now());
58 self.persist_environment_record(&record).await?;
59 Ok(record)
60 }
61
62 pub(crate) async fn acquire_environment_lease(
63 &self,
64 environment_id: &str,
65 task_id: &str,
66 agent_id: &str,
67 ) -> Result<EnvironmentRecord, String> {
68 let mut environments = self.environments.lock().await;
69 let record = environments
70 .get_mut(environment_id)
71 .ok_or_else(|| format!("Environment {environment_id} not found"))?;
72
73 if let Some(lease) = &record.lease
74 && lease.released_at.is_none()
75 && lease.task_id != task_id
76 {
77 return Err(format!(
78 "Environment {environment_id} is already leased to task {}",
79 lease.task_id
80 ));
81 }
82
83 record.lease = Some(EnvironmentLease {
84 task_id: task_id.to_string(),
85 agent_id: agent_id.to_string(),
86 lease_kind: EnvironmentLeaseKind::Execution,
87 acquired_at: Utc::now(),
88 released_at: None,
89 });
90 record.state = EnvironmentState::InUse;
91 record.updated_at = Utc::now();
92 let snapshot = record.clone();
93 drop(environments);
94 self.persist_environment_record(&snapshot).await?;
95 Ok(snapshot)
96 }
97
98 pub(crate) async fn release_environment_lease(
99 &self,
100 environment_id: &str,
101 ) -> Result<Option<EnvironmentRecord>, String> {
102 let mut environments = self.environments.lock().await;
103 let Some(record) = environments.get_mut(environment_id) else {
104 return Ok(None);
105 };
106
107 if let Some(lease) = record.lease.as_mut() {
108 lease.released_at = Some(Utc::now());
109 }
110 if !matches!(
111 record.state,
112 EnvironmentState::Archived | EnvironmentState::Removed | EnvironmentState::Failed
113 ) {
114 record.state = EnvironmentState::Ready;
115 }
116 record.updated_at = Utc::now();
117 let snapshot = record.clone();
118 drop(environments);
119 self.persist_environment_record(&snapshot).await?;
120 Ok(Some(snapshot))
121 }
122
123 pub(crate) async fn finalize_environment_for_task(
124 &self,
125 environment_id: &str,
126 success: bool,
127 force_archive_dirty: bool,
128 ) -> Result<Option<EnvironmentRecord>, String> {
129 let mut environments = self.environments.lock().await;
130 let Some(record) = environments.get_mut(environment_id) else {
131 return Ok(None);
132 };
133
134 if let Some(lease) = record.lease.as_mut() {
135 lease.released_at = Some(Utc::now());
136 }
137
138 let cleanup_result = self.apply_cleanup_policy(record, success, force_archive_dirty)?;
139 record.cleanup_result = Some(cleanup_result.clone());
140 record.updated_at = Utc::now();
141 let snapshot = record.clone();
142 drop(environments);
143 self.persist_environment_record(&snapshot).await?;
144 if let Some(observer) = self.observer.read().await.clone() {
145 let environment_id = snapshot.id.clone();
146 tokio::spawn(async move {
147 observer
148 .on_environment_cleanup(environment_id, cleanup_result)
149 .await;
150 });
151 }
152 Ok(Some(snapshot))
153 }
154
155 pub(crate) async fn persist_environment_record(
156 &self,
157 record: &EnvironmentRecord,
158 ) -> Result<(), String> {
159 let root = record.spec.workspace_root.clone();
160 persist_environment_to_disk_async(&root, record).await?;
161 self.environments
162 .lock()
163 .await
164 .insert(record.id.clone(), record.clone());
165 if let Some(observer) = self.observer.read().await.clone() {
166 let snapshot = record.clone();
167 tokio::spawn(async move {
168 observer.on_environment_updated(snapshot).await;
169 });
170 }
171 Ok(())
172 }
173
174 pub async fn list_environments(&self, run_id: Option<&str>) -> Vec<EnvironmentRecord> {
175 let environments = self.environments.lock().await;
176 let mut records: Vec<_> = environments
177 .values()
178 .filter(|record| {
179 run_id
180 .map(|value| record.spec.run_id == value)
181 .unwrap_or(true)
182 })
183 .cloned()
184 .collect();
185 records.sort_by(|left, right| right.updated_at.cmp(&left.updated_at));
186 records
187 }
188
189 pub async fn get_environment(&self, environment_id: &str) -> Option<EnvironmentRecord> {
190 if let Some(environment) = self.environments.lock().await.get(environment_id).cloned() {
191 return Some(environment);
192 }
193
194 let root = self.default_workspace_dir.as_ref()?.clone();
195 load_persisted_environment_by_id_async(&root, environment_id).await
196 }
197
198 pub async fn retry_environment_preparation(
199 &self,
200 environment_id: &str,
201 ) -> Result<EnvironmentRecord, String> {
202 let existing = self
203 .get_environment(environment_id)
204 .await
205 .ok_or_else(|| format!("Environment {environment_id} not found"))?;
206
207 let task = self
208 .find_task_by_environment_id(environment_id)
209 .await
210 .ok_or_else(|| format!("No task found for environment {environment_id}"))?;
211
212 let mut refreshed = self.prepare_environment(&task).await?;
213 refreshed.spec.cleanup_policy = existing.spec.cleanup_policy;
214 self.update_environment_in_runs(&refreshed).await?;
215 Ok(refreshed)
216 }
217
218 pub async fn cleanup_environment(
219 &self,
220 environment_id: &str,
221 archive_if_dirty: bool,
222 ) -> Result<EnvironmentRecord, String> {
223 let updated = self
224 .finalize_environment_for_task(environment_id, true, archive_if_dirty)
225 .await?
226 .ok_or_else(|| format!("Environment {environment_id} not found"))?;
227 self.update_environment_in_runs(&updated).await?;
228 Ok(updated)
229 }
230
231 pub(crate) async fn update_environment_in_runs(
232 &self,
233 environment: &EnvironmentRecord,
234 ) -> Result<(), String> {
235 let mut runs = self.supervisor_runs.lock().await;
236 let mut affected_runs = Vec::new();
237 for run in runs.values_mut() {
238 let mut changed = false;
239 for task_record in &mut run.tasks {
240 if task_record.environment_id == environment.id {
241 task_record.environment = environment.summary();
242 if let Some(local_execution) = task_record.local_execution.as_mut()
243 && let Some(progress) = local_execution.progress.as_mut()
244 {
245 progress.environment = Some(environment_snapshot_from_execution(
246 &task_record.environment,
247 ));
248 progress.waiting_reason = match progress.phase {
249 LocalExecutionPhase::Waiting => {
250 Some(LocalExecutionWaitingReason::EnvironmentTransition)
251 }
252 _ => progress.waiting_reason,
253 };
254 progress.updated_at = Utc::now();
255 local_execution.last_synced_at = progress.updated_at;
256 }
257 task_record.updated_at = Utc::now();
258 changed = true;
259 }
260 }
261 if changed {
262 run.updated_at = Utc::now();
263 affected_runs.push(run.clone());
264 }
265 }
266 drop(runs);
267
268 for run in affected_runs {
269 self.persist_run_async(&run).await?;
270 self.notify_run_updated(run).await;
271 }
272
273 Ok(())
274 }
275
276 pub(crate) async fn find_task_by_environment_id(
277 &self,
278 environment_id: &str,
279 ) -> Option<DelegatedTask> {
280 let runs = self.supervisor_runs.lock().await;
281 runs.values()
282 .flat_map(|run| run.tasks.iter())
283 .find(|record| record.environment_id == environment_id)
284 .map(|record| record.task.clone())
285 }
286
287 fn build_environment_spec(&self, task: &DelegatedTask) -> Result<EnvironmentSpec, String> {
288 let workspace_root = task
289 .workspace_dir
290 .clone()
291 .or_else(|| self.default_workspace_dir.clone())
292 .ok_or_else(|| format!("Task {} is missing a workspace root", task.id))?;
293
294 let session_workspace =
295 self.session_workspace(&workspace_root, task.session_id.as_deref())?;
296 let run_id = task
297 .run_id
298 .clone()
299 .ok_or_else(|| format!("Task {} is missing run_id", task.id))?;
300 let agent_component = sanitize_component(&task.agent_id);
301 let task_component = sanitize_component(&task.id);
302 let run_component = sanitize_component(&run_id);
303 let session_component = sanitize_component(task.session_id.as_deref().unwrap_or("global"));
304 let environment_id = format!("env_{run_component}_{task_component}_{agent_component}");
305
306 let cleanup_policy = default_cleanup_policy(&task.execution_mode);
307 let remote_url = task.remote_target.as_ref().map(|target| target.url.clone());
308
309 let git_worktree = if matches!(task.execution_mode, AgentExecutionMode::GitWorktree) {
310 let git = GitTools::new(Some(workspace_root.clone()));
311 let repo_root = git
312 .rev_parse_toplevel()
313 .map_err(|error| format!("Failed to locate git repository root: {error}"))?;
314 let base_branch = git
315 .current_branch()
316 .map_err(|error| format!("Failed to determine current branch: {error}"))?;
317 let worktree_path = session_workspace
318 .resolve_path_for_create(
319 &Path::new(".gestura")
320 .join("worktrees")
321 .join(session_component.clone())
322 .join(run_component.clone())
323 .join(agent_component.clone())
324 .join(task_component.clone()),
325 )
326 .map_err(|error| format!("Failed to resolve worktree path: {error}"))?;
327 Some(GitWorktreeSpec {
328 repo_root,
329 base_branch,
330 worktree_branch: format!(
331 "gestura/{}/{}/{}/{}",
332 session_component, run_component, agent_component, task_component
333 ),
334 worktree_path,
335 create_branch_if_missing: true,
336 })
337 } else {
338 None
339 };
340
341 let prepared_root = match task.execution_mode {
342 AgentExecutionMode::SharedWorkspace | AgentExecutionMode::Remote => {
343 workspace_root.clone()
344 }
345 AgentExecutionMode::IsolatedWorkspace => session_workspace
346 .resolve_path_for_create(
347 &Path::new(".gestura")
348 .join("environments")
349 .join(session_component)
350 .join(run_component)
351 .join(agent_component)
352 .join(task_component),
353 )
354 .map_err(|error| format!("Failed to resolve isolated workspace path: {error}"))?,
355 AgentExecutionMode::GitWorktree => git_worktree
356 .as_ref()
357 .map(|worktree| worktree.worktree_path.clone())
358 .unwrap_or_else(|| workspace_root.clone()),
359 };
360
361 Ok(EnvironmentSpec {
362 id: environment_id,
363 execution_mode: task.execution_mode.clone(),
364 workspace_root,
365 prepared_path: prepared_root,
366 session_id: task.session_id.clone(),
367 run_id,
368 task_id: task.id.clone(),
369 agent_id: task.agent_id.clone(),
370 cleanup_policy,
371 write_access: !task.planning_only,
372 git_worktree,
373 remote_url,
374 })
375 }
376
377 fn prepare_shared_environment(
378 &self,
379 record: &mut EnvironmentRecord,
380 ) -> Result<(), (EnvironmentFailureKind, String)> {
381 if !record.prepared_path.exists() {
382 return Err((
383 EnvironmentFailureKind::WorkspaceNotFound,
384 format!(
385 "Workspace root {} does not exist",
386 record.prepared_path.display()
387 ),
388 ));
389 }
390
391 record.health = EnvironmentHealth::Clean;
392 record.state = EnvironmentState::Ready;
393 record.recovery_status = RecoveryStatus::NotRequired;
394 record.recovery_action = None;
395 Ok(())
396 }
397
398 fn prepare_isolated_environment(
399 &self,
400 record: &mut EnvironmentRecord,
401 ) -> Result<(), (EnvironmentFailureKind, String)> {
402 if let Some(parent) = record.prepared_path.parent() {
403 fs::create_dir_all(parent).map_err(|error| {
404 (
405 EnvironmentFailureKind::WorkspaceNotFound,
406 format!(
407 "Failed to create parent directory {}: {error}",
408 parent.display()
409 ),
410 )
411 })?;
412 }
413
414 fs::create_dir_all(&record.prepared_path).map_err(|error| {
415 (
416 EnvironmentFailureKind::WorkspaceNotFound,
417 format!(
418 "Failed to create isolated environment {}: {error}",
419 record.prepared_path.display()
420 ),
421 )
422 })?;
423
424 record.health = EnvironmentHealth::Clean;
425 record.state = EnvironmentState::Ready;
426 record.recovery_status = RecoveryStatus::NotRequired;
427 record.recovery_action = None;
428 Ok(())
429 }
430
431 fn prepare_git_worktree_environment(
432 &self,
433 record: &mut EnvironmentRecord,
434 ) -> Result<(), (EnvironmentFailureKind, String)> {
435 let Some(spec) = record.spec.git_worktree.as_ref() else {
436 return Err((
437 EnvironmentFailureKind::WorktreeInvalid,
438 "Missing git worktree spec for git-worktree execution mode".to_string(),
439 ));
440 };
441 let git = GitTools::new(Some(spec.repo_root.clone()));
442 if !git.path_is_git_repo().unwrap_or(false) {
443 return Err((
444 EnvironmentFailureKind::NotGitRepository,
445 format!("{} is not a git repository", spec.repo_root.display()),
446 ));
447 }
448
449 let existing_worktree = git
450 .worktree_list()
451 .map_err(|error| {
452 (
453 EnvironmentFailureKind::GitCommandFailed,
454 format!("Failed to list git worktrees: {error}"),
455 )
456 })?
457 .into_iter()
458 .find(|worktree| worktree.path == spec.worktree_path);
459
460 if spec.worktree_path.exists() && existing_worktree.is_none() {
461 return Err((
462 EnvironmentFailureKind::WorktreeAlreadyExists,
463 format!(
464 "Path {} exists but is not a registered git worktree",
465 spec.worktree_path.display()
466 ),
467 ));
468 }
469
470 if existing_worktree.is_none() {
471 git.worktree_add(
472 &spec.worktree_path,
473 &spec.worktree_branch,
474 &spec.base_branch,
475 spec.create_branch_if_missing,
476 )
477 .map_err(|error| {
478 (
479 EnvironmentFailureKind::WorktreeCreationFailed,
480 format!("Failed to create git worktree: {error}"),
481 )
482 })?;
483 }
484
485 if !spec.worktree_path.exists() {
486 return Err((
487 EnvironmentFailureKind::WorktreeInvalid,
488 format!(
489 "Git worktree {} was not created",
490 spec.worktree_path.display()
491 ),
492 ));
493 }
494
495 let is_clean = git
496 .is_worktree_clean(&spec.worktree_path)
497 .map_err(|error| {
498 (
499 EnvironmentFailureKind::GitCommandFailed,
500 format!("Failed to inspect worktree cleanliness: {error}"),
501 )
502 })?;
503
504 record.health = if is_clean {
505 EnvironmentHealth::Clean
506 } else {
507 EnvironmentHealth::Dirty
508 };
509 record.state = EnvironmentState::Ready;
510 record.recovery_status = RecoveryStatus::NotRequired;
511 record.recovery_action = None;
512 Ok(())
513 }
514
515 fn apply_cleanup_policy(
516 &self,
517 record: &mut EnvironmentRecord,
518 success: bool,
519 force_archive_dirty: bool,
520 ) -> Result<CleanupResult, String> {
521 record.state = EnvironmentState::Cleaning;
522 let disposition = match record.spec.cleanup_policy {
523 CleanupPolicy::KeepAlways => CleanupDisposition::Kept,
524 CleanupPolicy::RemoveOnSuccess if success => CleanupDisposition::Removed,
525 CleanupPolicy::ArchiveOnFailure if !success => CleanupDisposition::Archived,
526 CleanupPolicy::ArchiveAlways => CleanupDisposition::Archived,
527 CleanupPolicy::RemoveWhenCleanOtherwiseArchive => {
528 if self.environment_is_clean(record)? && !force_archive_dirty {
529 CleanupDisposition::Removed
530 } else {
531 CleanupDisposition::Archived
532 }
533 }
534 _ => CleanupDisposition::Kept,
535 };
536
537 let retained_path = match disposition {
538 CleanupDisposition::Removed => {
539 self.remove_environment_path(record)?;
540 record.state = EnvironmentState::Removed;
541 None
542 }
543 CleanupDisposition::Archived => {
544 record.state = EnvironmentState::Archived;
545 Some(record.prepared_path.clone())
546 }
547 CleanupDisposition::Kept => {
548 record.state = EnvironmentState::Ready;
549 Some(record.prepared_path.clone())
550 }
551 };
552
553 Ok(CleanupResult {
554 disposition,
555 completed_at: Utc::now(),
556 retained_path,
557 summary: format!(
558 "Environment {} cleanup finished with {:?}",
559 record.id, disposition
560 ),
561 })
562 }
563
564 fn environment_is_clean(&self, record: &EnvironmentRecord) -> Result<bool, String> {
565 match record.spec.execution_mode {
566 AgentExecutionMode::GitWorktree => {
567 let Some(spec) = record.spec.git_worktree.as_ref() else {
568 return Ok(false);
569 };
570 GitTools::new(Some(spec.repo_root.clone()))
571 .is_worktree_clean(&spec.worktree_path)
572 .map_err(|error| format!("Failed to inspect git worktree cleanliness: {error}"))
573 }
574 AgentExecutionMode::IsolatedWorkspace => {
575 Ok(is_directory_effectively_empty(&record.prepared_path)?)
576 }
577 AgentExecutionMode::SharedWorkspace | AgentExecutionMode::Remote => Ok(false),
578 }
579 }
580
581 fn remove_environment_path(&self, record: &EnvironmentRecord) -> Result<(), String> {
582 match record.spec.execution_mode {
583 AgentExecutionMode::SharedWorkspace | AgentExecutionMode::Remote => Ok(()),
584 AgentExecutionMode::IsolatedWorkspace => {
585 if record.prepared_path.exists() {
586 fs::remove_dir_all(&record.prepared_path).map_err(|error| {
587 format!(
588 "Failed to remove isolated environment {}: {error}",
589 record.prepared_path.display()
590 )
591 })?;
592 }
593 Ok(())
594 }
595 AgentExecutionMode::GitWorktree => {
596 let Some(spec) = record.spec.git_worktree.as_ref() else {
597 return Ok(());
598 };
599 GitTools::new(Some(spec.repo_root.clone()))
600 .worktree_remove(&spec.worktree_path, true)
601 .map_err(|error| format!("Failed to remove git worktree: {error}"))?;
602 GitTools::new(Some(spec.repo_root.clone()))
603 .worktree_prune()
604 .map_err(|error| format!("Failed to prune git worktrees: {error}"))?;
605 Ok(())
606 }
607 }
608 }
609
610 fn session_workspace(
611 &self,
612 workspace_root: &Path,
613 session_id: Option<&str>,
614 ) -> Result<SessionWorkspace, String> {
615 SessionWorkspace::from_directory(
616 session_id.unwrap_or("orchestrator"),
617 workspace_root.to_path_buf(),
618 )
619 .map_err(|error| format!("Failed to initialize session workspace: {error}"))
620 }
621}
622
623pub(super) fn sanitize_component(value: &str) -> String {
624 value
625 .chars()
626 .map(|character| match character {
627 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' => character,
628 _ => '-',
629 })
630 .collect()
631}
632
633pub(super) fn default_cleanup_policy(mode: &AgentExecutionMode) -> CleanupPolicy {
634 match mode {
635 AgentExecutionMode::SharedWorkspace | AgentExecutionMode::Remote => {
636 CleanupPolicy::KeepAlways
637 }
638 AgentExecutionMode::IsolatedWorkspace | AgentExecutionMode::GitWorktree => {
639 CleanupPolicy::RemoveWhenCleanOtherwiseArchive
640 }
641 }
642}
643
/// Reports whether `path` is absent or is a directory without any entries.
///
/// The directory is opened with a single `read_dir` call instead of an `exists()` check
/// followed by `read_dir`. A directory that vanishes concurrently is therefore treated as
/// empty, and a path that cannot be inspected (for example because of permissions) produces
/// an error rather than being reported as empty.
///
/// # Errors
/// Returns a message when the path cannot be read as a directory for any reason other than
/// not existing (this includes `path` being a regular file).
fn is_directory_effectively_empty(path: &PathBuf) -> Result<bool, String> {
    match fs::read_dir(path) {
        Ok(mut entries) => Ok(entries.next().is_none()),
        Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(true),
        Err(error) => Err(format!(
            "Failed to inspect directory {}: {error}",
            path.display()
        )),
    }
}
652
653#[cfg(test)]
654mod tests {
655 use super::*;
656 use crate::AppConfig;
657 use gestura_core_agents::{AgentManager, AgentRole};
658 #[cfg(not(target_os = "windows"))]
659 use std::process::Command;
660 use tempfile::tempdir;
661
662 fn test_orchestrator(workspace_root: &Path) -> AgentOrchestrator<AgentManager> {
663 AgentOrchestrator::new_with_workspace_root(
664 AgentManager::new(workspace_root.join("environment-tests.db")),
665 AppConfig::default(),
666 Some(workspace_root.to_path_buf()),
667 )
668 }
669
670 fn test_task(
671 workspace_root: &Path,
672 run_id: &str,
673 task_id: &str,
674 agent_id: &str,
675 execution_mode: AgentExecutionMode,
676 ) -> DelegatedTask {
677 DelegatedTask {
678 id: task_id.to_string(),
679 agent_id: agent_id.to_string(),
680 prompt: format!("Execute {task_id}"),
681 context: None,
682 required_tools: vec![],
683 priority: 1,
684 session_id: Some("session-env".to_string()),
685 directive_id: None,
686 tracking_task_id: None,
687 run_id: Some(run_id.to_string()),
688 parent_task_id: None,
689 depends_on: vec![],
690 role: Some(AgentRole::Implementer),
691 delegation_brief: None,
692 planning_only: false,
693 approval_required: false,
694 reviewer_required: false,
695 test_required: false,
696 workspace_dir: Some(workspace_root.to_path_buf()),
697 execution_mode,
698 environment_id: None,
699 remote_target: None,
700 memory_tags: vec!["env-test".to_string()],
701 name: Some(task_id.to_string()),
702 }
703 }
704
705 fn test_task_record(
706 task: DelegatedTask,
707 environment: &EnvironmentRecord,
708 ) -> SupervisorTaskRecord {
709 let now = Utc::now();
710 SupervisorTaskRecord {
711 task,
712 state: SupervisorTaskState::Queued,
713 approval: TaskApprovalRecord::default(),
714 environment_id: environment.id.clone(),
715 environment: environment.summary(),
716 claimed_by: None,
717 attempts: 0,
718 blocked_reasons: vec![],
719 result: None,
720 remote_execution: None,
721 local_execution: None,
722 messages: vec![],
723 checkpoint: None,
724 created_at: now,
725 updated_at: now,
726 started_at: None,
727 completed_at: None,
728 }
729 }
730
731 fn test_run(
732 run_id: &str,
733 workspace_root: &Path,
734 task_record: SupervisorTaskRecord,
735 ) -> SupervisorRun {
736 let now = Utc::now();
737 SupervisorRun {
738 id: run_id.to_string(),
739 name: Some(format!("Run {run_id}")),
740 session_id: Some("session-env".to_string()),
741 workspace_dir: Some(workspace_root.to_path_buf()),
742 lead_agent_id: Some("supervisor-1".to_string()),
743 parent_run: None,
744 child_runs: vec![],
745 hierarchy_depth: 0,
746 max_hierarchy_depth: default_max_child_supervisor_depth(),
747 inherited_policy: None,
748 status: SupervisorRunStatus::Draft,
749 task_summary: SupervisorRunTaskSummary {
750 total: 1,
751 queued: 1,
752 ..SupervisorRunTaskSummary::default()
753 },
754 hierarchy_summary: None,
755 tasks: vec![task_record],
756 messages: vec![],
757 shared_cognition: vec![],
758 created_at: now,
759 updated_at: now,
760 completed_at: None,
761 metadata: None,
762 }
763 }
764
/// Runs `git <args>` inside `repo_root` and panics if the command cannot start or fails.
#[cfg(not(target_os = "windows"))]
fn run_git(repo_root: &Path, args: &[&str]) {
    let mut command = Command::new("git");
    command.current_dir(repo_root).args(args);
    let exit_status = command.status().expect("run git command");
    assert!(
        exit_status.success(),
        "git command failed: git {}",
        args.join(" ")
    );
}
778
779 #[cfg(not(target_os = "windows"))]
780 fn init_git_repo(repo_root: &Path) {
781 run_git(repo_root, &["init", "--initial-branch=main"]);
782 run_git(
783 repo_root,
784 &["config", "user.email", "gestura-tests@example.com"],
785 );
786 run_git(repo_root, &["config", "user.name", "Gestura Tests"]);
787 fs::write(repo_root.join("README.md"), "seed\n").expect("write seed file");
788 run_git(repo_root, &["add", "README.md"]);
789 run_git(repo_root, &["commit", "-m", "Initial commit"]);
790 }
791
792 #[tokio::test]
793 #[cfg(not(target_os = "windows"))]
794 async fn test_prepare_environment_supports_shared_isolated_and_git_worktree_modes() {
795 let temp = tempdir().expect("tempdir");
796 init_git_repo(temp.path());
797 let orchestrator = test_orchestrator(temp.path());
798
799 let shared = orchestrator
800 .prepare_environment(&test_task(
801 temp.path(),
802 "run-shared",
803 "task-shared",
804 "agent-shared",
805 AgentExecutionMode::SharedWorkspace,
806 ))
807 .await
808 .expect("prepare shared environment");
809 assert_eq!(shared.state, EnvironmentState::Ready);
810 assert_eq!(shared.health, EnvironmentHealth::Clean);
811 assert_eq!(shared.prepared_path, temp.path());
812 assert_eq!(shared.spec.cleanup_policy, CleanupPolicy::KeepAlways);
813
814 let isolated = orchestrator
815 .prepare_environment(&test_task(
816 temp.path(),
817 "run-isolated",
818 "task-isolated",
819 "agent-isolated",
820 AgentExecutionMode::IsolatedWorkspace,
821 ))
822 .await
823 .expect("prepare isolated environment");
824 assert_eq!(isolated.state, EnvironmentState::Ready);
825 assert!(isolated.prepared_path.exists());
826 assert!(isolated.prepared_path != temp.path());
827 assert!(
828 isolated
829 .prepared_path
830 .to_string_lossy()
831 .contains(".gestura/environments")
832 );
833 assert_eq!(
834 isolated.spec.cleanup_policy,
835 CleanupPolicy::RemoveWhenCleanOtherwiseArchive
836 );
837
838 let worktree = orchestrator
839 .prepare_environment(&test_task(
840 temp.path(),
841 "run-worktree",
842 "task-worktree",
843 "agent-worktree",
844 AgentExecutionMode::GitWorktree,
845 ))
846 .await
847 .expect("prepare git worktree environment");
848 let worktree_spec = worktree
849 .spec
850 .git_worktree
851 .as_ref()
852 .expect("git worktree spec present");
853 assert_eq!(worktree.state, EnvironmentState::Ready);
854 assert_eq!(worktree.health, EnvironmentHealth::Clean);
855 assert!(worktree.prepared_path.exists());
856 assert_eq!(worktree.prepared_path, worktree_spec.worktree_path);
857 assert!(
858 worktree
859 .prepared_path
860 .to_string_lossy()
861 .contains(".gestura/worktrees")
862 );
863 assert!(
864 worktree_spec
865 .worktree_branch
866 .starts_with("gestura/session-env/")
867 );
868 }
869
870 #[tokio::test]
871 async fn test_retry_environment_preparation_recreates_missing_isolated_environment() {
872 let temp = tempdir().expect("tempdir");
873 let orchestrator = test_orchestrator(temp.path());
874 let task = test_task(
875 temp.path(),
876 "run-retry",
877 "task-retry",
878 "agent-retry",
879 AgentExecutionMode::IsolatedWorkspace,
880 );
881 let mut environment = orchestrator
882 .prepare_environment(&task)
883 .await
884 .expect("prepare isolated environment");
885 environment.spec.cleanup_policy = CleanupPolicy::ArchiveAlways;
886 orchestrator
887 .persist_environment_record(&environment)
888 .await
889 .expect("persist customized environment");
890
891 let run = test_run(
892 "run-retry",
893 temp.path(),
894 test_task_record(task.clone(), &environment),
895 );
896 orchestrator
897 .supervisor_runs
898 .lock()
899 .await
900 .insert(run.id.clone(), run);
901
902 fs::remove_dir_all(&environment.prepared_path).expect("remove isolated environment");
903 assert!(!environment.prepared_path.exists());
904
905 let retried = orchestrator
906 .retry_environment_preparation(&environment.id)
907 .await
908 .expect("retry isolated environment preparation");
909
910 assert!(retried.prepared_path.exists());
911 assert_eq!(retried.state, EnvironmentState::Ready);
912 assert_eq!(retried.health, EnvironmentHealth::Clean);
913 assert_eq!(retried.spec.cleanup_policy, CleanupPolicy::ArchiveAlways);
914
915 let runs = orchestrator.supervisor_runs.lock().await;
916 let run = runs.get("run-retry").expect("run should exist");
917 let record = run.tasks.first().expect("task record should exist");
918 assert_eq!(record.environment_id, retried.id);
919 assert_eq!(
920 record.environment.cleanup_policy,
921 CleanupPolicy::ArchiveAlways
922 );
923 assert_eq!(record.environment.state, EnvironmentState::Ready);
924 }
925
926 #[tokio::test]
927 async fn test_finalize_environment_for_task_removes_clean_isolated_workspace() {
928 let temp = tempdir().expect("tempdir");
929 let orchestrator = test_orchestrator(temp.path());
930 let environment = orchestrator
931 .prepare_environment(&test_task(
932 temp.path(),
933 "run-cleanup-isolated",
934 "task-cleanup-isolated",
935 "agent-cleanup",
936 AgentExecutionMode::IsolatedWorkspace,
937 ))
938 .await
939 .expect("prepare isolated environment");
940
941 let finalized = orchestrator
942 .finalize_environment_for_task(&environment.id, true, false)
943 .await
944 .expect("finalize environment")
945 .expect("environment should exist");
946
947 assert_eq!(finalized.state, EnvironmentState::Removed);
948 assert_eq!(
949 finalized
950 .cleanup_result
951 .as_ref()
952 .map(|result| result.disposition),
953 Some(CleanupDisposition::Removed)
954 );
955 assert!(!environment.prepared_path.exists());
956 }
957
958 #[tokio::test]
959 #[cfg(not(target_os = "windows"))]
960 async fn test_finalize_environment_for_task_archives_dirty_git_worktree() {
961 let temp = tempdir().expect("tempdir");
962 init_git_repo(temp.path());
963 let orchestrator = test_orchestrator(temp.path());
964 let environment = orchestrator
965 .prepare_environment(&test_task(
966 temp.path(),
967 "run-cleanup-worktree",
968 "task-cleanup-worktree",
969 "agent-worktree-cleanup",
970 AgentExecutionMode::GitWorktree,
971 ))
972 .await
973 .expect("prepare worktree environment");
974
975 fs::write(
976 environment.prepared_path.join("dirty.txt"),
977 "pending work\n",
978 )
979 .expect("write dirty worktree marker");
980
981 let finalized = orchestrator
982 .finalize_environment_for_task(&environment.id, true, false)
983 .await
984 .expect("finalize worktree environment")
985 .expect("environment should exist");
986
987 assert_eq!(finalized.state, EnvironmentState::Archived);
988 assert_eq!(
989 finalized
990 .cleanup_result
991 .as_ref()
992 .map(|result| result.disposition),
993 Some(CleanupDisposition::Archived)
994 );
995 assert!(environment.prepared_path.exists());
996 }
997
998 #[tokio::test]
999 async fn test_update_environment_in_runs_refreshes_local_execution_snapshot() {
1000 let temp = tempdir().expect("tempdir");
1001 let orchestrator = test_orchestrator(temp.path());
1002 let task = test_task(
1003 temp.path(),
1004 "run-local-env-sync",
1005 "task-local-env-sync",
1006 "agent-local-env-sync",
1007 AgentExecutionMode::IsolatedWorkspace,
1008 );
1009 let environment = orchestrator
1010 .prepare_environment(&task)
1011 .await
1012 .expect("prepare environment");
1013
1014 let mut record = test_task_record(task.clone(), &environment);
1015 record.state = SupervisorTaskState::Running;
1016 record.local_execution = Some(LocalExecutionRecord {
1017 status: "running".to_string(),
1018 status_reason: None,
1019 progress: Some(LocalExecutionProgress {
1020 phase: LocalExecutionPhase::Waiting,
1021 waiting_reason: Some(LocalExecutionWaitingReason::ShellProcess),
1022 stage: Some("shell".to_string()),
1023 message: Some("Shell started".to_string()),
1024 percent: None,
1025 iteration: 1,
1026 current_tool_name: Some("shell".to_string()),
1027 last_completed_tool_name: None,
1028 last_completed_tool_duration_ms: None,
1029 completed_tool_call_count: 0,
1030 has_partial_content: false,
1031 partial_content_chars: 0,
1032 has_partial_thinking: false,
1033 partial_thinking_chars: 0,
1034 token_usage: None,
1035 environment: Some(environment_snapshot_from_execution(&record.environment)),
1036 updated_at: Utc::now(),
1037 }),
1038 last_synced_at: Utc::now(),
1039 });
1040
1041 let run = test_run("run-local-env-sync", temp.path(), record);
1042 orchestrator
1043 .supervisor_runs
1044 .lock()
1045 .await
1046 .insert(run.id.clone(), run.clone());
1047
1048 let mut updated = environment.clone();
1049 updated.state = EnvironmentState::Recovering;
1050 updated.health = EnvironmentHealth::Dirty;
1051 updated.recovery_status = RecoveryStatus::Pending;
1052 orchestrator
1053 .update_environment_in_runs(&updated)
1054 .await
1055 .expect("update run environment snapshot");
1056
1057 let run = orchestrator
1058 .supervisor_runs
1059 .lock()
1060 .await
1061 .get("run-local-env-sync")
1062 .cloned()
1063 .expect("run should exist");
1064 let progress = run
1065 .tasks
1066 .first()
1067 .and_then(|record| record.local_execution.as_ref())
1068 .and_then(|local| local.progress.as_ref())
1069 .expect("local execution progress should remain attached");
1070 let environment_snapshot = progress
1071 .environment
1072 .as_ref()
1073 .expect("environment snapshot should be refreshed");
1074 assert_eq!(environment_snapshot.state, EnvironmentState::Recovering);
1075 assert_eq!(environment_snapshot.health, EnvironmentHealth::Dirty);
1076 assert_eq!(
1077 progress.waiting_reason,
1078 Some(LocalExecutionWaitingReason::EnvironmentTransition)
1079 );
1080 }
1081}