gestura_core/orchestrator/
recovery.rs

1use super::environment::{default_cleanup_policy, sanitize_component};
2use super::*;
3use gestura_core_agents::AgentExecutionMode;
4use gestura_core_tools::git::GitTools;
5
6impl<M: OrchestratorAgentManager> AgentOrchestrator<M> {
7    pub(crate) fn bootstrap_persisted_state(&self) {
8        let Some(root) = self.default_workspace_dir.as_ref() else {
9            return;
10        };
11
12        let mut runs = load_persisted_runs(root);
13        let mut environments: HashMap<String, EnvironmentRecord> =
14            load_persisted_environments(root)
15                .into_iter()
16                .map(|environment| (environment.id.clone(), environment))
17                .collect();
18
19        let mut generated = Vec::new();
20        for run in &mut runs {
21            let run_id = run.id.clone();
22            let session_id = run.session_id.clone();
23            let workspace_dir = run.workspace_dir.clone();
24            for task_record in &mut run.tasks {
25                if task_record.environment_id.is_empty() {
26                    let record = legacy_environment_record(
27                        &run_id,
28                        session_id.clone(),
29                        workspace_dir.clone(),
30                        task_record,
31                    );
32                    task_record.environment_id = record.id.clone();
33                    task_record.environment = record.summary();
34                    generated.push(record);
35                }
36            }
37        }
38        for record in generated {
39            environments.entry(record.id.clone()).or_insert(record);
40        }
41
42        let mut index = HashMap::new();
43        for run in &runs {
44            for task_record in &run.tasks {
45                index.insert(task_record.task.id.clone(), run.id.clone());
46            }
47        }
48
49        if let Ok(mut guard) = self.supervisor_runs.try_lock() {
50            *guard = runs.into_iter().map(|run| (run.id.clone(), run)).collect();
51        }
52        if let Ok(mut guard) = self.environments.try_lock() {
53            *guard = environments;
54        }
55        if let Ok(mut guard) = self.task_run_index.try_lock() {
56            *guard = index;
57        }
58
59        let _ = self.reconcile_persisted_state_sync();
60        for task in self.prepare_resumable_tasks_from_checkpoints_sync() {
61            self.schedule_task_execution(task);
62        }
63    }
64
    /// Runs a reconciliation pass over the in-memory supervisor runs and environment
    /// records.
    ///
    /// Thin public entry point that delegates to `reconcile_persisted_state`.
    ///
    /// # Errors
    ///
    /// Returns whatever error string the underlying reconciliation pass produces.
    pub async fn reconcile_orchestrator_state(&self) -> Result<(), String> {
        self.reconcile_persisted_state().await
    }
68
69    async fn reconcile_persisted_state(&self) -> Result<(), String> {
70        let checkpoints_by_task = self
71            .default_workspace_dir
72            .as_deref()
73            .map(load_persisted_checkpoints)
74            .unwrap_or_default()
75            .into_iter()
76            .map(|checkpoint| (checkpoint.task_id.clone(), checkpoint))
77            .collect::<HashMap<_, _>>();
78        let mut runs = self.supervisor_runs.lock().await;
79        let mut environments = self.environments.lock().await;
80        let mut run_updates = Vec::new();
81        let mut environment_updates = Vec::new();
82        let mut checkpoint_updates = Vec::new();
83        let observer = self.observer.read().await.clone();
84
85        for run in runs.values_mut() {
86            let mut run_changed = false;
87            for task_record in &mut run.tasks {
88                if matches!(task_record.state, SupervisorTaskState::Running) {
89                    let checkpoint = checkpoints_by_task.get(&task_record.task.id);
90                    let blocked_reason = checkpoint
91                        .map(restart_blocked_reason_for_checkpoint)
92                        .unwrap_or_else(|| "execution interrupted during restart".to_string());
93                    task_record.state = SupervisorTaskState::Blocked;
94                    if !task_record
95                        .blocked_reasons
96                        .iter()
97                        .any(|reason| reason == &blocked_reason)
98                    {
99                        task_record.blocked_reasons.push(blocked_reason);
100                    }
101                    task_record.updated_at = Utc::now();
102                    if let Some(checkpoint) = checkpoint {
103                        checkpoint_updates.push(checkpoint_for_restart_recovery(checkpoint));
104                    }
105                    run_changed = true;
106                }
107
108                if let Some(environment) = environments.get_mut(&task_record.environment_id) {
109                    let mut recovery_summary = None;
110                    reconcile_environment_record(environment, Some(task_record));
111                    task_record.environment = environment.summary();
112                    task_record.updated_at = Utc::now();
113                    if task_record.environment.recovery_action.is_some() {
114                        run_changed = true;
115                    }
116                    if let Some(action) = task_record.environment.recovery_action {
117                        recovery_summary = Some((action, recovery_message(environment)));
118                    }
119                    environment_updates.push(environment.clone());
120                    if let (Some(observer), Some((action, summary))) = (&observer, recovery_summary)
121                    {
122                        let observer = observer.clone();
123                        let environment_id = environment.id.clone();
124                        tokio::spawn(async move {
125                            observer
126                                .on_environment_recovery(environment_id, action, summary)
127                                .await;
128                        });
129                    }
130                }
131            }
132
133            if run_changed {
134                run.status = recalculate_run_status(run);
135                run.updated_at = Utc::now();
136                run_updates.push(run.clone());
137            }
138        }
139
140        for environment in environments.values_mut() {
141            let owned = runs.values().any(|run| {
142                run.tasks
143                    .iter()
144                    .any(|task_record| task_record.environment_id == environment.id)
145            });
146            if !owned {
147                environment.health = EnvironmentHealth::Orphaned;
148                environment.recovery_status = RecoveryStatus::Pending;
149                environment.recovery_action = Some(RecoveryAction::QueueCleanup);
150                environment.updated_at = Utc::now();
151                environment_updates.push(environment.clone());
152                if let Some(observer) = &observer {
153                    let observer = observer.clone();
154                    let environment_id = environment.id.clone();
155                    let summary = recovery_message(environment);
156                    tokio::spawn(async move {
157                        observer
158                            .on_environment_recovery(
159                                environment_id,
160                                RecoveryAction::QueueCleanup,
161                                summary,
162                            )
163                            .await;
164                    });
165                }
166            }
167        }
168
169        drop(environments);
170        drop(runs);
171
172        for run in run_updates {
173            self.persist_run_async(&run).await?;
174        }
175        for environment in environment_updates {
176            self.persist_environment_record(&environment).await?;
177        }
178        for checkpoint in checkpoint_updates {
179            self.persist_delegated_checkpoint_async(&checkpoint).await?;
180        }
181
182        Ok(())
183    }
184
185    fn reconcile_persisted_state_sync(&self) -> Result<(), String> {
186        let checkpoints_by_task = self
187            .default_workspace_dir
188            .as_deref()
189            .map(load_persisted_checkpoints)
190            .unwrap_or_default()
191            .into_iter()
192            .map(|checkpoint| (checkpoint.task_id.clone(), checkpoint))
193            .collect::<HashMap<_, _>>();
194        let Ok(mut runs) = self.supervisor_runs.try_lock() else {
195            return Ok(());
196        };
197        let Ok(mut environments) = self.environments.try_lock() else {
198            return Ok(());
199        };
200
201        let mut run_updates = Vec::new();
202        let mut environment_updates = Vec::new();
203        let mut checkpoint_updates = Vec::new();
204
205        for run in runs.values_mut() {
206            let mut run_changed = false;
207            for task_record in &mut run.tasks {
208                if matches!(task_record.state, SupervisorTaskState::Running) {
209                    let checkpoint = checkpoints_by_task.get(&task_record.task.id);
210                    let blocked_reason = checkpoint
211                        .map(restart_blocked_reason_for_checkpoint)
212                        .unwrap_or_else(|| "execution interrupted during restart".to_string());
213                    task_record.state = SupervisorTaskState::Blocked;
214                    if !task_record
215                        .blocked_reasons
216                        .iter()
217                        .any(|reason| reason == &blocked_reason)
218                    {
219                        task_record.blocked_reasons.push(blocked_reason);
220                    }
221                    task_record.updated_at = Utc::now();
222                    if let Some(checkpoint) = checkpoint {
223                        checkpoint_updates.push(checkpoint_for_restart_recovery(checkpoint));
224                    }
225                    run_changed = true;
226                }
227
228                if let Some(environment) = environments.get_mut(&task_record.environment_id) {
229                    reconcile_environment_record(environment, Some(task_record));
230                    task_record.environment = environment.summary();
231                    task_record.updated_at = Utc::now();
232                    if task_record.environment.recovery_action.is_some() {
233                        run_changed = true;
234                    }
235                    environment_updates.push(environment.clone());
236                }
237            }
238
239            if run_changed {
240                run.status = recalculate_run_status(run);
241                run.updated_at = Utc::now();
242                run_updates.push(run.clone());
243            }
244        }
245
246        for environment in environments.values_mut() {
247            let owned = runs.values().any(|run| {
248                run.tasks
249                    .iter()
250                    .any(|task_record| task_record.environment_id == environment.id)
251            });
252            if !owned {
253                environment.health = EnvironmentHealth::Orphaned;
254                environment.recovery_status = RecoveryStatus::Pending;
255                environment.recovery_action = Some(RecoveryAction::QueueCleanup);
256                environment.updated_at = Utc::now();
257                environment_updates.push(environment.clone());
258            }
259        }
260
261        drop(environments);
262        drop(runs);
263
264        for run in run_updates {
265            self.persist_run(&run)?;
266        }
267        for environment in environment_updates {
268            persist_environment_to_disk(&environment.spec.workspace_root, &environment)?;
269        }
270        for checkpoint in checkpoint_updates {
271            self.persist_delegated_checkpoint(&checkpoint)?;
272        }
273
274        Ok(())
275    }
276
277    fn prepare_resumable_tasks_from_checkpoints_sync(&self) -> Vec<DelegatedTask> {
278        let checkpoints_by_task = self
279            .default_workspace_dir
280            .as_deref()
281            .map(load_persisted_checkpoints)
282            .unwrap_or_default()
283            .into_iter()
284            .map(|checkpoint| (checkpoint.task_id.clone(), checkpoint))
285            .collect::<HashMap<_, _>>();
286
287        let Ok(mut runs) = self.supervisor_runs.try_lock() else {
288            return Vec::new();
289        };
290
291        let mut tasks_to_resume = Vec::new();
292        let mut run_updates = Vec::new();
293        let mut checkpoint_updates = Vec::new();
294
295        for run in runs.values_mut() {
296            let mut run_changed = false;
297            for task_record in &mut run.tasks {
298                let Some(checkpoint) = checkpoints_by_task.get(&task_record.task.id) else {
299                    continue;
300                };
301                if !matches!(task_record.state, SupervisorTaskState::Blocked)
302                    || checkpoint.result_published
303                    || checkpoint.resume_state.is_none()
304                    || checkpoint.resume_disposition
305                        != DelegatedResumeDisposition::ResumeFromCheckpoint
306                {
307                    continue;
308                }
309
310                let blocked_reason = restart_blocked_reason_for_checkpoint(checkpoint);
311                task_record.state = SupervisorTaskState::Queued;
312                task_record
313                    .blocked_reasons
314                    .retain(|reason| reason != &blocked_reason);
315                task_record.updated_at = Utc::now();
316                tasks_to_resume.push(task_record.task.clone());
317
318                let mut updated_checkpoint = checkpoint.clone();
319                updated_checkpoint.stage = DelegatedCheckpointStage::Queued;
320                updated_checkpoint.note = Some("resume scheduled after restart".to_string());
321                updated_checkpoint.updated_at = Utc::now();
322                checkpoint_updates.push(updated_checkpoint);
323                run_changed = true;
324            }
325
326            if run_changed {
327                refresh_run_rollups(run);
328                run_updates.push(run.clone());
329            }
330        }
331
332        drop(runs);
333
334        for run in run_updates {
335            let _ = self.persist_run(&run);
336        }
337        for checkpoint in checkpoint_updates {
338            let _ = self.persist_delegated_checkpoint(&checkpoint);
339        }
340
341        tasks_to_resume
342    }
343}
344
345fn legacy_environment_record(
346    run_id: &str,
347    session_id: Option<String>,
348    workspace_dir: Option<PathBuf>,
349    task_record: &SupervisorTaskRecord,
350) -> EnvironmentRecord {
351    let now = Utc::now();
352    let environment_id = format!(
353        "env_{}_{}_{}",
354        sanitize_component(run_id),
355        sanitize_component(&task_record.task.id),
356        sanitize_component(&task_record.task.agent_id),
357    );
358
359    EnvironmentRecord {
360        id: environment_id.clone(),
361        spec: EnvironmentSpec {
362            id: environment_id,
363            execution_mode: task_record.environment.execution_mode.clone(),
364            workspace_root: task_record
365                .task
366                .workspace_dir
367                .clone()
368                .or(workspace_dir)
369                .unwrap_or_else(|| task_record.environment.root_dir.clone()),
370            prepared_path: task_record.environment.root_dir.clone(),
371            session_id,
372            run_id: run_id.to_string(),
373            task_id: task_record.task.id.clone(),
374            agent_id: task_record.task.agent_id.clone(),
375            cleanup_policy: default_cleanup_policy(&task_record.environment.execution_mode),
376            write_access: task_record.environment.write_access,
377            git_worktree: task_record
378                .environment
379                .worktree_path
380                .as_ref()
381                .map(|worktree_path| GitWorktreeSpec {
382                    repo_root: task_record
383                        .task
384                        .workspace_dir
385                        .clone()
386                        .unwrap_or_else(|| task_record.environment.root_dir.clone()),
387                    base_branch: task_record
388                        .environment
389                        .branch_name
390                        .clone()
391                        .unwrap_or_else(|| "main".to_string()),
392                    worktree_branch: task_record
393                        .environment
394                        .branch_name
395                        .clone()
396                        .unwrap_or_else(|| "main".to_string()),
397                    worktree_path: worktree_path.clone(),
398                    create_branch_if_missing: true,
399                }),
400            remote_url: task_record.environment.remote_url.clone(),
401        },
402        state: if matches!(task_record.state, SupervisorTaskState::Running) {
403            EnvironmentState::InUse
404        } else {
405            EnvironmentState::Ready
406        },
407        health: task_record.environment.health,
408        prepared_path: task_record.environment.root_dir.clone(),
409        lease: None,
410        cleanup_result: task_record.environment.cleanup_result.clone(),
411        recovery_status: task_record.environment.recovery_status,
412        recovery_action: task_record.environment.recovery_action,
413        failure: task_record.environment.failure.clone(),
414        created_at: task_record.created_at,
415        updated_at: task_record.updated_at,
416        last_verified_at: Some(now),
417        metadata: None,
418    }
419}
420
421fn reconcile_environment_record(
422    environment: &mut EnvironmentRecord,
423    task_record: Option<&mut SupervisorTaskRecord>,
424) {
425    let path_exists = environment.prepared_path.exists();
426    let mut recovery_action = None;
427
428    if let Some(lease) = environment.lease.as_mut()
429        && lease.released_at.is_none()
430    {
431        lease.released_at = Some(Utc::now());
432        recovery_action = Some(RecoveryAction::ReleaseStaleLease);
433    }
434
435    match environment.spec.execution_mode {
436        AgentExecutionMode::GitWorktree => {
437            if !path_exists {
438                environment.health = EnvironmentHealth::Missing;
439                environment.recovery_status = RecoveryStatus::Pending;
440                environment.recovery_action = Some(RecoveryAction::RecreateMissingEnvironment);
441            } else if let Some(spec) = environment.spec.git_worktree.as_ref() {
442                let git = GitTools::new(Some(spec.repo_root.clone()));
443                let worktree_ok = git
444                    .worktree_list()
445                    .map(|worktrees| {
446                        worktrees
447                            .into_iter()
448                            .any(|worktree| worktree.path == spec.worktree_path)
449                    })
450                    .unwrap_or(false);
451                if !worktree_ok {
452                    environment.health = EnvironmentHealth::Drifted;
453                    environment.recovery_status = RecoveryStatus::NeedsOperatorAction;
454                    environment.recovery_action = Some(RecoveryAction::MarkTaskBlocked);
455                } else {
456                    environment.health = git
457                        .is_worktree_clean(&spec.worktree_path)
458                        .map(|clean| {
459                            if clean {
460                                EnvironmentHealth::Clean
461                            } else {
462                                EnvironmentHealth::Dirty
463                            }
464                        })
465                        .unwrap_or(EnvironmentHealth::Unknown);
466                    environment.recovery_status = RecoveryStatus::Reconciled;
467                    environment.recovery_action = recovery_action;
468                    if !matches!(
469                        environment.state,
470                        EnvironmentState::Archived
471                            | EnvironmentState::Removed
472                            | EnvironmentState::Failed
473                    ) {
474                        environment.state = EnvironmentState::Ready;
475                    }
476                }
477            }
478        }
479        AgentExecutionMode::IsolatedWorkspace => {
480            if !path_exists {
481                environment.health = EnvironmentHealth::Missing;
482                environment.recovery_status = RecoveryStatus::Pending;
483                environment.recovery_action = Some(RecoveryAction::RecreateMissingEnvironment);
484            } else {
485                environment.health = EnvironmentHealth::Clean;
486                environment.recovery_status = RecoveryStatus::Reconciled;
487                environment.recovery_action = recovery_action;
488                if !matches!(
489                    environment.state,
490                    EnvironmentState::Archived
491                        | EnvironmentState::Removed
492                        | EnvironmentState::Failed
493                ) {
494                    environment.state = EnvironmentState::Ready;
495                }
496            }
497        }
498        AgentExecutionMode::SharedWorkspace | AgentExecutionMode::Remote => {
499            environment.health = if path_exists {
500                EnvironmentHealth::Clean
501            } else {
502                EnvironmentHealth::Missing
503            };
504            environment.recovery_status = if path_exists {
505                RecoveryStatus::Reconciled
506            } else {
507                RecoveryStatus::NeedsOperatorAction
508            };
509            environment.recovery_action = recovery_action.or({
510                if path_exists {
511                    None
512                } else {
513                    Some(RecoveryAction::MarkTaskBlocked)
514                }
515            });
516            if path_exists
517                && !matches!(
518                    environment.state,
519                    EnvironmentState::Archived
520                        | EnvironmentState::Removed
521                        | EnvironmentState::Failed
522                )
523            {
524                environment.state = EnvironmentState::Ready;
525            }
526        }
527    }
528
529    if let Some(task_record) = task_record
530        && environment.recovery_action.is_some()
531    {
532        task_record.state = SupervisorTaskState::Blocked;
533        if !task_record
534            .blocked_reasons
535            .iter()
536            .any(|reason| reason == &recovery_message(environment))
537        {
538            task_record
539                .blocked_reasons
540                .push(recovery_message(environment));
541        }
542    }
543
544    environment.updated_at = Utc::now();
545    environment.last_verified_at = Some(Utc::now());
546}
547
548fn recovery_message(environment: &EnvironmentRecord) -> String {
549    match environment.recovery_action {
550        Some(RecoveryAction::RecreateMissingEnvironment) => format!(
551            "environment {} is missing and must be recreated",
552            environment.id
553        ),
554        Some(RecoveryAction::ReleaseStaleLease) => format!(
555            "stale lease released for environment {} after restart",
556            environment.id
557        ),
558        Some(RecoveryAction::ArchiveDirtyEnvironment) => format!(
559            "environment {} is dirty and should be archived",
560            environment.id
561        ),
562        Some(RecoveryAction::QueueCleanup) => format!(
563            "environment {} is orphaned and queued for cleanup",
564            environment.id
565        ),
566        Some(RecoveryAction::MarkTaskBlocked) => format!(
567            "environment {} drifted and blocked the owning task",
568            environment.id
569        ),
570        _ => format!("environment {} reconciled", environment.id),
571    }
572}
573
574#[cfg(test)]
575mod tests {
576    use super::*;
577    use crate::AppConfig;
578    use gestura_core_agents::{AgentManager, AgentRole};
579    use std::process::Command;
580    use tempfile::tempdir;
581
582    fn test_orchestrator(workspace_root: &Path) -> AgentOrchestrator<AgentManager> {
583        AgentOrchestrator::new_with_workspace_root(
584            AgentManager::new(workspace_root.join("recovery-tests.db")),
585            AppConfig::default(),
586            Some(workspace_root.to_path_buf()),
587        )
588    }
589
590    fn test_task(
591        workspace_root: &Path,
592        run_id: &str,
593        task_id: &str,
594        agent_id: &str,
595        execution_mode: AgentExecutionMode,
596    ) -> DelegatedTask {
597        DelegatedTask {
598            id: task_id.to_string(),
599            agent_id: agent_id.to_string(),
600            prompt: format!("Execute {task_id}"),
601            context: None,
602            required_tools: vec![],
603            priority: 1,
604            session_id: Some("session-recovery".to_string()),
605            directive_id: None,
606            tracking_task_id: None,
607            run_id: Some(run_id.to_string()),
608            parent_task_id: None,
609            depends_on: vec![],
610            role: Some(AgentRole::Implementer),
611            delegation_brief: None,
612            planning_only: false,
613            approval_required: false,
614            reviewer_required: false,
615            test_required: false,
616            workspace_dir: Some(workspace_root.to_path_buf()),
617            execution_mode,
618            environment_id: None,
619            remote_target: None,
620            memory_tags: vec!["recovery-test".to_string()],
621            name: Some(task_id.to_string()),
622        }
623    }
624
625    fn test_task_record(
626        task: DelegatedTask,
627        environment: &EnvironmentRecord,
628        state: SupervisorTaskState,
629    ) -> SupervisorTaskRecord {
630        let now = Utc::now();
631        SupervisorTaskRecord {
632            task,
633            state,
634            approval: TaskApprovalRecord::default(),
635            environment_id: environment.id.clone(),
636            environment: environment.summary(),
637            claimed_by: None,
638            attempts: 0,
639            blocked_reasons: vec![],
640            result: None,
641            remote_execution: None,
642            local_execution: None,
643            messages: vec![],
644            checkpoint: None,
645            created_at: now,
646            updated_at: now,
647            started_at: None,
648            completed_at: None,
649        }
650    }
651
652    fn test_checkpoint(task: &DelegatedTask) -> DelegatedTaskCheckpoint {
653        let now = Utc::now();
654        DelegatedTaskCheckpoint {
655            id: delegated_checkpoint_id(&task.id),
656            task_id: task.id.clone(),
657            run_id: task.run_id.clone(),
658            session_id: task.session_id.clone(),
659            agent_id: task.agent_id.clone(),
660            environment_id: task.environment_id.clone(),
661            execution_mode: task.execution_mode.clone(),
662            stage: DelegatedCheckpointStage::Running,
663            replay_safety: DelegatedReplaySafety::CheckpointResumable,
664            resume_disposition: DelegatedResumeDisposition::ResumeFromCheckpoint,
665            safe_boundary_label: "delegated task dispatch boundary".to_string(),
666            workspace_dir: task.workspace_dir.clone(),
667            completed_tool_calls: Vec::new(),
668            result_published: false,
669            note: None,
670            resume_state: None,
671            created_at: now,
672            updated_at: now,
673        }
674    }
675
676    fn test_run(
677        run_id: &str,
678        workspace_root: &Path,
679        task_record: SupervisorTaskRecord,
680    ) -> SupervisorRun {
681        let now = Utc::now();
682        SupervisorRun {
683            id: run_id.to_string(),
684            name: Some(format!("Run {run_id}")),
685            session_id: Some("session-recovery".to_string()),
686            workspace_dir: Some(workspace_root.to_path_buf()),
687            lead_agent_id: Some("supervisor-1".to_string()),
688            parent_run: None,
689            child_runs: vec![],
690            hierarchy_depth: 0,
691            max_hierarchy_depth: default_max_child_supervisor_depth(),
692            inherited_policy: None,
693            status: SupervisorRunStatus::Running,
694            task_summary: SupervisorRunTaskSummary {
695                total: 1,
696                running: 1,
697                ..SupervisorRunTaskSummary::default()
698            },
699            hierarchy_summary: None,
700            tasks: vec![task_record],
701            messages: vec![],
702            shared_cognition: vec![],
703            created_at: now,
704            updated_at: now,
705            completed_at: None,
706            metadata: None,
707        }
708    }
709
710    fn test_environment_record(
711        workspace_root: &Path,
712        prepared_path: PathBuf,
713        execution_mode: AgentExecutionMode,
714        git_worktree: Option<GitWorktreeSpec>,
715    ) -> EnvironmentRecord {
716        let now = Utc::now();
717        EnvironmentRecord {
718            id: format!(
719                "env-{}",
720                sanitize_component(prepared_path.to_string_lossy().as_ref())
721            ),
722            spec: EnvironmentSpec {
723                id: "env-spec".to_string(),
724                execution_mode,
725                workspace_root: workspace_root.to_path_buf(),
726                prepared_path: prepared_path.clone(),
727                session_id: Some("session-recovery".to_string()),
728                run_id: "run-recovery".to_string(),
729                task_id: "task-recovery".to_string(),
730                agent_id: "agent-recovery".to_string(),
731                cleanup_policy: CleanupPolicy::RemoveWhenCleanOtherwiseArchive,
732                write_access: true,
733                git_worktree,
734                remote_url: None,
735            },
736            state: EnvironmentState::Ready,
737            health: EnvironmentHealth::Unknown,
738            prepared_path,
739            lease: None,
740            cleanup_result: None,
741            recovery_status: RecoveryStatus::NotRequired,
742            recovery_action: None,
743            failure: None,
744            created_at: now,
745            updated_at: now,
746            last_verified_at: None,
747            metadata: None,
748        }
749    }
750
751    fn run_git(repo_root: &Path, args: &[&str]) {
752        let status = Command::new("git")
753            .current_dir(repo_root)
754            .args(args)
755            .status()
756            .expect("run git command");
757        assert!(
758            status.success(),
759            "git command failed: git {}",
760            args.join(" ")
761        );
762    }
763
764    fn init_git_repo(repo_root: &Path) {
765        run_git(repo_root, &["init", "--initial-branch=main"]);
766        run_git(
767            repo_root,
768            &["config", "user.email", "gestura-tests@example.com"],
769        );
770        run_git(repo_root, &["config", "user.name", "Gestura Tests"]);
771        std::fs::write(repo_root.join("README.md"), "seed\n").expect("write seed file");
772        run_git(repo_root, &["add", "README.md"]);
773        run_git(repo_root, &["commit", "-m", "Initial commit"]);
774    }
775
776    #[test]
777    fn test_reconcile_environment_record_blocks_missing_shared_workspace() {
778        let temp = tempdir().expect("tempdir");
779        let mut environment = test_environment_record(
780            temp.path(),
781            temp.path().join("missing-shared"),
782            AgentExecutionMode::SharedWorkspace,
783            None,
784        );
785        let task = test_task(
786            temp.path(),
787            "run-shared-missing",
788            "task-shared-missing",
789            "agent-shared",
790            AgentExecutionMode::SharedWorkspace,
791        );
792        let mut task_record = test_task_record(task, &environment, SupervisorTaskState::Queued);
793
794        reconcile_environment_record(&mut environment, Some(&mut task_record));
795
796        assert_eq!(environment.health, EnvironmentHealth::Missing);
797        assert_eq!(
798            environment.recovery_status,
799            RecoveryStatus::NeedsOperatorAction
800        );
801        assert_eq!(
802            environment.recovery_action,
803            Some(RecoveryAction::MarkTaskBlocked)
804        );
805        assert_eq!(task_record.state, SupervisorTaskState::Blocked);
806        assert!(
807            task_record
808                .blocked_reasons
809                .iter()
810                .any(|reason| reason.contains("blocked"))
811        );
812    }
813
814    #[test]
815    fn test_reconcile_environment_record_marks_missing_isolated_workspace_for_recreation() {
816        let temp = tempdir().expect("tempdir");
817        let mut environment = test_environment_record(
818            temp.path(),
819            temp.path().join("missing-isolated"),
820            AgentExecutionMode::IsolatedWorkspace,
821            None,
822        );
823        let task = test_task(
824            temp.path(),
825            "run-isolated-missing",
826            "task-isolated-missing",
827            "agent-isolated",
828            AgentExecutionMode::IsolatedWorkspace,
829        );
830        let mut task_record = test_task_record(task, &environment, SupervisorTaskState::Queued);
831
832        reconcile_environment_record(&mut environment, Some(&mut task_record));
833
834        assert_eq!(environment.health, EnvironmentHealth::Missing);
835        assert_eq!(environment.recovery_status, RecoveryStatus::Pending);
836        assert_eq!(
837            environment.recovery_action,
838            Some(RecoveryAction::RecreateMissingEnvironment)
839        );
840        assert_eq!(task_record.state, SupervisorTaskState::Blocked);
841        assert!(
842            task_record
843                .blocked_reasons
844                .iter()
845                .any(|reason| reason.contains("must be recreated"))
846        );
847    }
848
849    #[test]
850    fn test_reconcile_environment_record_marks_unregistered_worktree_as_drifted() {
851        let temp = tempdir().expect("tempdir");
852        init_git_repo(temp.path());
853        let drifted_path = temp.path().join(".gestura").join("drifted-worktree");
854        std::fs::create_dir_all(&drifted_path).expect("create drifted worktree path");
855        let git_worktree = GitWorktreeSpec {
856            repo_root: temp.path().to_path_buf(),
857            base_branch: "main".to_string(),
858            worktree_branch: "gestura/session-recovery/run/agent/task".to_string(),
859            worktree_path: drifted_path.clone(),
860            create_branch_if_missing: true,
861        };
862        let mut environment = test_environment_record(
863            temp.path(),
864            drifted_path,
865            AgentExecutionMode::GitWorktree,
866            Some(git_worktree),
867        );
868        let task = test_task(
869            temp.path(),
870            "run-worktree-drifted",
871            "task-worktree-drifted",
872            "agent-worktree",
873            AgentExecutionMode::GitWorktree,
874        );
875        let mut task_record = test_task_record(task, &environment, SupervisorTaskState::Queued);
876
877        reconcile_environment_record(&mut environment, Some(&mut task_record));
878
879        assert_eq!(environment.health, EnvironmentHealth::Drifted);
880        assert_eq!(
881            environment.recovery_status,
882            RecoveryStatus::NeedsOperatorAction
883        );
884        assert_eq!(
885            environment.recovery_action,
886            Some(RecoveryAction::MarkTaskBlocked)
887        );
888        assert_eq!(task_record.state, SupervisorTaskState::Blocked);
889        assert!(
890            task_record
891                .blocked_reasons
892                .iter()
893                .any(|reason| reason.contains("drifted"))
894        );
895    }
896
897    #[tokio::test]
898    async fn test_bootstrap_persisted_state_reconciles_restart_and_orphaned_environments() {
899        let temp = tempdir().expect("tempdir");
900        let orchestrator = test_orchestrator(temp.path());
901
902        let task = test_task(
903            temp.path(),
904            "run-restart",
905            "task-restart",
906            "agent-restart",
907            AgentExecutionMode::IsolatedWorkspace,
908        );
909        let environment = orchestrator
910            .prepare_environment(&task)
911            .await
912            .expect("prepare restart environment");
913        let leased = orchestrator
914            .acquire_environment_lease(&environment.id, &task.id, &task.agent_id)
915            .await
916            .expect("acquire environment lease");
917
918        let run = test_run(
919            "run-restart",
920            temp.path(),
921            test_task_record(task.clone(), &leased, SupervisorTaskState::Running),
922        );
923        orchestrator.persist_run(&run).expect("persist run");
924
925        let orphan = orchestrator
926            .prepare_environment(&test_task(
927                temp.path(),
928                "run-orphan",
929                "task-orphan",
930                "agent-orphan",
931                AgentExecutionMode::IsolatedWorkspace,
932            ))
933            .await
934            .expect("prepare orphan environment");
935
936        let recovered = test_orchestrator(temp.path());
937        let environments = recovered.environments.lock().await;
938        let recovered_environment = environments
939            .get(&leased.id)
940            .expect("recovered environment should exist");
941        let orphan_environment = environments
942            .get(&orphan.id)
943            .expect("orphan environment should exist");
944
945        assert_eq!(
946            recovered_environment.recovery_status,
947            RecoveryStatus::Reconciled
948        );
949        assert_eq!(
950            recovered_environment.recovery_action,
951            Some(RecoveryAction::ReleaseStaleLease)
952        );
953        assert_eq!(recovered_environment.state, EnvironmentState::Ready);
954        assert!(
955            recovered_environment
956                .lease
957                .as_ref()
958                .and_then(|lease| lease.released_at)
959                .is_some()
960        );
961
962        assert_eq!(orphan_environment.health, EnvironmentHealth::Orphaned);
963        assert_eq!(orphan_environment.recovery_status, RecoveryStatus::Pending);
964        assert_eq!(
965            orphan_environment.recovery_action,
966            Some(RecoveryAction::QueueCleanup)
967        );
968        drop(environments);
969
970        let runs = recovered.supervisor_runs.lock().await;
971        let run = runs.get("run-restart").expect("recovered run should exist");
972        let record = run.tasks.first().expect("recovered task should exist");
973        assert_eq!(record.state, SupervisorTaskState::Blocked);
974        assert!(
975            record
976                .blocked_reasons
977                .iter()
978                .any(|reason| reason == "execution interrupted during restart")
979        );
980        assert!(
981            record
982                .blocked_reasons
983                .iter()
984                .any(|reason| reason.contains("stale lease released"))
985        );
986    }
987
988    #[tokio::test]
989    async fn test_bootstrap_persisted_state_uses_checkpoint_metadata_for_restart_reason() {
990        let temp = tempdir().expect("tempdir");
991        let orchestrator = test_orchestrator(temp.path());
992
993        let task = test_task(
994            temp.path(),
995            "run-resume",
996            "task-resume",
997            "agent-resume",
998            AgentExecutionMode::IsolatedWorkspace,
999        );
1000        let environment = orchestrator
1001            .prepare_environment(&task)
1002            .await
1003            .expect("prepare resume environment");
1004        let leased = orchestrator
1005            .acquire_environment_lease(&environment.id, &task.id, &task.agent_id)
1006            .await
1007            .expect("acquire environment lease");
1008
1009        let run = test_run(
1010            "run-resume",
1011            temp.path(),
1012            test_task_record(task.clone(), &leased, SupervisorTaskState::Running),
1013        );
1014        orchestrator.persist_run(&run).expect("persist run");
1015        persist_checkpoint_to_disk(temp.path(), &test_checkpoint(&task))
1016            .expect("persist checkpoint");
1017
1018        let recovered = test_orchestrator(temp.path());
1019
1020        let runs = recovered.supervisor_runs.lock().await;
1021        let run = runs.get("run-resume").expect("recovered run should exist");
1022        let record = run.tasks.first().expect("recovered task should exist");
1023        assert_eq!(record.state, SupervisorTaskState::Blocked);
1024        assert!(record.blocked_reasons.iter().any(|reason| {
1025            reason.contains("can resume from checkpoint")
1026                && reason.contains("delegated task dispatch boundary")
1027        }));
1028        drop(runs);
1029
1030        let checkpoints = load_persisted_checkpoints(temp.path());
1031        let checkpoint = checkpoints
1032            .into_iter()
1033            .find(|checkpoint| checkpoint.task_id == task.id)
1034            .expect("checkpoint should persist");
1035        assert_eq!(checkpoint.stage, DelegatedCheckpointStage::Blocked);
1036        assert_eq!(
1037            checkpoint.resume_disposition,
1038            DelegatedResumeDisposition::ResumeFromCheckpoint
1039        );
1040        assert_eq!(
1041            checkpoint.note.as_deref(),
1042            Some("execution interrupted during restart")
1043        );
1044    }
1045
1046    #[tokio::test]
1047    async fn test_bootstrap_persisted_state_keeps_operator_gated_checkpoint_blocked() {
1048        let temp = tempdir().expect("tempdir");
1049        let orchestrator = test_orchestrator(temp.path());
1050
1051        let task = test_task(
1052            temp.path(),
1053            "run-operator-gated",
1054            "task-operator-gated",
1055            "agent-operator-gated",
1056            AgentExecutionMode::IsolatedWorkspace,
1057        );
1058        let environment = orchestrator
1059            .prepare_environment(&task)
1060            .await
1061            .expect("prepare operator-gated environment");
1062        let leased = orchestrator
1063            .acquire_environment_lease(&environment.id, &task.id, &task.agent_id)
1064            .await
1065            .expect("acquire environment lease");
1066
1067        let run = test_run(
1068            "run-operator-gated",
1069            temp.path(),
1070            test_task_record(task.clone(), &leased, SupervisorTaskState::Running),
1071        );
1072        orchestrator.persist_run(&run).expect("persist run");
1073
1074        let mut checkpoint = test_checkpoint(&task);
1075        checkpoint.replay_safety = DelegatedReplaySafety::OperatorGated;
1076        checkpoint.resume_disposition = DelegatedResumeDisposition::OperatorInterventionRequired;
1077        checkpoint.safe_boundary_label = "before tool 'shell' execution".to_string();
1078        checkpoint.note = Some("awaiting operator review after restart".to_string());
1079        persist_checkpoint_to_disk(temp.path(), &checkpoint).expect("persist checkpoint");
1080
1081        let recovered = test_orchestrator(temp.path());
1082        let run = recovered
1083            .get_supervisor_run("run-operator-gated")
1084            .await
1085            .expect("recovered run should exist");
1086        let record = run.tasks.first().expect("recovered task should exist");
1087        assert_eq!(record.state, SupervisorTaskState::Blocked);
1088        assert!(record.blocked_reasons.iter().any(|reason| {
1089            reason.contains("operator action required")
1090                && reason.contains("before tool 'shell' execution")
1091        }));
1092
1093        let checkpoint_summary = record
1094            .checkpoint
1095            .as_ref()
1096            .expect("checkpoint summary should be attached after reload");
1097        assert_eq!(checkpoint_summary.stage, DelegatedCheckpointStage::Blocked);
1098        assert_eq!(
1099            checkpoint_summary.replay_safety,
1100            DelegatedReplaySafety::OperatorGated
1101        );
1102        assert_eq!(
1103            checkpoint_summary.resume_disposition,
1104            DelegatedResumeDisposition::OperatorInterventionRequired
1105        );
1106        assert_eq!(
1107            checkpoint_summary.safe_boundary_label,
1108            "before tool 'shell' execution"
1109        );
1110        assert!(!checkpoint_summary.has_resume_state);
1111        assert!(
1112            !checkpoint_summary
1113                .available_actions
1114                .contains(&DelegatedCheckpointAction::ResumeFromCheckpoint)
1115        );
1116        assert!(
1117            checkpoint_summary
1118                .available_actions
1119                .contains(&DelegatedCheckpointAction::RestartFromScratch)
1120        );
1121        assert!(
1122            checkpoint_summary
1123                .available_actions
1124                .contains(&DelegatedCheckpointAction::AcknowledgeBlocked)
1125        );
1126    }
1127
1128    #[test]
1129    fn test_prepare_resumable_tasks_from_checkpoints_sync_marks_safe_tasks_pending() {
1130        let temp = tempdir().expect("tempdir");
1131        let orchestrator = test_orchestrator(temp.path());
1132
1133        let task = test_task(
1134            temp.path(),
1135            "run-safe-resume",
1136            "task-safe-resume",
1137            "agent-safe-resume",
1138            AgentExecutionMode::IsolatedWorkspace,
1139        );
1140        let environment = test_environment_record(
1141            temp.path(),
1142            task.workspace_dir.as_ref().expect("workspace").clone(),
1143            AgentExecutionMode::IsolatedWorkspace,
1144            None,
1145        );
1146        persist_environment_to_disk(temp.path(), &environment).expect("persist environment");
1147
1148        let mut checkpoint = test_checkpoint(&task);
1149        checkpoint.stage = DelegatedCheckpointStage::Blocked;
1150        checkpoint.note = Some("execution interrupted during restart".to_string());
1151        checkpoint.resume_state = Some(build_delegated_resume_state(
1152            &task,
1153            "delegated prompt",
1154            "partial answer",
1155            "",
1156            &[],
1157            0,
1158        ));
1159        persist_checkpoint_to_disk(temp.path(), &checkpoint).expect("persist checkpoint");
1160
1161        let run = test_run(
1162            "run-safe-resume",
1163            temp.path(),
1164            test_task_record(task.clone(), &environment, SupervisorTaskState::Blocked),
1165        );
1166        orchestrator.persist_run(&run).expect("persist run");
1167        orchestrator
1168            .supervisor_runs
1169            .try_lock()
1170            .expect("runs should be lockable")
1171            .insert(run.id.clone(), run);
1172
1173        let resumable = orchestrator.prepare_resumable_tasks_from_checkpoints_sync();
1174        assert_eq!(resumable.len(), 1);
1175        assert_eq!(resumable[0].id, task.id);
1176
1177        let runs = orchestrator
1178            .supervisor_runs
1179            .try_lock()
1180            .expect("runs should be lockable after sync prep");
1181        let run = runs
1182            .get("run-safe-resume")
1183            .expect("recovered run should exist");
1184        let record = run.tasks.first().expect("task record should exist");
1185        assert_eq!(record.state, SupervisorTaskState::Queued);
1186        assert!(record.blocked_reasons.is_empty());
1187    }
1188}