1use super::environment::{default_cleanup_policy, sanitize_component};
2use super::*;
3use gestura_core_agents::AgentExecutionMode;
4use gestura_core_tools::git::GitTools;
5
6impl<M: OrchestratorAgentManager> AgentOrchestrator<M> {
    /// Rehydrates orchestrator state from disk at startup.
    ///
    /// Steps, in order:
    /// 1. load persisted runs and environment records from the default
    ///    workspace root (no-op when no root is configured);
    /// 2. backfill synthesized environment records for legacy task records
    ///    that predate environment tracking (empty `environment_id`);
    /// 3. rebuild the task-id -> run-id index;
    /// 4. install the loaded state behind the orchestrator's mutexes;
    /// 5. reconcile the installed state against the filesystem and
    ///    re-schedule tasks whose checkpoints permit resumption.
    ///
    /// Locks are taken with `try_lock` because this runs synchronously
    /// during bootstrap; on contention the corresponding map is left as-is.
    pub(crate) fn bootstrap_persisted_state(&self) {
        // Persistence is rooted at the default workspace dir; nothing to
        // load without one.
        let Some(root) = self.default_workspace_dir.as_ref() else {
            return;
        };

        let mut runs = load_persisted_runs(root);
        let mut environments: HashMap<String, EnvironmentRecord> =
            load_persisted_environments(root)
                .into_iter()
                .map(|environment| (environment.id.clone(), environment))
                .collect();

        // Legacy task records carry no environment id; synthesize a record
        // from the task's environment summary so downstream reconciliation
        // has something concrete to operate on.
        let mut generated = Vec::new();
        for run in &mut runs {
            let run_id = run.id.clone();
            let session_id = run.session_id.clone();
            let workspace_dir = run.workspace_dir.clone();
            for task_record in &mut run.tasks {
                if task_record.environment_id.is_empty() {
                    let record = legacy_environment_record(
                        &run_id,
                        session_id.clone(),
                        workspace_dir.clone(),
                        task_record,
                    );
                    task_record.environment_id = record.id.clone();
                    task_record.environment = record.summary();
                    generated.push(record);
                }
            }
        }
        // Records loaded from disk win over synthesized ones on id collision.
        for record in generated {
            environments.entry(record.id.clone()).or_insert(record);
        }

        // Rebuild the reverse index: task id -> owning run id.
        let mut index = HashMap::new();
        for run in &runs {
            for task_record in &run.tasks {
                index.insert(task_record.task.id.clone(), run.id.clone());
            }
        }

        if let Ok(mut guard) = self.supervisor_runs.try_lock() {
            *guard = runs.into_iter().map(|run| (run.id.clone(), run)).collect();
        }
        if let Ok(mut guard) = self.environments.try_lock() {
            *guard = environments;
        }
        if let Ok(mut guard) = self.task_run_index.try_lock() {
            *guard = index;
        }

        // Reconciliation failures are non-fatal during bootstrap.
        let _ = self.reconcile_persisted_state_sync();
        for task in self.prepare_resumable_tasks_from_checkpoints_sync() {
            self.schedule_task_execution(task);
        }
    }
64
    /// Public entry point for on-demand reconciliation of persisted
    /// orchestrator state; delegates to the async reconciler.
    pub async fn reconcile_orchestrator_state(&self) -> Result<(), String> {
        self.reconcile_persisted_state().await
    }
68
69 async fn reconcile_persisted_state(&self) -> Result<(), String> {
70 let checkpoints_by_task = self
71 .default_workspace_dir
72 .as_deref()
73 .map(load_persisted_checkpoints)
74 .unwrap_or_default()
75 .into_iter()
76 .map(|checkpoint| (checkpoint.task_id.clone(), checkpoint))
77 .collect::<HashMap<_, _>>();
78 let mut runs = self.supervisor_runs.lock().await;
79 let mut environments = self.environments.lock().await;
80 let mut run_updates = Vec::new();
81 let mut environment_updates = Vec::new();
82 let mut checkpoint_updates = Vec::new();
83 let observer = self.observer.read().await.clone();
84
85 for run in runs.values_mut() {
86 let mut run_changed = false;
87 for task_record in &mut run.tasks {
88 if matches!(task_record.state, SupervisorTaskState::Running) {
89 let checkpoint = checkpoints_by_task.get(&task_record.task.id);
90 let blocked_reason = checkpoint
91 .map(restart_blocked_reason_for_checkpoint)
92 .unwrap_or_else(|| "execution interrupted during restart".to_string());
93 task_record.state = SupervisorTaskState::Blocked;
94 if !task_record
95 .blocked_reasons
96 .iter()
97 .any(|reason| reason == &blocked_reason)
98 {
99 task_record.blocked_reasons.push(blocked_reason);
100 }
101 task_record.updated_at = Utc::now();
102 if let Some(checkpoint) = checkpoint {
103 checkpoint_updates.push(checkpoint_for_restart_recovery(checkpoint));
104 }
105 run_changed = true;
106 }
107
108 if let Some(environment) = environments.get_mut(&task_record.environment_id) {
109 let mut recovery_summary = None;
110 reconcile_environment_record(environment, Some(task_record));
111 task_record.environment = environment.summary();
112 task_record.updated_at = Utc::now();
113 if task_record.environment.recovery_action.is_some() {
114 run_changed = true;
115 }
116 if let Some(action) = task_record.environment.recovery_action {
117 recovery_summary = Some((action, recovery_message(environment)));
118 }
119 environment_updates.push(environment.clone());
120 if let (Some(observer), Some((action, summary))) = (&observer, recovery_summary)
121 {
122 let observer = observer.clone();
123 let environment_id = environment.id.clone();
124 tokio::spawn(async move {
125 observer
126 .on_environment_recovery(environment_id, action, summary)
127 .await;
128 });
129 }
130 }
131 }
132
133 if run_changed {
134 run.status = recalculate_run_status(run);
135 run.updated_at = Utc::now();
136 run_updates.push(run.clone());
137 }
138 }
139
140 for environment in environments.values_mut() {
141 let owned = runs.values().any(|run| {
142 run.tasks
143 .iter()
144 .any(|task_record| task_record.environment_id == environment.id)
145 });
146 if !owned {
147 environment.health = EnvironmentHealth::Orphaned;
148 environment.recovery_status = RecoveryStatus::Pending;
149 environment.recovery_action = Some(RecoveryAction::QueueCleanup);
150 environment.updated_at = Utc::now();
151 environment_updates.push(environment.clone());
152 if let Some(observer) = &observer {
153 let observer = observer.clone();
154 let environment_id = environment.id.clone();
155 let summary = recovery_message(environment);
156 tokio::spawn(async move {
157 observer
158 .on_environment_recovery(
159 environment_id,
160 RecoveryAction::QueueCleanup,
161 summary,
162 )
163 .await;
164 });
165 }
166 }
167 }
168
169 drop(environments);
170 drop(runs);
171
172 for run in run_updates {
173 self.persist_run_async(&run).await?;
174 }
175 for environment in environment_updates {
176 self.persist_environment_record(&environment).await?;
177 }
178 for checkpoint in checkpoint_updates {
179 self.persist_delegated_checkpoint_async(&checkpoint).await?;
180 }
181
182 Ok(())
183 }
184
    /// Synchronous twin of `reconcile_persisted_state`, used during
    /// bootstrap: same restart-blocking and orphan-detection logic, but
    /// locks are acquired with `try_lock` (bailing out successfully on
    /// contention), persistence uses the blocking helpers, and no observer
    /// notifications are emitted.
    fn reconcile_persisted_state_sync(&self) -> Result<(), String> {
        let checkpoints_by_task = self
            .default_workspace_dir
            .as_deref()
            .map(load_persisted_checkpoints)
            .unwrap_or_default()
            .into_iter()
            .map(|checkpoint| (checkpoint.task_id.clone(), checkpoint))
            .collect::<HashMap<_, _>>();
        // Contention means someone else owns the state; skip quietly.
        let Ok(mut runs) = self.supervisor_runs.try_lock() else {
            return Ok(());
        };
        let Ok(mut environments) = self.environments.try_lock() else {
            return Ok(());
        };

        let mut run_updates = Vec::new();
        let mut environment_updates = Vec::new();
        let mut checkpoint_updates = Vec::new();

        for run in runs.values_mut() {
            let mut run_changed = false;
            for task_record in &mut run.tasks {
                if matches!(task_record.state, SupervisorTaskState::Running) {
                    // A task cannot still be running across a restart; block
                    // it with the most specific reason available.
                    let checkpoint = checkpoints_by_task.get(&task_record.task.id);
                    let blocked_reason = checkpoint
                        .map(restart_blocked_reason_for_checkpoint)
                        .unwrap_or_else(|| "execution interrupted during restart".to_string());
                    task_record.state = SupervisorTaskState::Blocked;
                    if !task_record
                        .blocked_reasons
                        .iter()
                        .any(|reason| reason == &blocked_reason)
                    {
                        task_record.blocked_reasons.push(blocked_reason);
                    }
                    task_record.updated_at = Utc::now();
                    if let Some(checkpoint) = checkpoint {
                        checkpoint_updates.push(checkpoint_for_restart_recovery(checkpoint));
                    }
                    run_changed = true;
                }

                if let Some(environment) = environments.get_mut(&task_record.environment_id) {
                    reconcile_environment_record(environment, Some(task_record));
                    task_record.environment = environment.summary();
                    task_record.updated_at = Utc::now();
                    if task_record.environment.recovery_action.is_some() {
                        run_changed = true;
                    }
                    environment_updates.push(environment.clone());
                }
            }

            if run_changed {
                run.status = recalculate_run_status(run);
                run.updated_at = Utc::now();
                run_updates.push(run.clone());
            }
        }

        // Environments referenced by no task of any run are orphaned.
        for environment in environments.values_mut() {
            let owned = runs.values().any(|run| {
                run.tasks
                    .iter()
                    .any(|task_record| task_record.environment_id == environment.id)
            });
            if !owned {
                environment.health = EnvironmentHealth::Orphaned;
                environment.recovery_status = RecoveryStatus::Pending;
                environment.recovery_action = Some(RecoveryAction::QueueCleanup);
                environment.updated_at = Utc::now();
                environment_updates.push(environment.clone());
            }
        }

        // Release both locks before any persistence I/O.
        drop(environments);
        drop(runs);

        for run in run_updates {
            self.persist_run(&run)?;
        }
        for environment in environment_updates {
            persist_environment_to_disk(&environment.spec.workspace_root, &environment)?;
        }
        for checkpoint in checkpoint_updates {
            self.persist_delegated_checkpoint(&checkpoint)?;
        }

        Ok(())
    }
276
277 fn prepare_resumable_tasks_from_checkpoints_sync(&self) -> Vec<DelegatedTask> {
278 let checkpoints_by_task = self
279 .default_workspace_dir
280 .as_deref()
281 .map(load_persisted_checkpoints)
282 .unwrap_or_default()
283 .into_iter()
284 .map(|checkpoint| (checkpoint.task_id.clone(), checkpoint))
285 .collect::<HashMap<_, _>>();
286
287 let Ok(mut runs) = self.supervisor_runs.try_lock() else {
288 return Vec::new();
289 };
290
291 let mut tasks_to_resume = Vec::new();
292 let mut run_updates = Vec::new();
293 let mut checkpoint_updates = Vec::new();
294
295 for run in runs.values_mut() {
296 let mut run_changed = false;
297 for task_record in &mut run.tasks {
298 let Some(checkpoint) = checkpoints_by_task.get(&task_record.task.id) else {
299 continue;
300 };
301 if !matches!(task_record.state, SupervisorTaskState::Blocked)
302 || checkpoint.result_published
303 || checkpoint.resume_state.is_none()
304 || checkpoint.resume_disposition
305 != DelegatedResumeDisposition::ResumeFromCheckpoint
306 {
307 continue;
308 }
309
310 let blocked_reason = restart_blocked_reason_for_checkpoint(checkpoint);
311 task_record.state = SupervisorTaskState::Queued;
312 task_record
313 .blocked_reasons
314 .retain(|reason| reason != &blocked_reason);
315 task_record.updated_at = Utc::now();
316 tasks_to_resume.push(task_record.task.clone());
317
318 let mut updated_checkpoint = checkpoint.clone();
319 updated_checkpoint.stage = DelegatedCheckpointStage::Queued;
320 updated_checkpoint.note = Some("resume scheduled after restart".to_string());
321 updated_checkpoint.updated_at = Utc::now();
322 checkpoint_updates.push(updated_checkpoint);
323 run_changed = true;
324 }
325
326 if run_changed {
327 refresh_run_rollups(run);
328 run_updates.push(run.clone());
329 }
330 }
331
332 drop(runs);
333
334 for run in run_updates {
335 let _ = self.persist_run(&run);
336 }
337 for checkpoint in checkpoint_updates {
338 let _ = self.persist_delegated_checkpoint(&checkpoint);
339 }
340
341 tasks_to_resume
342 }
343}
344
345fn legacy_environment_record(
346 run_id: &str,
347 session_id: Option<String>,
348 workspace_dir: Option<PathBuf>,
349 task_record: &SupervisorTaskRecord,
350) -> EnvironmentRecord {
351 let now = Utc::now();
352 let environment_id = format!(
353 "env_{}_{}_{}",
354 sanitize_component(run_id),
355 sanitize_component(&task_record.task.id),
356 sanitize_component(&task_record.task.agent_id),
357 );
358
359 EnvironmentRecord {
360 id: environment_id.clone(),
361 spec: EnvironmentSpec {
362 id: environment_id,
363 execution_mode: task_record.environment.execution_mode.clone(),
364 workspace_root: task_record
365 .task
366 .workspace_dir
367 .clone()
368 .or(workspace_dir)
369 .unwrap_or_else(|| task_record.environment.root_dir.clone()),
370 prepared_path: task_record.environment.root_dir.clone(),
371 session_id,
372 run_id: run_id.to_string(),
373 task_id: task_record.task.id.clone(),
374 agent_id: task_record.task.agent_id.clone(),
375 cleanup_policy: default_cleanup_policy(&task_record.environment.execution_mode),
376 write_access: task_record.environment.write_access,
377 git_worktree: task_record
378 .environment
379 .worktree_path
380 .as_ref()
381 .map(|worktree_path| GitWorktreeSpec {
382 repo_root: task_record
383 .task
384 .workspace_dir
385 .clone()
386 .unwrap_or_else(|| task_record.environment.root_dir.clone()),
387 base_branch: task_record
388 .environment
389 .branch_name
390 .clone()
391 .unwrap_or_else(|| "main".to_string()),
392 worktree_branch: task_record
393 .environment
394 .branch_name
395 .clone()
396 .unwrap_or_else(|| "main".to_string()),
397 worktree_path: worktree_path.clone(),
398 create_branch_if_missing: true,
399 }),
400 remote_url: task_record.environment.remote_url.clone(),
401 },
402 state: if matches!(task_record.state, SupervisorTaskState::Running) {
403 EnvironmentState::InUse
404 } else {
405 EnvironmentState::Ready
406 },
407 health: task_record.environment.health,
408 prepared_path: task_record.environment.root_dir.clone(),
409 lease: None,
410 cleanup_result: task_record.environment.cleanup_result.clone(),
411 recovery_status: task_record.environment.recovery_status,
412 recovery_action: task_record.environment.recovery_action,
413 failure: task_record.environment.failure.clone(),
414 created_at: task_record.created_at,
415 updated_at: task_record.updated_at,
416 last_verified_at: Some(now),
417 metadata: None,
418 }
419}
420
421fn reconcile_environment_record(
422 environment: &mut EnvironmentRecord,
423 task_record: Option<&mut SupervisorTaskRecord>,
424) {
425 let path_exists = environment.prepared_path.exists();
426 let mut recovery_action = None;
427
428 if let Some(lease) = environment.lease.as_mut()
429 && lease.released_at.is_none()
430 {
431 lease.released_at = Some(Utc::now());
432 recovery_action = Some(RecoveryAction::ReleaseStaleLease);
433 }
434
435 match environment.spec.execution_mode {
436 AgentExecutionMode::GitWorktree => {
437 if !path_exists {
438 environment.health = EnvironmentHealth::Missing;
439 environment.recovery_status = RecoveryStatus::Pending;
440 environment.recovery_action = Some(RecoveryAction::RecreateMissingEnvironment);
441 } else if let Some(spec) = environment.spec.git_worktree.as_ref() {
442 let git = GitTools::new(Some(spec.repo_root.clone()));
443 let worktree_ok = git
444 .worktree_list()
445 .map(|worktrees| {
446 worktrees
447 .into_iter()
448 .any(|worktree| worktree.path == spec.worktree_path)
449 })
450 .unwrap_or(false);
451 if !worktree_ok {
452 environment.health = EnvironmentHealth::Drifted;
453 environment.recovery_status = RecoveryStatus::NeedsOperatorAction;
454 environment.recovery_action = Some(RecoveryAction::MarkTaskBlocked);
455 } else {
456 environment.health = git
457 .is_worktree_clean(&spec.worktree_path)
458 .map(|clean| {
459 if clean {
460 EnvironmentHealth::Clean
461 } else {
462 EnvironmentHealth::Dirty
463 }
464 })
465 .unwrap_or(EnvironmentHealth::Unknown);
466 environment.recovery_status = RecoveryStatus::Reconciled;
467 environment.recovery_action = recovery_action;
468 if !matches!(
469 environment.state,
470 EnvironmentState::Archived
471 | EnvironmentState::Removed
472 | EnvironmentState::Failed
473 ) {
474 environment.state = EnvironmentState::Ready;
475 }
476 }
477 }
478 }
479 AgentExecutionMode::IsolatedWorkspace => {
480 if !path_exists {
481 environment.health = EnvironmentHealth::Missing;
482 environment.recovery_status = RecoveryStatus::Pending;
483 environment.recovery_action = Some(RecoveryAction::RecreateMissingEnvironment);
484 } else {
485 environment.health = EnvironmentHealth::Clean;
486 environment.recovery_status = RecoveryStatus::Reconciled;
487 environment.recovery_action = recovery_action;
488 if !matches!(
489 environment.state,
490 EnvironmentState::Archived
491 | EnvironmentState::Removed
492 | EnvironmentState::Failed
493 ) {
494 environment.state = EnvironmentState::Ready;
495 }
496 }
497 }
498 AgentExecutionMode::SharedWorkspace | AgentExecutionMode::Remote => {
499 environment.health = if path_exists {
500 EnvironmentHealth::Clean
501 } else {
502 EnvironmentHealth::Missing
503 };
504 environment.recovery_status = if path_exists {
505 RecoveryStatus::Reconciled
506 } else {
507 RecoveryStatus::NeedsOperatorAction
508 };
509 environment.recovery_action = recovery_action.or({
510 if path_exists {
511 None
512 } else {
513 Some(RecoveryAction::MarkTaskBlocked)
514 }
515 });
516 if path_exists
517 && !matches!(
518 environment.state,
519 EnvironmentState::Archived
520 | EnvironmentState::Removed
521 | EnvironmentState::Failed
522 )
523 {
524 environment.state = EnvironmentState::Ready;
525 }
526 }
527 }
528
529 if let Some(task_record) = task_record
530 && environment.recovery_action.is_some()
531 {
532 task_record.state = SupervisorTaskState::Blocked;
533 if !task_record
534 .blocked_reasons
535 .iter()
536 .any(|reason| reason == &recovery_message(environment))
537 {
538 task_record
539 .blocked_reasons
540 .push(recovery_message(environment));
541 }
542 }
543
544 environment.updated_at = Utc::now();
545 environment.last_verified_at = Some(Utc::now());
546}
547
548fn recovery_message(environment: &EnvironmentRecord) -> String {
549 match environment.recovery_action {
550 Some(RecoveryAction::RecreateMissingEnvironment) => format!(
551 "environment {} is missing and must be recreated",
552 environment.id
553 ),
554 Some(RecoveryAction::ReleaseStaleLease) => format!(
555 "stale lease released for environment {} after restart",
556 environment.id
557 ),
558 Some(RecoveryAction::ArchiveDirtyEnvironment) => format!(
559 "environment {} is dirty and should be archived",
560 environment.id
561 ),
562 Some(RecoveryAction::QueueCleanup) => format!(
563 "environment {} is orphaned and queued for cleanup",
564 environment.id
565 ),
566 Some(RecoveryAction::MarkTaskBlocked) => format!(
567 "environment {} drifted and blocked the owning task",
568 environment.id
569 ),
570 _ => format!("environment {} reconciled", environment.id),
571 }
572}
573
574#[cfg(test)]
575mod tests {
576 use super::*;
577 use crate::AppConfig;
578 use gestura_core_agents::{AgentManager, AgentRole};
579 use std::process::Command;
580 use tempfile::tempdir;
581
    // Builds an orchestrator whose persistence root is `workspace_root`,
    // backed by a throwaway sqlite database inside that root.
    fn test_orchestrator(workspace_root: &Path) -> AgentOrchestrator<AgentManager> {
        AgentOrchestrator::new_with_workspace_root(
            AgentManager::new(workspace_root.join("recovery-tests.db")),
            AppConfig::default(),
            Some(workspace_root.to_path_buf()),
        )
    }
589
    // Minimal delegated-task fixture: everything optional is off, workspace
    // points at the test root, and execution mode is caller-chosen.
    fn test_task(
        workspace_root: &Path,
        run_id: &str,
        task_id: &str,
        agent_id: &str,
        execution_mode: AgentExecutionMode,
    ) -> DelegatedTask {
        DelegatedTask {
            id: task_id.to_string(),
            agent_id: agent_id.to_string(),
            prompt: format!("Execute {task_id}"),
            context: None,
            required_tools: vec![],
            priority: 1,
            session_id: Some("session-recovery".to_string()),
            directive_id: None,
            tracking_task_id: None,
            run_id: Some(run_id.to_string()),
            parent_task_id: None,
            depends_on: vec![],
            role: Some(AgentRole::Implementer),
            delegation_brief: None,
            planning_only: false,
            approval_required: false,
            reviewer_required: false,
            test_required: false,
            workspace_dir: Some(workspace_root.to_path_buf()),
            execution_mode,
            environment_id: None,
            remote_target: None,
            memory_tags: vec!["recovery-test".to_string()],
            name: Some(task_id.to_string()),
        }
    }
624
    // Wraps a task in a supervisor record bound to `environment`, with the
    // given state and otherwise-empty bookkeeping fields.
    fn test_task_record(
        task: DelegatedTask,
        environment: &EnvironmentRecord,
        state: SupervisorTaskState,
    ) -> SupervisorTaskRecord {
        let now = Utc::now();
        SupervisorTaskRecord {
            task,
            state,
            approval: TaskApprovalRecord::default(),
            environment_id: environment.id.clone(),
            environment: environment.summary(),
            claimed_by: None,
            attempts: 0,
            blocked_reasons: vec![],
            result: None,
            remote_execution: None,
            local_execution: None,
            messages: vec![],
            checkpoint: None,
            created_at: now,
            updated_at: now,
            started_at: None,
            completed_at: None,
        }
    }
651
    // A running, checkpoint-resumable checkpoint for `task`: unpublished
    // result, resume-from-checkpoint disposition, no saved resume state.
    fn test_checkpoint(task: &DelegatedTask) -> DelegatedTaskCheckpoint {
        let now = Utc::now();
        DelegatedTaskCheckpoint {
            id: delegated_checkpoint_id(&task.id),
            task_id: task.id.clone(),
            run_id: task.run_id.clone(),
            session_id: task.session_id.clone(),
            agent_id: task.agent_id.clone(),
            environment_id: task.environment_id.clone(),
            execution_mode: task.execution_mode.clone(),
            stage: DelegatedCheckpointStage::Running,
            replay_safety: DelegatedReplaySafety::CheckpointResumable,
            resume_disposition: DelegatedResumeDisposition::ResumeFromCheckpoint,
            safe_boundary_label: "delegated task dispatch boundary".to_string(),
            workspace_dir: task.workspace_dir.clone(),
            completed_tool_calls: Vec::new(),
            result_published: false,
            note: None,
            resume_state: None,
            created_at: now,
            updated_at: now,
        }
    }
675
    // A running single-task supervisor run containing `task_record`, with a
    // summary that already counts that one task as running.
    fn test_run(
        run_id: &str,
        workspace_root: &Path,
        task_record: SupervisorTaskRecord,
    ) -> SupervisorRun {
        let now = Utc::now();
        SupervisorRun {
            id: run_id.to_string(),
            name: Some(format!("Run {run_id}")),
            session_id: Some("session-recovery".to_string()),
            workspace_dir: Some(workspace_root.to_path_buf()),
            lead_agent_id: Some("supervisor-1".to_string()),
            parent_run: None,
            child_runs: vec![],
            hierarchy_depth: 0,
            max_hierarchy_depth: default_max_child_supervisor_depth(),
            inherited_policy: None,
            status: SupervisorRunStatus::Running,
            task_summary: SupervisorRunTaskSummary {
                total: 1,
                running: 1,
                ..SupervisorRunTaskSummary::default()
            },
            hierarchy_summary: None,
            tasks: vec![task_record],
            messages: vec![],
            shared_cognition: vec![],
            created_at: now,
            updated_at: now,
            completed_at: None,
            metadata: None,
        }
    }
709
    // Environment fixture in Ready/Unknown state with no lease or recovery
    // data; id is derived from the prepared path so fixtures don't collide.
    fn test_environment_record(
        workspace_root: &Path,
        prepared_path: PathBuf,
        execution_mode: AgentExecutionMode,
        git_worktree: Option<GitWorktreeSpec>,
    ) -> EnvironmentRecord {
        let now = Utc::now();
        EnvironmentRecord {
            id: format!(
                "env-{}",
                sanitize_component(prepared_path.to_string_lossy().as_ref())
            ),
            spec: EnvironmentSpec {
                id: "env-spec".to_string(),
                execution_mode,
                workspace_root: workspace_root.to_path_buf(),
                prepared_path: prepared_path.clone(),
                session_id: Some("session-recovery".to_string()),
                run_id: "run-recovery".to_string(),
                task_id: "task-recovery".to_string(),
                agent_id: "agent-recovery".to_string(),
                cleanup_policy: CleanupPolicy::RemoveWhenCleanOtherwiseArchive,
                write_access: true,
                git_worktree,
                remote_url: None,
            },
            state: EnvironmentState::Ready,
            health: EnvironmentHealth::Unknown,
            prepared_path,
            lease: None,
            cleanup_result: None,
            recovery_status: RecoveryStatus::NotRequired,
            recovery_action: None,
            failure: None,
            created_at: now,
            updated_at: now,
            last_verified_at: None,
            metadata: None,
        }
    }
750
751 fn run_git(repo_root: &Path, args: &[&str]) {
752 let status = Command::new("git")
753 .current_dir(repo_root)
754 .args(args)
755 .status()
756 .expect("run git command");
757 assert!(
758 status.success(),
759 "git command failed: git {}",
760 args.join(" ")
761 );
762 }
763
    // Initializes a git repo at `repo_root` with a local identity and one
    // seed commit, so worktree operations have a valid base branch.
    fn init_git_repo(repo_root: &Path) {
        run_git(repo_root, &["init", "--initial-branch=main"]);
        run_git(
            repo_root,
            &["config", "user.email", "gestura-tests@example.com"],
        );
        run_git(repo_root, &["config", "user.name", "Gestura Tests"]);
        std::fs::write(repo_root.join("README.md"), "seed\n").expect("write seed file");
        run_git(repo_root, &["add", "README.md"]);
        run_git(repo_root, &["commit", "-m", "Initial commit"]);
    }
775
    // A shared workspace whose prepared path is missing cannot be recreated
    // automatically: reconciliation should demand operator action and block
    // the owning task.
    #[test]
    fn test_reconcile_environment_record_blocks_missing_shared_workspace() {
        let temp = tempdir().expect("tempdir");
        let mut environment = test_environment_record(
            temp.path(),
            temp.path().join("missing-shared"),
            AgentExecutionMode::SharedWorkspace,
            None,
        );
        let task = test_task(
            temp.path(),
            "run-shared-missing",
            "task-shared-missing",
            "agent-shared",
            AgentExecutionMode::SharedWorkspace,
        );
        let mut task_record = test_task_record(task, &environment, SupervisorTaskState::Queued);

        reconcile_environment_record(&mut environment, Some(&mut task_record));

        assert_eq!(environment.health, EnvironmentHealth::Missing);
        assert_eq!(
            environment.recovery_status,
            RecoveryStatus::NeedsOperatorAction
        );
        assert_eq!(
            environment.recovery_action,
            Some(RecoveryAction::MarkTaskBlocked)
        );
        assert_eq!(task_record.state, SupervisorTaskState::Blocked);
        assert!(
            task_record
                .blocked_reasons
                .iter()
                .any(|reason| reason.contains("blocked"))
        );
    }
813
    // A missing isolated workspace is disposable: reconciliation should
    // schedule recreation (Pending) while still blocking the owning task
    // until recreation happens.
    #[test]
    fn test_reconcile_environment_record_marks_missing_isolated_workspace_for_recreation() {
        let temp = tempdir().expect("tempdir");
        let mut environment = test_environment_record(
            temp.path(),
            temp.path().join("missing-isolated"),
            AgentExecutionMode::IsolatedWorkspace,
            None,
        );
        let task = test_task(
            temp.path(),
            "run-isolated-missing",
            "task-isolated-missing",
            "agent-isolated",
            AgentExecutionMode::IsolatedWorkspace,
        );
        let mut task_record = test_task_record(task, &environment, SupervisorTaskState::Queued);

        reconcile_environment_record(&mut environment, Some(&mut task_record));

        assert_eq!(environment.health, EnvironmentHealth::Missing);
        assert_eq!(environment.recovery_status, RecoveryStatus::Pending);
        assert_eq!(
            environment.recovery_action,
            Some(RecoveryAction::RecreateMissingEnvironment)
        );
        assert_eq!(task_record.state, SupervisorTaskState::Blocked);
        assert!(
            task_record
                .blocked_reasons
                .iter()
                .any(|reason| reason.contains("must be recreated"))
        );
    }
848
    // A worktree directory that exists on disk but is not registered in
    // git's worktree list is drifted: operator action required, task blocked.
    #[test]
    fn test_reconcile_environment_record_marks_unregistered_worktree_as_drifted() {
        let temp = tempdir().expect("tempdir");
        init_git_repo(temp.path());
        // Create the directory manually, bypassing `git worktree add`.
        let drifted_path = temp.path().join(".gestura").join("drifted-worktree");
        std::fs::create_dir_all(&drifted_path).expect("create drifted worktree path");
        let git_worktree = GitWorktreeSpec {
            repo_root: temp.path().to_path_buf(),
            base_branch: "main".to_string(),
            worktree_branch: "gestura/session-recovery/run/agent/task".to_string(),
            worktree_path: drifted_path.clone(),
            create_branch_if_missing: true,
        };
        let mut environment = test_environment_record(
            temp.path(),
            drifted_path,
            AgentExecutionMode::GitWorktree,
            Some(git_worktree),
        );
        let task = test_task(
            temp.path(),
            "run-worktree-drifted",
            "task-worktree-drifted",
            "agent-worktree",
            AgentExecutionMode::GitWorktree,
        );
        let mut task_record = test_task_record(task, &environment, SupervisorTaskState::Queued);

        reconcile_environment_record(&mut environment, Some(&mut task_record));

        assert_eq!(environment.health, EnvironmentHealth::Drifted);
        assert_eq!(
            environment.recovery_status,
            RecoveryStatus::NeedsOperatorAction
        );
        assert_eq!(
            environment.recovery_action,
            Some(RecoveryAction::MarkTaskBlocked)
        );
        assert_eq!(task_record.state, SupervisorTaskState::Blocked);
        assert!(
            task_record
                .blocked_reasons
                .iter()
                .any(|reason| reason.contains("drifted"))
        );
    }
896
    // End-to-end bootstrap: a run with a Running task plus a leased
    // environment, and a second environment owned by no run. A fresh
    // orchestrator over the same workspace should release the stale lease,
    // block the interrupted task, and queue the orphan for cleanup.
    #[tokio::test]
    async fn test_bootstrap_persisted_state_reconciles_restart_and_orphaned_environments() {
        let temp = tempdir().expect("tempdir");
        let orchestrator = test_orchestrator(temp.path());

        let task = test_task(
            temp.path(),
            "run-restart",
            "task-restart",
            "agent-restart",
            AgentExecutionMode::IsolatedWorkspace,
        );
        let environment = orchestrator
            .prepare_environment(&task)
            .await
            .expect("prepare restart environment");
        let leased = orchestrator
            .acquire_environment_lease(&environment.id, &task.id, &task.agent_id)
            .await
            .expect("acquire environment lease");

        let run = test_run(
            "run-restart",
            temp.path(),
            test_task_record(task.clone(), &leased, SupervisorTaskState::Running),
        );
        orchestrator.persist_run(&run).expect("persist run");

        // This environment is never attached to any persisted run.
        let orphan = orchestrator
            .prepare_environment(&test_task(
                temp.path(),
                "run-orphan",
                "task-orphan",
                "agent-orphan",
                AgentExecutionMode::IsolatedWorkspace,
            ))
            .await
            .expect("prepare orphan environment");

        // Constructing a second orchestrator over the same root simulates a
        // process restart and triggers bootstrap_persisted_state.
        let recovered = test_orchestrator(temp.path());
        let environments = recovered.environments.lock().await;
        let recovered_environment = environments
            .get(&leased.id)
            .expect("recovered environment should exist");
        let orphan_environment = environments
            .get(&orphan.id)
            .expect("orphan environment should exist");

        assert_eq!(
            recovered_environment.recovery_status,
            RecoveryStatus::Reconciled
        );
        assert_eq!(
            recovered_environment.recovery_action,
            Some(RecoveryAction::ReleaseStaleLease)
        );
        assert_eq!(recovered_environment.state, EnvironmentState::Ready);
        assert!(
            recovered_environment
                .lease
                .as_ref()
                .and_then(|lease| lease.released_at)
                .is_some()
        );

        assert_eq!(orphan_environment.health, EnvironmentHealth::Orphaned);
        assert_eq!(orphan_environment.recovery_status, RecoveryStatus::Pending);
        assert_eq!(
            orphan_environment.recovery_action,
            Some(RecoveryAction::QueueCleanup)
        );
        drop(environments);

        let runs = recovered.supervisor_runs.lock().await;
        let run = runs.get("run-restart").expect("recovered run should exist");
        let record = run.tasks.first().expect("recovered task should exist");
        assert_eq!(record.state, SupervisorTaskState::Blocked);
        assert!(
            record
                .blocked_reasons
                .iter()
                .any(|reason| reason == "execution interrupted during restart")
        );
        assert!(
            record
                .blocked_reasons
                .iter()
                .any(|reason| reason.contains("stale lease released"))
        );
    }
987
    // When a persisted checkpoint exists for an interrupted task, bootstrap
    // should derive the blocked reason from the checkpoint metadata and
    // rewrite the checkpoint itself into the restart-recovery shape.
    #[tokio::test]
    async fn test_bootstrap_persisted_state_uses_checkpoint_metadata_for_restart_reason() {
        let temp = tempdir().expect("tempdir");
        let orchestrator = test_orchestrator(temp.path());

        let task = test_task(
            temp.path(),
            "run-resume",
            "task-resume",
            "agent-resume",
            AgentExecutionMode::IsolatedWorkspace,
        );
        let environment = orchestrator
            .prepare_environment(&task)
            .await
            .expect("prepare resume environment");
        let leased = orchestrator
            .acquire_environment_lease(&environment.id, &task.id, &task.agent_id)
            .await
            .expect("acquire environment lease");

        let run = test_run(
            "run-resume",
            temp.path(),
            test_task_record(task.clone(), &leased, SupervisorTaskState::Running),
        );
        orchestrator.persist_run(&run).expect("persist run");
        persist_checkpoint_to_disk(temp.path(), &test_checkpoint(&task))
            .expect("persist checkpoint");

        // Simulated restart over the same workspace root.
        let recovered = test_orchestrator(temp.path());

        let runs = recovered.supervisor_runs.lock().await;
        let run = runs.get("run-resume").expect("recovered run should exist");
        let record = run.tasks.first().expect("recovered task should exist");
        assert_eq!(record.state, SupervisorTaskState::Blocked);
        assert!(record.blocked_reasons.iter().any(|reason| {
            reason.contains("can resume from checkpoint")
                && reason.contains("delegated task dispatch boundary")
        }));
        drop(runs);

        let checkpoints = load_persisted_checkpoints(temp.path());
        let checkpoint = checkpoints
            .into_iter()
            .find(|checkpoint| checkpoint.task_id == task.id)
            .expect("checkpoint should persist");
        assert_eq!(checkpoint.stage, DelegatedCheckpointStage::Blocked);
        assert_eq!(
            checkpoint.resume_disposition,
            DelegatedResumeDisposition::ResumeFromCheckpoint
        );
        assert_eq!(
            checkpoint.note.as_deref(),
            Some("execution interrupted during restart")
        );
    }
1045
1046 #[tokio::test]
1047 async fn test_bootstrap_persisted_state_keeps_operator_gated_checkpoint_blocked() {
1048 let temp = tempdir().expect("tempdir");
1049 let orchestrator = test_orchestrator(temp.path());
1050
1051 let task = test_task(
1052 temp.path(),
1053 "run-operator-gated",
1054 "task-operator-gated",
1055 "agent-operator-gated",
1056 AgentExecutionMode::IsolatedWorkspace,
1057 );
1058 let environment = orchestrator
1059 .prepare_environment(&task)
1060 .await
1061 .expect("prepare operator-gated environment");
1062 let leased = orchestrator
1063 .acquire_environment_lease(&environment.id, &task.id, &task.agent_id)
1064 .await
1065 .expect("acquire environment lease");
1066
1067 let run = test_run(
1068 "run-operator-gated",
1069 temp.path(),
1070 test_task_record(task.clone(), &leased, SupervisorTaskState::Running),
1071 );
1072 orchestrator.persist_run(&run).expect("persist run");
1073
1074 let mut checkpoint = test_checkpoint(&task);
1075 checkpoint.replay_safety = DelegatedReplaySafety::OperatorGated;
1076 checkpoint.resume_disposition = DelegatedResumeDisposition::OperatorInterventionRequired;
1077 checkpoint.safe_boundary_label = "before tool 'shell' execution".to_string();
1078 checkpoint.note = Some("awaiting operator review after restart".to_string());
1079 persist_checkpoint_to_disk(temp.path(), &checkpoint).expect("persist checkpoint");
1080
1081 let recovered = test_orchestrator(temp.path());
1082 let run = recovered
1083 .get_supervisor_run("run-operator-gated")
1084 .await
1085 .expect("recovered run should exist");
1086 let record = run.tasks.first().expect("recovered task should exist");
1087 assert_eq!(record.state, SupervisorTaskState::Blocked);
1088 assert!(record.blocked_reasons.iter().any(|reason| {
1089 reason.contains("operator action required")
1090 && reason.contains("before tool 'shell' execution")
1091 }));
1092
1093 let checkpoint_summary = record
1094 .checkpoint
1095 .as_ref()
1096 .expect("checkpoint summary should be attached after reload");
1097 assert_eq!(checkpoint_summary.stage, DelegatedCheckpointStage::Blocked);
1098 assert_eq!(
1099 checkpoint_summary.replay_safety,
1100 DelegatedReplaySafety::OperatorGated
1101 );
1102 assert_eq!(
1103 checkpoint_summary.resume_disposition,
1104 DelegatedResumeDisposition::OperatorInterventionRequired
1105 );
1106 assert_eq!(
1107 checkpoint_summary.safe_boundary_label,
1108 "before tool 'shell' execution"
1109 );
1110 assert!(!checkpoint_summary.has_resume_state);
1111 assert!(
1112 !checkpoint_summary
1113 .available_actions
1114 .contains(&DelegatedCheckpointAction::ResumeFromCheckpoint)
1115 );
1116 assert!(
1117 checkpoint_summary
1118 .available_actions
1119 .contains(&DelegatedCheckpointAction::RestartFromScratch)
1120 );
1121 assert!(
1122 checkpoint_summary
1123 .available_actions
1124 .contains(&DelegatedCheckpointAction::AcknowledgeBlocked)
1125 );
1126 }
1127
1128 #[test]
1129 fn test_prepare_resumable_tasks_from_checkpoints_sync_marks_safe_tasks_pending() {
1130 let temp = tempdir().expect("tempdir");
1131 let orchestrator = test_orchestrator(temp.path());
1132
1133 let task = test_task(
1134 temp.path(),
1135 "run-safe-resume",
1136 "task-safe-resume",
1137 "agent-safe-resume",
1138 AgentExecutionMode::IsolatedWorkspace,
1139 );
1140 let environment = test_environment_record(
1141 temp.path(),
1142 task.workspace_dir.as_ref().expect("workspace").clone(),
1143 AgentExecutionMode::IsolatedWorkspace,
1144 None,
1145 );
1146 persist_environment_to_disk(temp.path(), &environment).expect("persist environment");
1147
1148 let mut checkpoint = test_checkpoint(&task);
1149 checkpoint.stage = DelegatedCheckpointStage::Blocked;
1150 checkpoint.note = Some("execution interrupted during restart".to_string());
1151 checkpoint.resume_state = Some(build_delegated_resume_state(
1152 &task,
1153 "delegated prompt",
1154 "partial answer",
1155 "",
1156 &[],
1157 0,
1158 ));
1159 persist_checkpoint_to_disk(temp.path(), &checkpoint).expect("persist checkpoint");
1160
1161 let run = test_run(
1162 "run-safe-resume",
1163 temp.path(),
1164 test_task_record(task.clone(), &environment, SupervisorTaskState::Blocked),
1165 );
1166 orchestrator.persist_run(&run).expect("persist run");
1167 orchestrator
1168 .supervisor_runs
1169 .try_lock()
1170 .expect("runs should be lockable")
1171 .insert(run.id.clone(), run);
1172
1173 let resumable = orchestrator.prepare_resumable_tasks_from_checkpoints_sync();
1174 assert_eq!(resumable.len(), 1);
1175 assert_eq!(resumable[0].id, task.id);
1176
1177 let runs = orchestrator
1178 .supervisor_runs
1179 .try_lock()
1180 .expect("runs should be lockable after sync prep");
1181 let run = runs
1182 .get("run-safe-resume")
1183 .expect("recovered run should exist");
1184 let record = run.tasks.first().expect("task record should exist");
1185 assert_eq!(record.state, SupervisorTaskState::Queued);
1186 assert!(record.blocked_reasons.is_empty());
1187 }
1188}