gestura_core/orchestrator/
environment.rs

1use super::persistence::{
2    load_persisted_environment_by_id_async, persist_environment_to_disk_async,
3};
4use super::*;
5use gestura_core_agents::AgentExecutionMode;
6use gestura_core_tools::git::GitTools;
7use std::fs;
8use std::path::{Path, PathBuf};
9
10impl<M: OrchestratorAgentManager> AgentOrchestrator<M> {
11    pub(crate) async fn prepare_environment(
12        &self,
13        task: &DelegatedTask,
14    ) -> Result<EnvironmentRecord, String> {
15        let spec = self.build_environment_spec(task)?;
16        let now = Utc::now();
17        let mut record = EnvironmentRecord {
18            id: spec.id.clone(),
19            prepared_path: spec.prepared_path.clone(),
20            spec,
21            state: EnvironmentState::Provisioning,
22            health: EnvironmentHealth::Unknown,
23            lease: None,
24            cleanup_result: None,
25            recovery_status: RecoveryStatus::NotRequired,
26            recovery_action: None,
27            failure: None,
28            created_at: now,
29            updated_at: now,
30            last_verified_at: None,
31            metadata: None,
32        };
33
34        let preparation_result = match record.spec.execution_mode {
35            AgentExecutionMode::SharedWorkspace | AgentExecutionMode::Remote => {
36                self.prepare_shared_environment(&mut record)
37            }
38            AgentExecutionMode::IsolatedWorkspace => self.prepare_isolated_environment(&mut record),
39            AgentExecutionMode::GitWorktree => self.prepare_git_worktree_environment(&mut record),
40        };
41
42        if let Err((kind, message)) = preparation_result {
43            record.state = EnvironmentState::Failed;
44            record.health = EnvironmentHealth::Unknown;
45            record.recovery_status = RecoveryStatus::Pending;
46            record.recovery_action = Some(RecoveryAction::MarkTaskBlocked);
47            record.failure = Some(EnvironmentFailure {
48                kind,
49                message,
50                command: None,
51                stderr: None,
52                occurred_at: Utc::now(),
53            });
54        }
55
56        record.updated_at = Utc::now();
57        record.last_verified_at = Some(Utc::now());
58        self.persist_environment_record(&record).await?;
59        Ok(record)
60    }
61
62    pub(crate) async fn acquire_environment_lease(
63        &self,
64        environment_id: &str,
65        task_id: &str,
66        agent_id: &str,
67    ) -> Result<EnvironmentRecord, String> {
68        let mut environments = self.environments.lock().await;
69        let record = environments
70            .get_mut(environment_id)
71            .ok_or_else(|| format!("Environment {environment_id} not found"))?;
72
73        if let Some(lease) = &record.lease
74            && lease.released_at.is_none()
75            && lease.task_id != task_id
76        {
77            return Err(format!(
78                "Environment {environment_id} is already leased to task {}",
79                lease.task_id
80            ));
81        }
82
83        record.lease = Some(EnvironmentLease {
84            task_id: task_id.to_string(),
85            agent_id: agent_id.to_string(),
86            lease_kind: EnvironmentLeaseKind::Execution,
87            acquired_at: Utc::now(),
88            released_at: None,
89        });
90        record.state = EnvironmentState::InUse;
91        record.updated_at = Utc::now();
92        let snapshot = record.clone();
93        drop(environments);
94        self.persist_environment_record(&snapshot).await?;
95        Ok(snapshot)
96    }
97
98    pub(crate) async fn release_environment_lease(
99        &self,
100        environment_id: &str,
101    ) -> Result<Option<EnvironmentRecord>, String> {
102        let mut environments = self.environments.lock().await;
103        let Some(record) = environments.get_mut(environment_id) else {
104            return Ok(None);
105        };
106
107        if let Some(lease) = record.lease.as_mut() {
108            lease.released_at = Some(Utc::now());
109        }
110        if !matches!(
111            record.state,
112            EnvironmentState::Archived | EnvironmentState::Removed | EnvironmentState::Failed
113        ) {
114            record.state = EnvironmentState::Ready;
115        }
116        record.updated_at = Utc::now();
117        let snapshot = record.clone();
118        drop(environments);
119        self.persist_environment_record(&snapshot).await?;
120        Ok(Some(snapshot))
121    }
122
123    pub(crate) async fn finalize_environment_for_task(
124        &self,
125        environment_id: &str,
126        success: bool,
127        force_archive_dirty: bool,
128    ) -> Result<Option<EnvironmentRecord>, String> {
129        let mut environments = self.environments.lock().await;
130        let Some(record) = environments.get_mut(environment_id) else {
131            return Ok(None);
132        };
133
134        if let Some(lease) = record.lease.as_mut() {
135            lease.released_at = Some(Utc::now());
136        }
137
138        let cleanup_result = self.apply_cleanup_policy(record, success, force_archive_dirty)?;
139        record.cleanup_result = Some(cleanup_result.clone());
140        record.updated_at = Utc::now();
141        let snapshot = record.clone();
142        drop(environments);
143        self.persist_environment_record(&snapshot).await?;
144        if let Some(observer) = self.observer.read().await.clone() {
145            let environment_id = snapshot.id.clone();
146            tokio::spawn(async move {
147                observer
148                    .on_environment_cleanup(environment_id, cleanup_result)
149                    .await;
150            });
151        }
152        Ok(Some(snapshot))
153    }
154
155    pub(crate) async fn persist_environment_record(
156        &self,
157        record: &EnvironmentRecord,
158    ) -> Result<(), String> {
159        let root = record.spec.workspace_root.clone();
160        persist_environment_to_disk_async(&root, record).await?;
161        self.environments
162            .lock()
163            .await
164            .insert(record.id.clone(), record.clone());
165        if let Some(observer) = self.observer.read().await.clone() {
166            let snapshot = record.clone();
167            tokio::spawn(async move {
168                observer.on_environment_updated(snapshot).await;
169            });
170        }
171        Ok(())
172    }
173
174    pub async fn list_environments(&self, run_id: Option<&str>) -> Vec<EnvironmentRecord> {
175        let environments = self.environments.lock().await;
176        let mut records: Vec<_> = environments
177            .values()
178            .filter(|record| {
179                run_id
180                    .map(|value| record.spec.run_id == value)
181                    .unwrap_or(true)
182            })
183            .cloned()
184            .collect();
185        records.sort_by(|left, right| right.updated_at.cmp(&left.updated_at));
186        records
187    }
188
189    pub async fn get_environment(&self, environment_id: &str) -> Option<EnvironmentRecord> {
190        if let Some(environment) = self.environments.lock().await.get(environment_id).cloned() {
191            return Some(environment);
192        }
193
194        let root = self.default_workspace_dir.as_ref()?.clone();
195        load_persisted_environment_by_id_async(&root, environment_id).await
196    }
197
198    pub async fn retry_environment_preparation(
199        &self,
200        environment_id: &str,
201    ) -> Result<EnvironmentRecord, String> {
202        let existing = self
203            .get_environment(environment_id)
204            .await
205            .ok_or_else(|| format!("Environment {environment_id} not found"))?;
206
207        let task = self
208            .find_task_by_environment_id(environment_id)
209            .await
210            .ok_or_else(|| format!("No task found for environment {environment_id}"))?;
211
212        let mut refreshed = self.prepare_environment(&task).await?;
213        refreshed.spec.cleanup_policy = existing.spec.cleanup_policy;
214        self.update_environment_in_runs(&refreshed).await?;
215        Ok(refreshed)
216    }
217
218    pub async fn cleanup_environment(
219        &self,
220        environment_id: &str,
221        archive_if_dirty: bool,
222    ) -> Result<EnvironmentRecord, String> {
223        let updated = self
224            .finalize_environment_for_task(environment_id, true, archive_if_dirty)
225            .await?
226            .ok_or_else(|| format!("Environment {environment_id} not found"))?;
227        self.update_environment_in_runs(&updated).await?;
228        Ok(updated)
229    }
230
231    pub(crate) async fn update_environment_in_runs(
232        &self,
233        environment: &EnvironmentRecord,
234    ) -> Result<(), String> {
235        let mut runs = self.supervisor_runs.lock().await;
236        let mut affected_runs = Vec::new();
237        for run in runs.values_mut() {
238            let mut changed = false;
239            for task_record in &mut run.tasks {
240                if task_record.environment_id == environment.id {
241                    task_record.environment = environment.summary();
242                    if let Some(local_execution) = task_record.local_execution.as_mut()
243                        && let Some(progress) = local_execution.progress.as_mut()
244                    {
245                        progress.environment = Some(environment_snapshot_from_execution(
246                            &task_record.environment,
247                        ));
248                        progress.waiting_reason = match progress.phase {
249                            LocalExecutionPhase::Waiting => {
250                                Some(LocalExecutionWaitingReason::EnvironmentTransition)
251                            }
252                            _ => progress.waiting_reason,
253                        };
254                        progress.updated_at = Utc::now();
255                        local_execution.last_synced_at = progress.updated_at;
256                    }
257                    task_record.updated_at = Utc::now();
258                    changed = true;
259                }
260            }
261            if changed {
262                run.updated_at = Utc::now();
263                affected_runs.push(run.clone());
264            }
265        }
266        drop(runs);
267
268        for run in affected_runs {
269            self.persist_run_async(&run).await?;
270            self.notify_run_updated(run).await;
271        }
272
273        Ok(())
274    }
275
276    pub(crate) async fn find_task_by_environment_id(
277        &self,
278        environment_id: &str,
279    ) -> Option<DelegatedTask> {
280        let runs = self.supervisor_runs.lock().await;
281        runs.values()
282            .flat_map(|run| run.tasks.iter())
283            .find(|record| record.environment_id == environment_id)
284            .map(|record| record.task.clone())
285    }
286
287    fn build_environment_spec(&self, task: &DelegatedTask) -> Result<EnvironmentSpec, String> {
288        let workspace_root = task
289            .workspace_dir
290            .clone()
291            .or_else(|| self.default_workspace_dir.clone())
292            .ok_or_else(|| format!("Task {} is missing a workspace root", task.id))?;
293
294        let session_workspace =
295            self.session_workspace(&workspace_root, task.session_id.as_deref())?;
296        let run_id = task
297            .run_id
298            .clone()
299            .ok_or_else(|| format!("Task {} is missing run_id", task.id))?;
300        let agent_component = sanitize_component(&task.agent_id);
301        let task_component = sanitize_component(&task.id);
302        let run_component = sanitize_component(&run_id);
303        let session_component = sanitize_component(task.session_id.as_deref().unwrap_or("global"));
304        let environment_id = format!("env_{run_component}_{task_component}_{agent_component}");
305
306        let cleanup_policy = default_cleanup_policy(&task.execution_mode);
307        let remote_url = task.remote_target.as_ref().map(|target| target.url.clone());
308
309        let git_worktree = if matches!(task.execution_mode, AgentExecutionMode::GitWorktree) {
310            let git = GitTools::new(Some(workspace_root.clone()));
311            let repo_root = git
312                .rev_parse_toplevel()
313                .map_err(|error| format!("Failed to locate git repository root: {error}"))?;
314            let base_branch = git
315                .current_branch()
316                .map_err(|error| format!("Failed to determine current branch: {error}"))?;
317            let worktree_path = session_workspace
318                .resolve_path_for_create(
319                    &Path::new(".gestura")
320                        .join("worktrees")
321                        .join(session_component.clone())
322                        .join(run_component.clone())
323                        .join(agent_component.clone())
324                        .join(task_component.clone()),
325                )
326                .map_err(|error| format!("Failed to resolve worktree path: {error}"))?;
327            Some(GitWorktreeSpec {
328                repo_root,
329                base_branch,
330                worktree_branch: format!(
331                    "gestura/{}/{}/{}/{}",
332                    session_component, run_component, agent_component, task_component
333                ),
334                worktree_path,
335                create_branch_if_missing: true,
336            })
337        } else {
338            None
339        };
340
341        let prepared_root = match task.execution_mode {
342            AgentExecutionMode::SharedWorkspace | AgentExecutionMode::Remote => {
343                workspace_root.clone()
344            }
345            AgentExecutionMode::IsolatedWorkspace => session_workspace
346                .resolve_path_for_create(
347                    &Path::new(".gestura")
348                        .join("environments")
349                        .join(session_component)
350                        .join(run_component)
351                        .join(agent_component)
352                        .join(task_component),
353                )
354                .map_err(|error| format!("Failed to resolve isolated workspace path: {error}"))?,
355            AgentExecutionMode::GitWorktree => git_worktree
356                .as_ref()
357                .map(|worktree| worktree.worktree_path.clone())
358                .unwrap_or_else(|| workspace_root.clone()),
359        };
360
361        Ok(EnvironmentSpec {
362            id: environment_id,
363            execution_mode: task.execution_mode.clone(),
364            workspace_root,
365            prepared_path: prepared_root,
366            session_id: task.session_id.clone(),
367            run_id,
368            task_id: task.id.clone(),
369            agent_id: task.agent_id.clone(),
370            cleanup_policy,
371            write_access: !task.planning_only,
372            git_worktree,
373            remote_url,
374        })
375    }
376
377    fn prepare_shared_environment(
378        &self,
379        record: &mut EnvironmentRecord,
380    ) -> Result<(), (EnvironmentFailureKind, String)> {
381        if !record.prepared_path.exists() {
382            return Err((
383                EnvironmentFailureKind::WorkspaceNotFound,
384                format!(
385                    "Workspace root {} does not exist",
386                    record.prepared_path.display()
387                ),
388            ));
389        }
390
391        record.health = EnvironmentHealth::Clean;
392        record.state = EnvironmentState::Ready;
393        record.recovery_status = RecoveryStatus::NotRequired;
394        record.recovery_action = None;
395        Ok(())
396    }
397
398    fn prepare_isolated_environment(
399        &self,
400        record: &mut EnvironmentRecord,
401    ) -> Result<(), (EnvironmentFailureKind, String)> {
402        if let Some(parent) = record.prepared_path.parent() {
403            fs::create_dir_all(parent).map_err(|error| {
404                (
405                    EnvironmentFailureKind::WorkspaceNotFound,
406                    format!(
407                        "Failed to create parent directory {}: {error}",
408                        parent.display()
409                    ),
410                )
411            })?;
412        }
413
414        fs::create_dir_all(&record.prepared_path).map_err(|error| {
415            (
416                EnvironmentFailureKind::WorkspaceNotFound,
417                format!(
418                    "Failed to create isolated environment {}: {error}",
419                    record.prepared_path.display()
420                ),
421            )
422        })?;
423
424        record.health = EnvironmentHealth::Clean;
425        record.state = EnvironmentState::Ready;
426        record.recovery_status = RecoveryStatus::NotRequired;
427        record.recovery_action = None;
428        Ok(())
429    }
430
431    fn prepare_git_worktree_environment(
432        &self,
433        record: &mut EnvironmentRecord,
434    ) -> Result<(), (EnvironmentFailureKind, String)> {
435        let Some(spec) = record.spec.git_worktree.as_ref() else {
436            return Err((
437                EnvironmentFailureKind::WorktreeInvalid,
438                "Missing git worktree spec for git-worktree execution mode".to_string(),
439            ));
440        };
441        let git = GitTools::new(Some(spec.repo_root.clone()));
442        if !git.path_is_git_repo().unwrap_or(false) {
443            return Err((
444                EnvironmentFailureKind::NotGitRepository,
445                format!("{} is not a git repository", spec.repo_root.display()),
446            ));
447        }
448
449        let existing_worktree = git
450            .worktree_list()
451            .map_err(|error| {
452                (
453                    EnvironmentFailureKind::GitCommandFailed,
454                    format!("Failed to list git worktrees: {error}"),
455                )
456            })?
457            .into_iter()
458            .find(|worktree| worktree.path == spec.worktree_path);
459
460        if spec.worktree_path.exists() && existing_worktree.is_none() {
461            return Err((
462                EnvironmentFailureKind::WorktreeAlreadyExists,
463                format!(
464                    "Path {} exists but is not a registered git worktree",
465                    spec.worktree_path.display()
466                ),
467            ));
468        }
469
470        if existing_worktree.is_none() {
471            git.worktree_add(
472                &spec.worktree_path,
473                &spec.worktree_branch,
474                &spec.base_branch,
475                spec.create_branch_if_missing,
476            )
477            .map_err(|error| {
478                (
479                    EnvironmentFailureKind::WorktreeCreationFailed,
480                    format!("Failed to create git worktree: {error}"),
481                )
482            })?;
483        }
484
485        if !spec.worktree_path.exists() {
486            return Err((
487                EnvironmentFailureKind::WorktreeInvalid,
488                format!(
489                    "Git worktree {} was not created",
490                    spec.worktree_path.display()
491                ),
492            ));
493        }
494
495        let is_clean = git
496            .is_worktree_clean(&spec.worktree_path)
497            .map_err(|error| {
498                (
499                    EnvironmentFailureKind::GitCommandFailed,
500                    format!("Failed to inspect worktree cleanliness: {error}"),
501                )
502            })?;
503
504        record.health = if is_clean {
505            EnvironmentHealth::Clean
506        } else {
507            EnvironmentHealth::Dirty
508        };
509        record.state = EnvironmentState::Ready;
510        record.recovery_status = RecoveryStatus::NotRequired;
511        record.recovery_action = None;
512        Ok(())
513    }
514
515    fn apply_cleanup_policy(
516        &self,
517        record: &mut EnvironmentRecord,
518        success: bool,
519        force_archive_dirty: bool,
520    ) -> Result<CleanupResult, String> {
521        record.state = EnvironmentState::Cleaning;
522        let disposition = match record.spec.cleanup_policy {
523            CleanupPolicy::KeepAlways => CleanupDisposition::Kept,
524            CleanupPolicy::RemoveOnSuccess if success => CleanupDisposition::Removed,
525            CleanupPolicy::ArchiveOnFailure if !success => CleanupDisposition::Archived,
526            CleanupPolicy::ArchiveAlways => CleanupDisposition::Archived,
527            CleanupPolicy::RemoveWhenCleanOtherwiseArchive => {
528                if self.environment_is_clean(record)? && !force_archive_dirty {
529                    CleanupDisposition::Removed
530                } else {
531                    CleanupDisposition::Archived
532                }
533            }
534            _ => CleanupDisposition::Kept,
535        };
536
537        let retained_path = match disposition {
538            CleanupDisposition::Removed => {
539                self.remove_environment_path(record)?;
540                record.state = EnvironmentState::Removed;
541                None
542            }
543            CleanupDisposition::Archived => {
544                record.state = EnvironmentState::Archived;
545                Some(record.prepared_path.clone())
546            }
547            CleanupDisposition::Kept => {
548                record.state = EnvironmentState::Ready;
549                Some(record.prepared_path.clone())
550            }
551        };
552
553        Ok(CleanupResult {
554            disposition,
555            completed_at: Utc::now(),
556            retained_path,
557            summary: format!(
558                "Environment {} cleanup finished with {:?}",
559                record.id, disposition
560            ),
561        })
562    }
563
564    fn environment_is_clean(&self, record: &EnvironmentRecord) -> Result<bool, String> {
565        match record.spec.execution_mode {
566            AgentExecutionMode::GitWorktree => {
567                let Some(spec) = record.spec.git_worktree.as_ref() else {
568                    return Ok(false);
569                };
570                GitTools::new(Some(spec.repo_root.clone()))
571                    .is_worktree_clean(&spec.worktree_path)
572                    .map_err(|error| format!("Failed to inspect git worktree cleanliness: {error}"))
573            }
574            AgentExecutionMode::IsolatedWorkspace => {
575                Ok(is_directory_effectively_empty(&record.prepared_path)?)
576            }
577            AgentExecutionMode::SharedWorkspace | AgentExecutionMode::Remote => Ok(false),
578        }
579    }
580
581    fn remove_environment_path(&self, record: &EnvironmentRecord) -> Result<(), String> {
582        match record.spec.execution_mode {
583            AgentExecutionMode::SharedWorkspace | AgentExecutionMode::Remote => Ok(()),
584            AgentExecutionMode::IsolatedWorkspace => {
585                if record.prepared_path.exists() {
586                    fs::remove_dir_all(&record.prepared_path).map_err(|error| {
587                        format!(
588                            "Failed to remove isolated environment {}: {error}",
589                            record.prepared_path.display()
590                        )
591                    })?;
592                }
593                Ok(())
594            }
595            AgentExecutionMode::GitWorktree => {
596                let Some(spec) = record.spec.git_worktree.as_ref() else {
597                    return Ok(());
598                };
599                GitTools::new(Some(spec.repo_root.clone()))
600                    .worktree_remove(&spec.worktree_path, true)
601                    .map_err(|error| format!("Failed to remove git worktree: {error}"))?;
602                GitTools::new(Some(spec.repo_root.clone()))
603                    .worktree_prune()
604                    .map_err(|error| format!("Failed to prune git worktrees: {error}"))?;
605                Ok(())
606            }
607        }
608    }
609
610    fn session_workspace(
611        &self,
612        workspace_root: &Path,
613        session_id: Option<&str>,
614    ) -> Result<SessionWorkspace, String> {
615        SessionWorkspace::from_directory(
616            session_id.unwrap_or("orchestrator"),
617            workspace_root.to_path_buf(),
618        )
619        .map_err(|error| format!("Failed to initialize session workspace: {error}"))
620    }
621}
622
623pub(super) fn sanitize_component(value: &str) -> String {
624    value
625        .chars()
626        .map(|character| match character {
627            'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' => character,
628            _ => '-',
629        })
630        .collect()
631}
632
633pub(super) fn default_cleanup_policy(mode: &AgentExecutionMode) -> CleanupPolicy {
634    match mode {
635        AgentExecutionMode::SharedWorkspace | AgentExecutionMode::Remote => {
636            CleanupPolicy::KeepAlways
637        }
638        AgentExecutionMode::IsolatedWorkspace | AgentExecutionMode::GitWorktree => {
639            CleanupPolicy::RemoveWhenCleanOtherwiseArchive
640        }
641    }
642}
643
/// Reports whether `path` is a directory without entries.
///
/// A path that does not exist counts as empty. The directory is opened
/// directly instead of being probed with `exists()` first, so only a genuine
/// "not found" is treated as empty; any other failure (for example missing
/// permissions, or `path` not being a directory) is reported as an error
/// rather than being mistaken for an empty directory.
///
/// # Errors
///
/// Returns `Err` with a descriptive message when the directory cannot be read
/// for a reason other than not existing.
fn is_directory_effectively_empty(path: &Path) -> Result<bool, String> {
    match fs::read_dir(path) {
        Ok(mut entries) => Ok(entries.next().is_none()),
        Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(true),
        Err(error) => Err(format!(
            "Failed to inspect directory {}: {error}",
            path.display()
        )),
    }
}
652
653#[cfg(test)]
654mod tests {
655    use super::*;
656    use crate::AppConfig;
657    use gestura_core_agents::{AgentManager, AgentRole};
658    #[cfg(not(target_os = "windows"))]
659    use std::process::Command;
660    use tempfile::tempdir;
661
662    fn test_orchestrator(workspace_root: &Path) -> AgentOrchestrator<AgentManager> {
663        AgentOrchestrator::new_with_workspace_root(
664            AgentManager::new(workspace_root.join("environment-tests.db")),
665            AppConfig::default(),
666            Some(workspace_root.to_path_buf()),
667        )
668    }
669
670    fn test_task(
671        workspace_root: &Path,
672        run_id: &str,
673        task_id: &str,
674        agent_id: &str,
675        execution_mode: AgentExecutionMode,
676    ) -> DelegatedTask {
677        DelegatedTask {
678            id: task_id.to_string(),
679            agent_id: agent_id.to_string(),
680            prompt: format!("Execute {task_id}"),
681            context: None,
682            required_tools: vec![],
683            priority: 1,
684            session_id: Some("session-env".to_string()),
685            directive_id: None,
686            tracking_task_id: None,
687            run_id: Some(run_id.to_string()),
688            parent_task_id: None,
689            depends_on: vec![],
690            role: Some(AgentRole::Implementer),
691            delegation_brief: None,
692            planning_only: false,
693            approval_required: false,
694            reviewer_required: false,
695            test_required: false,
696            workspace_dir: Some(workspace_root.to_path_buf()),
697            execution_mode,
698            environment_id: None,
699            remote_target: None,
700            memory_tags: vec!["env-test".to_string()],
701            name: Some(task_id.to_string()),
702        }
703    }
704
705    fn test_task_record(
706        task: DelegatedTask,
707        environment: &EnvironmentRecord,
708    ) -> SupervisorTaskRecord {
709        let now = Utc::now();
710        SupervisorTaskRecord {
711            task,
712            state: SupervisorTaskState::Queued,
713            approval: TaskApprovalRecord::default(),
714            environment_id: environment.id.clone(),
715            environment: environment.summary(),
716            claimed_by: None,
717            attempts: 0,
718            blocked_reasons: vec![],
719            result: None,
720            remote_execution: None,
721            local_execution: None,
722            messages: vec![],
723            checkpoint: None,
724            created_at: now,
725            updated_at: now,
726            started_at: None,
727            completed_at: None,
728        }
729    }
730
731    fn test_run(
732        run_id: &str,
733        workspace_root: &Path,
734        task_record: SupervisorTaskRecord,
735    ) -> SupervisorRun {
736        let now = Utc::now();
737        SupervisorRun {
738            id: run_id.to_string(),
739            name: Some(format!("Run {run_id}")),
740            session_id: Some("session-env".to_string()),
741            workspace_dir: Some(workspace_root.to_path_buf()),
742            lead_agent_id: Some("supervisor-1".to_string()),
743            parent_run: None,
744            child_runs: vec![],
745            hierarchy_depth: 0,
746            max_hierarchy_depth: default_max_child_supervisor_depth(),
747            inherited_policy: None,
748            status: SupervisorRunStatus::Draft,
749            task_summary: SupervisorRunTaskSummary {
750                total: 1,
751                queued: 1,
752                ..SupervisorRunTaskSummary::default()
753            },
754            hierarchy_summary: None,
755            tasks: vec![task_record],
756            messages: vec![],
757            shared_cognition: vec![],
758            created_at: now,
759            updated_at: now,
760            completed_at: None,
761            metadata: None,
762        }
763    }
764
765    #[cfg(not(target_os = "windows"))]
766    fn run_git(repo_root: &Path, args: &[&str]) {
767        let status = Command::new("git")
768            .current_dir(repo_root)
769            .args(args)
770            .status()
771            .expect("run git command");
772        assert!(
773            status.success(),
774            "git command failed: git {}",
775            args.join(" ")
776        );
777    }
778
779    #[cfg(not(target_os = "windows"))]
780    fn init_git_repo(repo_root: &Path) {
781        run_git(repo_root, &["init", "--initial-branch=main"]);
782        run_git(
783            repo_root,
784            &["config", "user.email", "gestura-tests@example.com"],
785        );
786        run_git(repo_root, &["config", "user.name", "Gestura Tests"]);
787        fs::write(repo_root.join("README.md"), "seed\n").expect("write seed file");
788        run_git(repo_root, &["add", "README.md"]);
789        run_git(repo_root, &["commit", "-m", "Initial commit"]);
790    }
791
792    #[tokio::test]
793    #[cfg(not(target_os = "windows"))]
794    async fn test_prepare_environment_supports_shared_isolated_and_git_worktree_modes() {
795        let temp = tempdir().expect("tempdir");
796        init_git_repo(temp.path());
797        let orchestrator = test_orchestrator(temp.path());
798
799        let shared = orchestrator
800            .prepare_environment(&test_task(
801                temp.path(),
802                "run-shared",
803                "task-shared",
804                "agent-shared",
805                AgentExecutionMode::SharedWorkspace,
806            ))
807            .await
808            .expect("prepare shared environment");
809        assert_eq!(shared.state, EnvironmentState::Ready);
810        assert_eq!(shared.health, EnvironmentHealth::Clean);
811        assert_eq!(shared.prepared_path, temp.path());
812        assert_eq!(shared.spec.cleanup_policy, CleanupPolicy::KeepAlways);
813
814        let isolated = orchestrator
815            .prepare_environment(&test_task(
816                temp.path(),
817                "run-isolated",
818                "task-isolated",
819                "agent-isolated",
820                AgentExecutionMode::IsolatedWorkspace,
821            ))
822            .await
823            .expect("prepare isolated environment");
824        assert_eq!(isolated.state, EnvironmentState::Ready);
825        assert!(isolated.prepared_path.exists());
826        assert!(isolated.prepared_path != temp.path());
827        assert!(
828            isolated
829                .prepared_path
830                .to_string_lossy()
831                .contains(".gestura/environments")
832        );
833        assert_eq!(
834            isolated.spec.cleanup_policy,
835            CleanupPolicy::RemoveWhenCleanOtherwiseArchive
836        );
837
838        let worktree = orchestrator
839            .prepare_environment(&test_task(
840                temp.path(),
841                "run-worktree",
842                "task-worktree",
843                "agent-worktree",
844                AgentExecutionMode::GitWorktree,
845            ))
846            .await
847            .expect("prepare git worktree environment");
848        let worktree_spec = worktree
849            .spec
850            .git_worktree
851            .as_ref()
852            .expect("git worktree spec present");
853        assert_eq!(worktree.state, EnvironmentState::Ready);
854        assert_eq!(worktree.health, EnvironmentHealth::Clean);
855        assert!(worktree.prepared_path.exists());
856        assert_eq!(worktree.prepared_path, worktree_spec.worktree_path);
857        assert!(
858            worktree
859                .prepared_path
860                .to_string_lossy()
861                .contains(".gestura/worktrees")
862        );
863        assert!(
864            worktree_spec
865                .worktree_branch
866                .starts_with("gestura/session-env/")
867        );
868    }
869
870    #[tokio::test]
871    async fn test_retry_environment_preparation_recreates_missing_isolated_environment() {
872        let temp = tempdir().expect("tempdir");
873        let orchestrator = test_orchestrator(temp.path());
874        let task = test_task(
875            temp.path(),
876            "run-retry",
877            "task-retry",
878            "agent-retry",
879            AgentExecutionMode::IsolatedWorkspace,
880        );
881        let mut environment = orchestrator
882            .prepare_environment(&task)
883            .await
884            .expect("prepare isolated environment");
885        environment.spec.cleanup_policy = CleanupPolicy::ArchiveAlways;
886        orchestrator
887            .persist_environment_record(&environment)
888            .await
889            .expect("persist customized environment");
890
891        let run = test_run(
892            "run-retry",
893            temp.path(),
894            test_task_record(task.clone(), &environment),
895        );
896        orchestrator
897            .supervisor_runs
898            .lock()
899            .await
900            .insert(run.id.clone(), run);
901
902        fs::remove_dir_all(&environment.prepared_path).expect("remove isolated environment");
903        assert!(!environment.prepared_path.exists());
904
905        let retried = orchestrator
906            .retry_environment_preparation(&environment.id)
907            .await
908            .expect("retry isolated environment preparation");
909
910        assert!(retried.prepared_path.exists());
911        assert_eq!(retried.state, EnvironmentState::Ready);
912        assert_eq!(retried.health, EnvironmentHealth::Clean);
913        assert_eq!(retried.spec.cleanup_policy, CleanupPolicy::ArchiveAlways);
914
915        let runs = orchestrator.supervisor_runs.lock().await;
916        let run = runs.get("run-retry").expect("run should exist");
917        let record = run.tasks.first().expect("task record should exist");
918        assert_eq!(record.environment_id, retried.id);
919        assert_eq!(
920            record.environment.cleanup_policy,
921            CleanupPolicy::ArchiveAlways
922        );
923        assert_eq!(record.environment.state, EnvironmentState::Ready);
924    }
925
926    #[tokio::test]
927    async fn test_finalize_environment_for_task_removes_clean_isolated_workspace() {
928        let temp = tempdir().expect("tempdir");
929        let orchestrator = test_orchestrator(temp.path());
930        let environment = orchestrator
931            .prepare_environment(&test_task(
932                temp.path(),
933                "run-cleanup-isolated",
934                "task-cleanup-isolated",
935                "agent-cleanup",
936                AgentExecutionMode::IsolatedWorkspace,
937            ))
938            .await
939            .expect("prepare isolated environment");
940
941        let finalized = orchestrator
942            .finalize_environment_for_task(&environment.id, true, false)
943            .await
944            .expect("finalize environment")
945            .expect("environment should exist");
946
947        assert_eq!(finalized.state, EnvironmentState::Removed);
948        assert_eq!(
949            finalized
950                .cleanup_result
951                .as_ref()
952                .map(|result| result.disposition),
953            Some(CleanupDisposition::Removed)
954        );
955        assert!(!environment.prepared_path.exists());
956    }
957
958    #[tokio::test]
959    #[cfg(not(target_os = "windows"))]
960    async fn test_finalize_environment_for_task_archives_dirty_git_worktree() {
961        let temp = tempdir().expect("tempdir");
962        init_git_repo(temp.path());
963        let orchestrator = test_orchestrator(temp.path());
964        let environment = orchestrator
965            .prepare_environment(&test_task(
966                temp.path(),
967                "run-cleanup-worktree",
968                "task-cleanup-worktree",
969                "agent-worktree-cleanup",
970                AgentExecutionMode::GitWorktree,
971            ))
972            .await
973            .expect("prepare worktree environment");
974
975        fs::write(
976            environment.prepared_path.join("dirty.txt"),
977            "pending work\n",
978        )
979        .expect("write dirty worktree marker");
980
981        let finalized = orchestrator
982            .finalize_environment_for_task(&environment.id, true, false)
983            .await
984            .expect("finalize worktree environment")
985            .expect("environment should exist");
986
987        assert_eq!(finalized.state, EnvironmentState::Archived);
988        assert_eq!(
989            finalized
990                .cleanup_result
991                .as_ref()
992                .map(|result| result.disposition),
993            Some(CleanupDisposition::Archived)
994        );
995        assert!(environment.prepared_path.exists());
996    }
997
998    #[tokio::test]
999    async fn test_update_environment_in_runs_refreshes_local_execution_snapshot() {
1000        let temp = tempdir().expect("tempdir");
1001        let orchestrator = test_orchestrator(temp.path());
1002        let task = test_task(
1003            temp.path(),
1004            "run-local-env-sync",
1005            "task-local-env-sync",
1006            "agent-local-env-sync",
1007            AgentExecutionMode::IsolatedWorkspace,
1008        );
1009        let environment = orchestrator
1010            .prepare_environment(&task)
1011            .await
1012            .expect("prepare environment");
1013
1014        let mut record = test_task_record(task.clone(), &environment);
1015        record.state = SupervisorTaskState::Running;
1016        record.local_execution = Some(LocalExecutionRecord {
1017            status: "running".to_string(),
1018            status_reason: None,
1019            progress: Some(LocalExecutionProgress {
1020                phase: LocalExecutionPhase::Waiting,
1021                waiting_reason: Some(LocalExecutionWaitingReason::ShellProcess),
1022                stage: Some("shell".to_string()),
1023                message: Some("Shell started".to_string()),
1024                percent: None,
1025                iteration: 1,
1026                current_tool_name: Some("shell".to_string()),
1027                last_completed_tool_name: None,
1028                last_completed_tool_duration_ms: None,
1029                completed_tool_call_count: 0,
1030                has_partial_content: false,
1031                partial_content_chars: 0,
1032                has_partial_thinking: false,
1033                partial_thinking_chars: 0,
1034                token_usage: None,
1035                environment: Some(environment_snapshot_from_execution(&record.environment)),
1036                updated_at: Utc::now(),
1037            }),
1038            last_synced_at: Utc::now(),
1039        });
1040
1041        let run = test_run("run-local-env-sync", temp.path(), record);
1042        orchestrator
1043            .supervisor_runs
1044            .lock()
1045            .await
1046            .insert(run.id.clone(), run.clone());
1047
1048        let mut updated = environment.clone();
1049        updated.state = EnvironmentState::Recovering;
1050        updated.health = EnvironmentHealth::Dirty;
1051        updated.recovery_status = RecoveryStatus::Pending;
1052        orchestrator
1053            .update_environment_in_runs(&updated)
1054            .await
1055            .expect("update run environment snapshot");
1056
1057        let run = orchestrator
1058            .supervisor_runs
1059            .lock()
1060            .await
1061            .get("run-local-env-sync")
1062            .cloned()
1063            .expect("run should exist");
1064        let progress = run
1065            .tasks
1066            .first()
1067            .and_then(|record| record.local_execution.as_ref())
1068            .and_then(|local| local.progress.as_ref())
1069            .expect("local execution progress should remain attached");
1070        let environment_snapshot = progress
1071            .environment
1072            .as_ref()
1073            .expect("environment snapshot should be refreshed");
1074        assert_eq!(environment_snapshot.state, EnvironmentState::Recovering);
1075        assert_eq!(environment_snapshot.health, EnvironmentHealth::Dirty);
1076        assert_eq!(
1077            progress.waiting_reason,
1078            Some(LocalExecutionWaitingReason::EnvironmentTransition)
1079        );
1080    }
1081}