gestura_core/tools/
shell_sessions.rs

1//! PTY-backed shell sessions for reusable agent execution and interactive terminals.
2//!
3//! This module manages two related session types:
4//! - **automation sessions** used by the agent for reusable shell execution
5//! - **interactive sessions** used by the GUI terminal manager for real typing
6
7use crate::error::{AppError, Result};
8use crate::streaming::{ShellOutputStream, ShellProcessState, ShellSessionState, StreamChunk};
9use std::time::{Duration, Instant};
10use tokio::sync::mpsc;
11
12use super::shell_streaming::{ShellRuntimeFailureKind, StreamingCommandResult};
13
/// Public summary for a managed PTY shell session.
///
/// This is the lightweight value returned to callers when a session is
/// created or stopped; it carries only the identifier and the tracked
/// working directory.
#[derive(Debug, Clone)]
pub struct ShellSessionHandle {
    /// Stable identifier for the long-lived PTY shell session.
    pub shell_session_id: String,
    /// Best-effort tracked working directory for the PTY shell session.
    /// `None` when no directory has been recorded for the session.
    pub cwd: Option<String>,
}
22
/// Public metadata for a managed PTY shell session.
///
/// Returned by the describe/claim/subscribe entry points so callers can tell
/// who owns a session and whether the automation pool may still reuse it.
#[derive(Debug, Clone)]
pub struct ShellSessionMetadata {
    /// Stable identifier for the long-lived PTY shell session.
    pub shell_session_id: String,
    /// Best-effort tracked working directory for the PTY shell session.
    pub cwd: Option<String>,
    /// Whether the session was created for direct user interaction.
    pub interactive: bool,
    /// Whether the session is reserved for the user-facing terminal manager.
    pub user_managed: bool,
    /// Whether the session can currently be reused by the automation pool.
    pub available_for_reuse: bool,
}
37
/// Execution policy for PTY-backed shell commands.
///
/// The derived `Default` leaves both timeouts unset and disables
/// long-running mode; unset timeouts are replaced with defaults when a
/// command is executed.
#[derive(Debug, Clone, Copy, Default)]
pub struct ShellExecutionOptions {
    /// Maximum wall-clock time allowed before the command is considered long-running.
    /// When `None`, execution falls back to the module's default execution timeout.
    pub timeout_secs: Option<u64>,
    /// Whether shell activity may extend execution past `timeout_secs`.
    pub allow_long_running: bool,
    /// Maximum quiet period allowed before an activity-aware command is treated as stalled.
    /// When `None`, a value is derived from the effective timeout.
    pub stall_timeout_secs: Option<u64>,
}
48
49mod imp {
50    use super::*;
51    use portable_pty::{Child, CommandBuilder, MasterPty, PtySize, native_pty_system};
52    use std::collections::HashMap;
53    use std::io::{ErrorKind, Read, Write};
54    use std::sync::{
55        Arc, Mutex as StdMutex, OnceLock,
56        atomic::{AtomicBool, Ordering},
57    };
58    use tokio::sync::{Mutex, broadcast, mpsc};
59
    /// Initial PTY height, in rows, used when a session's PTY is opened.
    const DEFAULT_ROWS: u16 = 24;
    /// Initial PTY width, in columns, used when a session's PTY is opened.
    const DEFAULT_COLS: u16 = 120;
    // NOTE(review): not referenced in the visible part of this file; presumably the grace
    // period granted after an interrupt before escalating — confirm against the stop path.
    const INTERRUPT_GRACE_SECS: u64 = 2;
    // NOTE(review): presumably the delay between stop escalation steps — confirm.
    const STOP_ESCALATION_MILLIS: u64 = 300;
    /// Capacity of the per-command channel that carries raw PTY output to `execute`.
    const ACTIVE_CHUNK_BUFFER: usize = 256;
    /// Capacity of the per-session broadcast channel used for subscriber events.
    const SESSION_EVENT_BUFFER: usize = 512;
    /// Execution timeout applied when `ShellExecutionOptions::timeout_secs` is `None`.
    const DEFAULT_EXECUTION_TIMEOUT_SECS: u64 = 300;
    /// Lower bound of the derived stall timeout (see `default_stall_timeout_secs`).
    const MIN_STALL_TIMEOUT_SECS: u64 = 30;
    /// Upper bound of the derived stall timeout (see `default_stall_timeout_secs`).
    const MAX_STALL_TIMEOUT_SECS: u64 = 300;
    /// Cap on the stall timeout once a prompt or error signal has been seen
    /// (see `early_signal_stall_timeout`).
    const EARLY_SIGNAL_STALL_TIMEOUT_SECS: u64 = 15;
    // NOTE(review): presumably the number of quiet periods tolerated without any stall
    // signal before giving up — confirm against the wait loop.
    const MAX_QUIET_WAIT_CYCLES_WITHOUT_SIGNAL: u8 = 2;
    /// How long to wait for the stream receiver before dropping a shell output chunk.
    const SHELL_OUTPUT_SEND_TIMEOUT: Duration = Duration::from_millis(100);
    /// How long to wait for the stream receiver before dropping a status chunk.
    const STATUS_CHUNK_SEND_TIMEOUT: Duration = Duration::from_millis(100);
    // NOTE(review): presumably the size of the output tail inspected for stall signals — confirm.
    const STALL_SIGNAL_TAIL_BYTES: usize = 4096;
    // NOTE(review): presumably the polling period of the child-exit monitor — confirm.
    const CHILD_EXIT_POLL_INTERVAL: Duration = Duration::from_millis(100);
75
    /// Text fragments that indicate a command is waiting for user input.
    ///
    /// All entries are lowercase.
    // NOTE(review): presumably matched as substrings of lowercased recent output when
    // classifying a quiet command as `StallSignal::InteractivePrompt` — confirm at the call site.
    const INTERACTIVE_PROMPT_PATTERNS: &[&str] = &[
        "ok to proceed?",
        "need to install the following packages",
        "would you like to continue",
        "press enter to continue",
        "press any key to continue",
        "select an option",
        "(y/n)",
        "[y/n]",
        "yes/no",
        "enter password",
        "enter passphrase",
        "password:",
        "passphrase:",
    ];
91
    /// Text fragments that indicate a command has printed an error.
    ///
    /// All entries are lowercase.
    // NOTE(review): presumably matched as substrings of lowercased recent output when
    // classifying a quiet command as `StallSignal::ErrorOutput` — confirm at the call site.
    // NOTE(review): if matching is substring-based, the "is not recognized ..." entry is
    // already covered by the shorter "not recognized ..." entry above it.
    const ERROR_OUTPUT_PATTERNS: &[&str] = &[
        "command not found",
        "no such file or directory",
        "permission denied",
        "not recognized as an internal or external command",
        "is not recognized as an internal or external command",
        "npm err!",
        "traceback (most recent call last)",
        "syntax error",
        "fatal:",
        "panic:",
        "exception:",
    ];
105
106    async fn send_shell_output_chunk_best_effort(
107        tx: &mpsc::Sender<StreamChunk>,
108        chunk: StreamChunk,
109    ) {
110        match tokio::time::timeout(SHELL_OUTPUT_SEND_TIMEOUT, tx.send(chunk)).await {
111            Ok(Ok(())) | Ok(Err(_)) => {}
112            Err(_) => {
113                tracing::debug!(
114                    timeout_ms = SHELL_OUTPUT_SEND_TIMEOUT.as_millis(),
115                    "Dropping PTY shell output chunk because the stream receiver is not draining fast enough"
116                );
117            }
118        }
119    }
120
121    async fn send_status_chunk_best_effort(tx: &mpsc::Sender<StreamChunk>, message: String) {
122        match tokio::time::timeout(
123            STATUS_CHUNK_SEND_TIMEOUT,
124            tx.send(StreamChunk::Status { message }),
125        )
126        .await
127        {
128            Ok(Ok(())) | Ok(Err(_)) => {}
129            Err(_) => {
130                tracing::debug!(
131                    timeout_ms = STATUS_CHUNK_SEND_TIMEOUT.as_millis(),
132                    "Dropping PTY status chunk because the stream receiver is not draining fast enough"
133                );
134            }
135        }
136    }
137
    /// The purpose a PTY session was spawned for.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum SessionMode {
        /// Spawned for agent command execution; eligible for pool reuse.
        Automation,
        /// Spawned for direct user interaction; always treated as user-managed.
        Interactive,
    }
143
    /// Registry data guarded by the manager's lock.
    #[derive(Default)]
    struct ManagerState {
        /// All live sessions, keyed by shell session id.
        sessions: HashMap<String, Arc<ShellSession>>,
        /// Shell session ids grouped by pool key, in insertion order.
        pools: HashMap<String, Vec<String>>,
        /// Maps a running command's process id to the id of the session executing it.
        process_index: HashMap<String, String>,
    }
150
    /// Process-wide registry of PTY shell sessions (see `manager()`).
    pub(super) struct ShellSessionManager {
        /// Session, pool and process indexes behind a single async mutex.
        state: Mutex<ManagerState>,
    }
154
    /// One long-lived PTY-backed shell together with its bookkeeping.
    struct ShellSession {
        /// Unique id of the form `shell-<uuid>`.
        shell_session_id: String,
        /// Key of the pool this session is registered under.
        pool_key: String,
        /// Whether the session was spawned for automation or interactive use.
        mode: SessionMode,
        /// PTY master handle, used for resizing; `None` is reported as a closed session.
        master: Mutex<Option<Box<dyn MasterPty + Send>>>,
        /// PTY writer used to send input and commands; `None` is reported as a closed session.
        writer: Mutex<Option<Box<dyn Write + Send>>>,
        /// Taken at the start of `execute` so commands on one session do not overlap.
        command_lock: Mutex<()>,
        /// Channel for the currently running command's raw output; shared with the reader loop.
        active_sender: Arc<StdMutex<Option<mpsc::Sender<String>>>>,
        /// Broadcast channel that fans session events out to subscribers.
        event_tx: broadcast::Sender<StreamChunk>,
        /// The spawned shell child process; shared with the child monitor.
        child: Arc<StdMutex<Option<Box<dyn Child + Send + Sync>>>>,
        /// Set once the session is closed; shared with the reader loop and child monitor.
        closed: Arc<AtomicBool>,
        /// Set when a user has claimed an automation session for their own use.
        claimed_by_user: Arc<AtomicBool>,
        /// Set when a stop was requested; reset at the start of each command.
        user_stop_requested: Arc<AtomicBool>,
        /// Current lifecycle state of the session.
        state: Arc<StdMutex<ShellSessionState>>,
        /// Best-effort tracked working directory.
        working_directory: Arc<StdMutex<Option<String>>>,
        /// Process id of the command currently executing, if any; shared with the reader loop.
        active_process_id: Arc<StdMutex<Option<String>>>,
        /// Text of the command currently executing, if any.
        active_command: Arc<StdMutex<Option<String>>>,
    }
173
    /// Result of parsing one piece of PTY output.
    // NOTE(review): presumably produced by `SessionOutputParser`; the producer is not
    // visible in this part of the file — confirm.
    struct ParsedChunk {
        /// Output text extracted from the chunk.
        output: String,
        /// Exit code, when one was found in the chunk.
        exit_code: Option<i32>,
    }
178
    /// Classification of a command that has gone quiet.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum StallSignal {
        /// No prompt or error indicator was seen; the command keeps waiting.
        None,
        /// The command appears to be waiting for user input.
        InteractivePrompt,
        /// The command appears to have printed an error.
        ErrorOutput,
    }
185
186    impl StallSignal {
187        fn runtime_failure_kind(self) -> Option<ShellRuntimeFailureKind> {
188            match self {
189                Self::None => None,
190                Self::InteractivePrompt => Some(ShellRuntimeFailureKind::WaitingForInput),
191                Self::ErrorOutput => Some(ShellRuntimeFailureKind::ErrorOutput),
192            }
193        }
194
195        fn status_message(self, shell_session_id: &str, command: &str) -> String {
196            match self {
197                Self::None => format!(
198                    "Shell runtime status: session `{shell_session_id}` saw no prompt or error indicator during a quiet period for `{command}` and will continue waiting."
199                ),
200                Self::InteractivePrompt => format!(
201                    "Shell runtime status: session `{shell_session_id}` classified quiet command `{command}` as waiting_for_input and is interrupting it."
202                ),
203                Self::ErrorOutput => format!(
204                    "Shell runtime status: session `{shell_session_id}` classified quiet command `{command}` as error_output and is interrupting it."
205                ),
206            }
207        }
208    }
209
    /// Outcome of a single command run inside a session.
    // NOTE(review): the code that fills this in is not visible in this part of the
    // file; field notes below restate the field names and types only.
    struct CommandCompletion {
        /// Id of the command's process entry.
        process_id: String,
        /// Captured standard output.
        stdout: String,
        /// Exit code reported for the command.
        exit_code: i32,
        /// Final process state of the command.
        process_state: ShellProcessState,
        /// Elapsed time in milliseconds.
        duration_ms: u64,
        /// Session state after the command finished.
        session_state: ShellSessionState,
        /// Runtime failure classification, if the command was interrupted for one.
        failure_kind: Option<ShellRuntimeFailureKind>,
    }
219
    /// Incremental parser state for a command's PTY output.
    // NOTE(review): the parsing logic is not visible here; presumably it looks for the
    // start marker and done prefix that `execute` wraps around each command — confirm.
    struct SessionOutputParser {
        /// Output received but not yet consumed.
        pending: String,
        /// Whether the start marker has been seen.
        started: bool,
        /// Marker that precedes the command's own output.
        start_marker: String,
        /// Prefix of the line that reports the command's completion.
        done_prefix: String,
    }
226
227    static MANAGER: OnceLock<ShellSessionManager> = OnceLock::new();
228
229    fn manager() -> &'static ShellSessionManager {
230        MANAGER.get_or_init(|| ShellSessionManager {
231            state: Mutex::new(ManagerState::default()),
232        })
233    }
234
235    pub(super) async fn create_session(
236        pool_key: &str,
237        initial_cwd: Option<&str>,
238        tx: Option<mpsc::Sender<StreamChunk>>,
239    ) -> Result<ShellSessionHandle> {
240        let session = spawn_session(pool_key, initial_cwd, SessionMode::Interactive).await?;
241        manager().insert_session(session.clone()).await;
242        session.set_state(ShellSessionState::Idle)?;
243        if let Some(tx) = tx {
244            session.subscribe(tx);
245        }
246        session.emit_session_lifecycle();
247        Ok(session.handle())
248    }
249
250    #[allow(dead_code)]
251    pub(super) async fn execute_in_session(
252        pool_key: &str,
253        initial_cwd: Option<&str>,
254        command: &str,
255        command_cwd: Option<&str>,
256        timeout_secs: Option<u64>,
257        tx: mpsc::Sender<StreamChunk>,
258    ) -> Result<StreamingCommandResult> {
259        execute_in_session_with_options(
260            pool_key,
261            initial_cwd,
262            command,
263            command_cwd,
264            ShellExecutionOptions {
265                timeout_secs,
266                ..ShellExecutionOptions::default()
267            },
268            tx,
269        )
270        .await
271    }
272
273    pub(super) async fn execute_in_session_with_options(
274        pool_key: &str,
275        initial_cwd: Option<&str>,
276        command: &str,
277        command_cwd: Option<&str>,
278        options: ShellExecutionOptions,
279        tx: mpsc::Sender<StreamChunk>,
280    ) -> Result<StreamingCommandResult> {
281        let session = acquire_execution_session(pool_key, initial_cwd).await?;
282        session.execute(command, command_cwd, options, tx).await
283    }
284
285    fn default_stall_timeout_secs(timeout_secs: u64) -> u64 {
286        timeout_secs.clamp(MIN_STALL_TIMEOUT_SECS, MAX_STALL_TIMEOUT_SECS)
287    }
288
289    fn early_signal_stall_timeout(
290        stall_timeout: Duration,
291        stall_signal: StallSignal,
292    ) -> Option<Duration> {
293        match stall_signal {
294            StallSignal::InteractivePrompt | StallSignal::ErrorOutput => {
295                Some(stall_timeout.min(Duration::from_secs(EARLY_SIGNAL_STALL_TIMEOUT_SECS)))
296            }
297            StallSignal::None => None,
298        }
299    }
300
301    pub(super) async fn stop_process(process_id: &str) -> Result<Option<ShellSessionHandle>> {
302        let session = manager().find_session_by_process(process_id).await;
303        if let Some(session) = session {
304            session.request_stop().await?;
305            return Ok(Some(session.handle()));
306        }
307        Ok(None)
308    }
309
310    pub(super) async fn stop_session(shell_session_id: &str) -> Result<Option<ShellSessionHandle>> {
311        let session = manager().remove_session(shell_session_id).await;
312        if let Some(session) = session {
313            session.user_stop_requested.store(true, Ordering::SeqCst);
314            session.set_state(ShellSessionState::Stopping)?;
315            session.emit_session_lifecycle();
316            session.terminate().await?;
317            session.set_active_command(None, None)?;
318            session.set_state(ShellSessionState::Stopped)?;
319            session.emit_session_lifecycle();
320            return Ok(Some(session.handle()));
321        }
322        Ok(None)
323    }
324
325    pub(super) async fn shutdown_session(pool_key: &str) -> Result<()> {
326        let sessions = manager().remove_pool(pool_key).await;
327        for session in sessions {
328            session.user_stop_requested.store(true, Ordering::SeqCst);
329            session.set_state(ShellSessionState::Stopping)?;
330            session.emit_session_lifecycle();
331            session.terminate().await?;
332            session.set_active_command(None, None)?;
333            session.set_state(ShellSessionState::Stopped)?;
334            session.emit_session_lifecycle();
335        }
336        Ok(())
337    }
338
339    pub(super) async fn send_input(shell_session_id: &str, data: &str) -> Result<()> {
340        let Some(session) = manager().find_session(shell_session_id).await else {
341            return Err(AppError::Session(format!(
342                "unknown shell session: {shell_session_id}"
343            )));
344        };
345        session.send_input(data).await
346    }
347
348    pub(super) async fn resize_session(shell_session_id: &str, cols: u16, rows: u16) -> Result<()> {
349        let Some(session) = manager().find_session(shell_session_id).await else {
350            return Err(AppError::Session(format!(
351                "unknown shell session: {shell_session_id}"
352            )));
353        };
354        session.resize(cols, rows).await
355    }
356
357    pub(super) async fn describe_session(
358        shell_session_id: &str,
359    ) -> Result<Option<ShellSessionMetadata>> {
360        Ok(manager()
361            .find_session(shell_session_id)
362            .await
363            .map(|session| session.metadata()))
364    }
365
366    pub(super) async fn claim_session(
367        shell_session_id: &str,
368    ) -> Result<Option<ShellSessionMetadata>> {
369        let Some(session) = manager().find_session(shell_session_id).await else {
370            return Ok(None);
371        };
372
373        session.claim_for_user();
374        Ok(Some(session.metadata()))
375    }
376
377    pub(super) async fn subscribe_session(
378        shell_session_id: &str,
379        tx: mpsc::Sender<StreamChunk>,
380    ) -> Result<Option<ShellSessionMetadata>> {
381        let Some(session) = manager().find_session(shell_session_id).await else {
382            return Ok(None);
383        };
384
385        session.subscribe(tx);
386        Ok(Some(session.metadata()))
387    }
388
389    async fn spawn_session(
390        pool_key: &str,
391        initial_cwd: Option<&str>,
392        mode: SessionMode,
393    ) -> Result<Arc<ShellSession>> {
394        let pool_key_owned = pool_key.to_string();
395        let initial_cwd_owned = initial_cwd.map(ToOwned::to_owned);
396        tokio::task::spawn_blocking(move || {
397            ShellSession::spawn(&pool_key_owned, initial_cwd_owned.as_deref(), mode)
398        })
399        .await
400        .map_err(|error| AppError::Session(format!("failed to spawn PTY session task: {error}")))?
401    }
402
403    async fn acquire_execution_session(
404        pool_key: &str,
405        initial_cwd: Option<&str>,
406    ) -> Result<Arc<ShellSession>> {
407        if let Some(session) = manager().acquire_idle_session(pool_key).await? {
408            return Ok(session);
409        }
410
411        let session = spawn_session(pool_key, initial_cwd, SessionMode::Automation).await?;
412        manager().insert_session(session.clone()).await;
413        session.set_state(ShellSessionState::Idle)?;
414        session.mark_busy_if_idle()?;
415        Ok(session)
416    }
417
418    impl ShellSessionManager {
419        async fn insert_session(&self, session: Arc<ShellSession>) {
420            let mut state = self.state.lock().await;
421            let shell_session_id = session.shell_session_id.clone();
422            state
423                .pools
424                .entry(session.pool_key.clone())
425                .or_default()
426                .push(shell_session_id.clone());
427            state.sessions.insert(shell_session_id, session);
428        }
429
430        async fn acquire_idle_session(&self, pool_key: &str) -> Result<Option<Arc<ShellSession>>> {
431            let mut state = self.state.lock().await;
432            prune_pool_locked(&mut state, pool_key);
433            let session_ids = state.pools.get(pool_key).cloned().unwrap_or_default();
434            for shell_session_id in session_ids {
435                if let Some(session) = state.sessions.get(&shell_session_id).cloned()
436                    && session.mark_busy_if_idle()?
437                {
438                    return Ok(Some(session));
439                }
440            }
441            Ok(None)
442        }
443
444        async fn register_process(&self, process_id: String, shell_session_id: String) {
445            let mut state = self.state.lock().await;
446            state.process_index.insert(process_id, shell_session_id);
447        }
448
449        async fn unregister_process(&self, process_id: &str) {
450            let mut state = self.state.lock().await;
451            state.process_index.remove(process_id);
452        }
453
454        async fn find_session(&self, shell_session_id: &str) -> Option<Arc<ShellSession>> {
455            let state = self.state.lock().await;
456            state.sessions.get(shell_session_id).cloned()
457        }
458
459        async fn find_session_by_process(&self, process_id: &str) -> Option<Arc<ShellSession>> {
460            let state = self.state.lock().await;
461            let shell_session_id = state.process_index.get(process_id)?.clone();
462            state.sessions.get(&shell_session_id).cloned()
463        }
464
465        async fn remove_session(&self, shell_session_id: &str) -> Option<Arc<ShellSession>> {
466            let mut state = self.state.lock().await;
467            let session = state.sessions.remove(shell_session_id)?;
468            state.process_index.retain(|_, sid| sid != shell_session_id);
469            if let Some(pool) = state.pools.get_mut(&session.pool_key) {
470                pool.retain(|sid| sid != shell_session_id);
471                if pool.is_empty() {
472                    state.pools.remove(&session.pool_key);
473                }
474            }
475            Some(session)
476        }
477
478        async fn remove_pool(&self, pool_key: &str) -> Vec<Arc<ShellSession>> {
479            let mut state = self.state.lock().await;
480            let session_ids = state.pools.remove(pool_key).unwrap_or_default();
481            let mut removed = Vec::with_capacity(session_ids.len());
482            for shell_session_id in session_ids {
483                if let Some(session) = state.sessions.remove(&shell_session_id) {
484                    state
485                        .process_index
486                        .retain(|_, sid| sid != &shell_session_id);
487                    removed.push(session);
488                }
489            }
490            removed
491        }
492    }
493
494    impl ShellSession {
495        fn closed_session_error() -> AppError {
496            AppError::Session(
497                "PTY shell session closed unexpectedly; retry to create a fresh shell".to_string(),
498            )
499        }
500
        /// Opens a PTY, starts a shell in it, and wires up the background
        /// reader loop and child monitor for a new session.
        ///
        /// The returned session starts in `ShellSessionState::Starting` and is
        /// not yet registered with the manager. This function is synchronous;
        /// `spawn_session` runs it on the blocking thread pool.
        ///
        /// # Errors
        /// Returns a session error if the PTY cannot be opened, the shell
        /// cannot be spawned, or the PTY reader/writer cannot be obtained, and
        /// propagates any error from `prepare_shell`.
        fn spawn(
            pool_key: &str,
            initial_cwd: Option<&str>,
            mode: SessionMode,
        ) -> Result<Arc<Self>> {
            let pty_system = native_pty_system();
            let pair = pty_system
                .openpty(PtySize {
                    rows: DEFAULT_ROWS,
                    cols: DEFAULT_COLS,
                    pixel_width: 0,
                    pixel_height: 0,
                })
                .map_err(|error| AppError::Session(format!("failed to open PTY: {error:#}")))?;

            // Shell command line depends on the session mode; the cwd is only
            // overridden when the caller supplied one.
            let mut builder = build_shell_command(mode);
            if let Some(dir) = initial_cwd {
                builder.cwd(dir);
            }
            builder.env("TERM", "xterm-256color");
            // Marker variable set in the shell's environment for every managed session.
            builder.env("GESTURA_PTY", "1");

            let child = pair.slave.spawn_command(builder).map_err(|error| {
                AppError::Session(format!("failed to spawn PTY shell: {error:#}"))
            })?;

            let reader = pair.master.try_clone_reader().map_err(|error| {
                AppError::Session(format!("failed to clone PTY reader: {error:#}"))
            })?;
            let mut writer = pair.master.take_writer().map_err(|error| {
                AppError::Session(format!("failed to take PTY writer: {error:#}"))
            })?;

            // Mode-specific shell setup is written before any command is sent.
            prepare_shell(mode, &mut writer)?;

            // State shared between the session and its background tasks.
            let shell_session_id = format!("shell-{}", uuid::Uuid::new_v4());
            let active_sender = Arc::new(StdMutex::new(None));
            let active_process_id = Arc::new(StdMutex::new(None));
            let closed = Arc::new(AtomicBool::new(false));
            let claimed_by_user = Arc::new(AtomicBool::new(false));
            let child = Arc::new(StdMutex::new(Some(child)));
            // The initial receiver is dropped; subscribers attach later via `subscribe`.
            let (event_tx, _) = broadcast::channel(SESSION_EVENT_BUFFER);

            spawn_reader_loop(
                reader,
                ReaderLoopContext {
                    active_sender: active_sender.clone(),
                    active_process_id: active_process_id.clone(),
                    closed: closed.clone(),
                    claimed_by_user: claimed_by_user.clone(),
                    event_tx: event_tx.clone(),
                    shell_session_id: shell_session_id.clone(),
                    // Raw output emission is enabled for interactive sessions only.
                    emit_raw_output: matches!(mode, SessionMode::Interactive),
                },
            );
            spawn_child_monitor(child.clone(), closed.clone(), shell_session_id.clone());

            Ok(Arc::new(Self {
                shell_session_id,
                pool_key: pool_key.to_string(),
                mode,
                master: Mutex::new(Some(pair.master)),
                writer: Mutex::new(Some(writer)),
                command_lock: Mutex::new(()),
                active_sender,
                event_tx,
                child,
                closed,
                claimed_by_user,
                user_stop_requested: Arc::new(AtomicBool::new(false)),
                state: Arc::new(StdMutex::new(ShellSessionState::Starting)),
                working_directory: Arc::new(StdMutex::new(initial_cwd.map(ToOwned::to_owned))),
                active_process_id,
                active_command: Arc::new(StdMutex::new(None)),
            }))
        }
577
578        fn handle(&self) -> ShellSessionHandle {
579            ShellSessionHandle {
580                shell_session_id: self.shell_session_id.clone(),
581                cwd: self.current_working_directory(),
582            }
583        }
584
585        fn metadata(&self) -> ShellSessionMetadata {
586            ShellSessionMetadata {
587                shell_session_id: self.shell_session_id.clone(),
588                cwd: self.current_working_directory(),
589                interactive: true,
590                user_managed: self.is_user_managed(),
591                available_for_reuse: self.is_available_for_reuse(),
592            }
593        }
594
595        fn is_interactive(&self) -> bool {
596            matches!(self.mode, SessionMode::Interactive)
597        }
598
599        fn is_user_managed(&self) -> bool {
600            self.is_interactive() || self.claimed_by_user.load(Ordering::SeqCst)
601        }
602
603        fn is_closed(&self) -> bool {
604            self.closed.load(Ordering::SeqCst)
605        }
606
607        fn current_working_directory(&self) -> Option<String> {
608            self.working_directory
609                .lock()
610                .ok()
611                .and_then(|guard| guard.clone())
612        }
613
614        fn current_active_process_id(&self) -> Option<String> {
615            self.active_process_id
616                .lock()
617                .ok()
618                .and_then(|guard| guard.clone())
619        }
620
621        fn current_active_command(&self) -> Option<String> {
622            self.active_command
623                .lock()
624                .ok()
625                .and_then(|guard| guard.clone())
626        }
627
628        fn state_value(&self) -> ShellSessionState {
629            self.state
630                .lock()
631                .ok()
632                .map(|guard| *guard)
633                .unwrap_or(ShellSessionState::Failed)
634        }
635
636        fn is_available_for_reuse(&self) -> bool {
637            matches!(self.mode, SessionMode::Automation)
638                && !self.is_user_managed()
639                && matches!(self.state_value(), ShellSessionState::Idle)
640                && !self.is_closed()
641        }
642
643        fn set_state(&self, state: ShellSessionState) -> Result<()> {
644            let mut guard = self
645                .state
646                .lock()
647                .map_err(|_| AppError::Session("failed to lock shell session state".to_string()))?;
648            *guard = state;
649            Ok(())
650        }
651
652        fn mark_busy_if_idle(&self) -> Result<bool> {
653            if self.is_closed()
654                || !matches!(self.mode, SessionMode::Automation)
655                || self.is_user_managed()
656            {
657                return Ok(false);
658            }
659            let mut guard = self
660                .state
661                .lock()
662                .map_err(|_| AppError::Session("failed to lock shell session state".to_string()))?;
663            if !matches!(*guard, ShellSessionState::Idle) {
664                return Ok(false);
665            }
666            *guard = ShellSessionState::Busy;
667            Ok(true)
668        }
669
670        fn claim_for_user(&self) {
671            if self.is_interactive() {
672                return;
673            }
674
675            if self
676                .claimed_by_user
677                .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
678                .is_ok()
679            {
680                self.emit_session_lifecycle();
681            }
682        }
683
684        fn set_active_command(
685            &self,
686            process_id: Option<String>,
687            command: Option<String>,
688        ) -> Result<()> {
689            let mut process_guard = self.active_process_id.lock().map_err(|_| {
690                AppError::Session("failed to lock active shell process id".to_string())
691            })?;
692            *process_guard = process_id;
693
694            let mut command_guard = self.active_command.lock().map_err(|_| {
695                AppError::Session("failed to lock active shell command".to_string())
696            })?;
697            *command_guard = command;
698            Ok(())
699        }
700
701        fn set_active_sender(&self, sender: Option<mpsc::Sender<String>>) -> Result<()> {
702            let mut guard = self.active_sender.lock().map_err(|_| {
703                AppError::Session("failed to lock active PTY command sender".to_string())
704            })?;
705            *guard = sender;
706            Ok(())
707        }
708
709        fn update_working_directory(&self, cwd: Option<String>) -> Result<()> {
710            let mut guard = self.working_directory.lock().map_err(|_| {
711                AppError::Session("failed to lock shell session working directory".to_string())
712            })?;
713            *guard = cwd;
714            Ok(())
715        }
716
717        fn subscribe(&self, tx: mpsc::Sender<StreamChunk>) {
718            let mut rx = self.event_tx.subscribe();
719            tokio::spawn(async move {
720                loop {
721                    match rx.recv().await {
722                        Ok(chunk) => {
723                            if tx.send(chunk).await.is_err() {
724                                return;
725                            }
726                        }
727                        Err(broadcast::error::RecvError::Lagged(_)) => continue,
728                        Err(broadcast::error::RecvError::Closed) => return,
729                    }
730                }
731            });
732        }
733
734        fn emit_broadcast(&self, chunk: StreamChunk) {
735            let _ = self.event_tx.send(chunk);
736        }
737
738        fn session_lifecycle_chunk(&self) -> StreamChunk {
739            StreamChunk::ShellSessionLifecycle {
740                shell_session_id: self.shell_session_id.clone(),
741                state: self.state_value(),
742                cwd: self.current_working_directory(),
743                active_process_id: self.current_active_process_id(),
744                active_command: self.current_active_command(),
745                available_for_reuse: self.is_available_for_reuse(),
746                interactive: self.is_interactive(),
747                user_managed: self.is_user_managed(),
748            }
749        }
750
751        fn emit_session_lifecycle(&self) {
752            self.emit_broadcast(self.session_lifecycle_chunk());
753        }
754
755        async fn emit_session_lifecycle_to(&self, tx: &mpsc::Sender<StreamChunk>) {
756            let chunk = self.session_lifecycle_chunk();
757            self.emit_broadcast(chunk.clone());
758            let _ = tx.send(chunk).await;
759        }
760
761        async fn send_input(&self, data: &str) -> Result<()> {
762            if self.is_closed() {
763                return Err(Self::closed_session_error());
764            }
765
766            self.claim_for_user();
767
768            if data.is_empty() {
769                return Ok(());
770            }
771
772            let mut writer = self.writer.lock().await;
773            let writer = writer.as_mut().ok_or_else(Self::closed_session_error)?;
774            writer.write_all(data.as_bytes()).map_err(AppError::Io)?;
775            writer.flush().map_err(AppError::Io)
776        }
777
778        async fn resize(&self, cols: u16, rows: u16) -> Result<()> {
779            let cols = cols.max(1);
780            let rows = rows.max(1);
781            let master = self.master.lock().await;
782            let master = master.as_ref().ok_or_else(Self::closed_session_error)?;
783            master
784                .resize(PtySize {
785                    rows,
786                    cols,
787                    pixel_width: 0,
788                    pixel_height: 0,
789                })
790                .map_err(|error| AppError::Session(format!("failed to resize PTY: {error:#}")))
791        }
792
793        async fn execute(
794            self: Arc<Self>,
795            command: &str,
796            command_cwd: Option<&str>,
797            options: ShellExecutionOptions,
798            tx: mpsc::Sender<StreamChunk>,
799        ) -> Result<StreamingCommandResult> {
800            let _command_guard = self.command_lock.lock().await;
801            if self.is_closed() {
802                return Err(Self::closed_session_error());
803            }
804
805            let process_id = uuid::Uuid::new_v4().to_string();
806            let start_marker = format!("__GESTURA_START_{process_id}__");
807            let done_prefix = format!("__GESTURA_DONE_{process_id}__:");
808            let wrapped = wrap_command(command, command_cwd, &start_marker, &done_prefix);
809            let timeout_secs = options
810                .timeout_secs
811                .unwrap_or(DEFAULT_EXECUTION_TIMEOUT_SECS);
812            let timeout = Duration::from_secs(timeout_secs);
813            let stall_timeout = Duration::from_secs(
814                options
815                    .stall_timeout_secs
816                    .unwrap_or_else(|| default_stall_timeout_secs(timeout_secs)),
817            );
818            let start = Instant::now();
819
820            let (chunk_tx, mut chunk_rx) = mpsc::channel::<String>(ACTIVE_CHUNK_BUFFER);
821            self.user_stop_requested.store(false, Ordering::SeqCst);
822            self.set_active_sender(Some(chunk_tx))?;
823            self.set_active_command(Some(process_id.clone()), Some(command.to_string()))?;
824            self.set_state(ShellSessionState::Busy)?;
825            manager()
826                .register_process(process_id.clone(), self.shell_session_id.clone())
827                .await;
828            self.emit_session_lifecycle_to(&tx).await;
829
830            {
831                let mut writer = self.writer.lock().await;
832                let writer = writer.as_mut().ok_or_else(Self::closed_session_error)?;
833                if let Err(error) = writer.write_all(wrapped.as_bytes()) {
834                    self.set_state(ShellSessionState::Failed)?;
835                    self.set_active_sender(None)?;
836                    self.set_active_command(None, None)?;
837                    manager().unregister_process(&process_id).await;
838                    self.emit_session_lifecycle_to(&tx).await;
839                    return Err(AppError::Io(error));
840                }
841                writer.flush().map_err(AppError::Io)?;
842            }
843
844            let start_chunk = StreamChunk::ShellLifecycle {
845                process_id: process_id.clone(),
846                shell_session_id: Some(self.shell_session_id.clone()),
847                state: ShellProcessState::Started,
848                exit_code: None,
849                duration_ms: None,
850                command: command.to_string(),
851                cwd: command_cwd.map(ToOwned::to_owned),
852            };
853            self.emit_broadcast(start_chunk.clone());
854            let _ = tx.send(start_chunk).await;
855
856            let mut parser = SessionOutputParser::new(start_marker, done_prefix);
857            let mut stdout = String::new();
858            let mut timed_out = false;
859            let mut interrupted = false;
860            let mut interrupt_started_at: Option<Instant> = None;
861            let mut last_activity_at: Option<Instant> = None;
862            let mut continued_wait_anchor_at: Option<Instant> = None;
863            let mut continued_wait_cycles = 0_u8;
864            let mut recent_output_tail = String::new();
865            let mut runtime_failure_kind: Option<ShellRuntimeFailureKind> = None;
866
867            loop {
868                if interrupted
869                    && interrupt_started_at
870                        .expect("interrupt timestamp must be set")
871                        .elapsed()
872                        >= Duration::from_secs(INTERRUPT_GRACE_SECS)
873                {
874                    self.set_state(ShellSessionState::Failed)?;
875                    self.emit_session_lifecycle();
876                    self.terminate().await?;
877                    let flushed = parser.finish();
878                    if !flushed.output.is_empty() {
879                        stdout.push_str(&flushed.output);
880                        let output_chunk = StreamChunk::ShellOutput {
881                            process_id: process_id.clone(),
882                            shell_session_id: Some(self.shell_session_id.clone()),
883                            stream: ShellOutputStream::Stdout,
884                            data: flushed.output,
885                        };
886                        self.emit_broadcast(output_chunk.clone());
887                        send_shell_output_chunk_best_effort(&tx, output_chunk).await;
888                    }
889
890                    let duration_ms = start.elapsed().as_millis() as u64;
891                    return self
892                        .finish_command(
893                            &tx,
894                            command,
895                            command_cwd,
896                            CommandCompletion {
897                                process_id,
898                                stdout,
899                                exit_code: 124,
900                                process_state: ShellProcessState::Failed,
901                                duration_ms,
902                                session_state: ShellSessionState::Failed,
903                                failure_kind: runtime_failure_kind
904                                    .or(Some(ShellRuntimeFailureKind::TimedOut)),
905                            },
906                        )
907                        .await;
908                }
909
910                let wait_for = if interrupted {
911                    Duration::from_secs(INTERRUPT_GRACE_SECS)
912                        .checked_sub(
913                            interrupt_started_at
914                                .expect("interrupt timestamp must be set")
915                                .elapsed(),
916                        )
917                        .unwrap_or(Duration::from_secs(0))
918                } else if options.allow_long_running && start.elapsed() >= timeout {
919                    let idle_for = last_activity_at
920                        .or(continued_wait_anchor_at)
921                        .map(|timestamp| timestamp.elapsed())
922                        .unwrap_or_else(|| start.elapsed());
923                    stall_timeout
924                        .checked_sub(idle_for)
925                        .unwrap_or(Duration::from_secs(0))
926                } else if options.allow_long_running {
927                    let idle_for = last_activity_at
928                        .or(continued_wait_anchor_at)
929                        .map(|timestamp| timestamp.elapsed())
930                        .unwrap_or_else(|| start.elapsed());
931                    let stall_signal =
932                        inspect_stall_signal(&recent_output_tail, parser.buffered_output());
933                    if let Some(signal_timeout) =
934                        early_signal_stall_timeout(stall_timeout, stall_signal)
935                    {
936                        timeout
937                            .checked_sub(start.elapsed())
938                            .unwrap_or(Duration::from_secs(0))
939                            .min(
940                                signal_timeout
941                                    .checked_sub(idle_for)
942                                    .unwrap_or(Duration::from_secs(0)),
943                            )
944                    } else {
945                        timeout
946                            .checked_sub(start.elapsed())
947                            .unwrap_or(Duration::from_secs(0))
948                    }
949                } else {
950                    timeout
951                        .checked_sub(start.elapsed())
952                        .unwrap_or(Duration::from_secs(0))
953                };
954
955                match tokio::time::timeout(wait_for, chunk_rx.recv()).await {
956                    Ok(Some(chunk)) => {
957                        last_activity_at = Some(Instant::now());
958                        continued_wait_anchor_at = None;
959                        continued_wait_cycles = 0;
960                        let parsed = parser.push(&chunk);
961                        if !parsed.output.is_empty() {
962                            stdout.push_str(&parsed.output);
963                            append_recent_output_tail(&mut recent_output_tail, &parsed.output);
964                            let output_chunk = StreamChunk::ShellOutput {
965                                process_id: process_id.clone(),
966                                shell_session_id: Some(self.shell_session_id.clone()),
967                                stream: ShellOutputStream::Stdout,
968                                data: parsed.output,
969                            };
970                            self.emit_broadcast(output_chunk.clone());
971                            send_shell_output_chunk_best_effort(&tx, output_chunk).await;
972                        }
973
974                        if let Some(exit_code) = parsed.exit_code {
975                            let user_stopped =
976                                self.user_stop_requested.swap(false, Ordering::SeqCst);
977                            let duration_ms = start.elapsed().as_millis() as u64;
978                            let process_state = if timed_out {
979                                ShellProcessState::Failed
980                            } else if user_stopped {
981                                ShellProcessState::Stopped
982                            } else if exit_code == 0 {
983                                ShellProcessState::Completed
984                            } else {
985                                ShellProcessState::Failed
986                            };
987                            let reported_exit = if timed_out {
988                                124
989                            } else if user_stopped && exit_code == 0 {
990                                130
991                            } else {
992                                exit_code
993                            };
994                            let session_state = if self.is_closed() {
995                                if user_stopped {
996                                    ShellSessionState::Stopped
997                                } else {
998                                    ShellSessionState::Failed
999                                }
1000                            } else {
1001                                ShellSessionState::Idle
1002                            };
1003
1004                            return self
1005                                .finish_command(
1006                                    &tx,
1007                                    command,
1008                                    command_cwd,
1009                                    CommandCompletion {
1010                                        process_id,
1011                                        stdout,
1012                                        exit_code: reported_exit,
1013                                        process_state,
1014                                        duration_ms,
1015                                        session_state,
1016                                        failure_kind: if timed_out {
1017                                            runtime_failure_kind
1018                                                .or(Some(ShellRuntimeFailureKind::TimedOut))
1019                                        } else {
1020                                            runtime_failure_kind
1021                                        },
1022                                    },
1023                                )
1024                                .await;
1025                        }
1026                    }
1027                    Ok(None) => {
1028                        let flushed = parser.finish();
1029                        if !flushed.output.is_empty() {
1030                            stdout.push_str(&flushed.output);
1031                            let output_chunk = StreamChunk::ShellOutput {
1032                                process_id: process_id.clone(),
1033                                shell_session_id: Some(self.shell_session_id.clone()),
1034                                stream: ShellOutputStream::Stdout,
1035                                data: flushed.output.clone(),
1036                            };
1037                            self.emit_broadcast(output_chunk.clone());
1038                            send_shell_output_chunk_best_effort(&tx, output_chunk).await;
1039                        }
1040
1041                        if let Some(exit_code) = flushed.exit_code {
1042                            let user_stopped =
1043                                self.user_stop_requested.swap(false, Ordering::SeqCst);
1044                            let duration_ms = start.elapsed().as_millis() as u64;
1045                            let process_state = if user_stopped {
1046                                ShellProcessState::Stopped
1047                            } else if exit_code == 0 {
1048                                ShellProcessState::Completed
1049                            } else {
1050                                ShellProcessState::Failed
1051                            };
1052                            let reported_exit = if user_stopped && exit_code == 0 {
1053                                130
1054                            } else {
1055                                exit_code
1056                            };
1057                            let session_state = if user_stopped {
1058                                ShellSessionState::Stopped
1059                            } else if self.is_closed() {
1060                                ShellSessionState::Failed
1061                            } else {
1062                                ShellSessionState::Idle
1063                            };
1064
1065                            return self
1066                                .finish_command(
1067                                    &tx,
1068                                    command,
1069                                    command_cwd,
1070                                    CommandCompletion {
1071                                        process_id,
1072                                        stdout,
1073                                        exit_code: reported_exit,
1074                                        process_state,
1075                                        duration_ms,
1076                                        session_state,
1077                                        failure_kind: runtime_failure_kind,
1078                                    },
1079                                )
1080                                .await;
1081                        }
1082
1083                        let user_stopped = self.user_stop_requested.swap(false, Ordering::SeqCst);
1084                        let duration_ms = start.elapsed().as_millis() as u64;
1085                        let exit_code = if user_stopped { 130 } else { -1 };
1086                        let process_state = if user_stopped {
1087                            ShellProcessState::Stopped
1088                        } else {
1089                            ShellProcessState::Failed
1090                        };
1091                        let session_state = if user_stopped {
1092                            ShellSessionState::Stopped
1093                        } else {
1094                            ShellSessionState::Failed
1095                        };
1096
1097                        return self
1098                            .finish_command(
1099                                &tx,
1100                                command,
1101                                command_cwd,
1102                                CommandCompletion {
1103                                    process_id,
1104                                    stdout,
1105                                    exit_code,
1106                                    process_state,
1107                                    duration_ms,
1108                                    session_state,
1109                                    failure_kind: if timed_out {
1110                                        runtime_failure_kind
1111                                            .or(Some(ShellRuntimeFailureKind::TimedOut))
1112                                    } else {
1113                                        runtime_failure_kind
1114                                    },
1115                                },
1116                            )
1117                            .await;
1118                    }
1119                    Err(_) if !interrupted => {
1120                        if options.allow_long_running {
1121                            let idle_for = last_activity_at
1122                                .or(continued_wait_anchor_at)
1123                                .map(|timestamp| timestamp.elapsed())
1124                                .unwrap_or_else(|| start.elapsed());
1125                            let stall_signal =
1126                                inspect_stall_signal(&recent_output_tail, parser.buffered_output());
1127
1128                            if let Some(signal_timeout) =
1129                                early_signal_stall_timeout(stall_timeout, stall_signal)
1130                                && idle_for >= signal_timeout
1131                            {
1132                                runtime_failure_kind = stall_signal.runtime_failure_kind();
1133                                send_status_chunk_best_effort(
1134                                    &tx,
1135                                    stall_signal.status_message(&self.shell_session_id, command),
1136                                )
1137                                .await;
1138
1139                                match stall_signal {
1140                                    StallSignal::InteractivePrompt => {
1141                                        tracing::info!(
1142                                            shell_session_id = %self.shell_session_id,
1143                                            process_id = %process_id,
1144                                            command = %command,
1145                                            stall_timeout_secs = signal_timeout.as_secs(),
1146                                            "Interrupting quiet PTY command early because recent output looks interactive"
1147                                        );
1148                                    }
1149                                    StallSignal::ErrorOutput => {
1150                                        tracing::info!(
1151                                            shell_session_id = %self.shell_session_id,
1152                                            process_id = %process_id,
1153                                            command = %command,
1154                                            stall_timeout_secs = signal_timeout.as_secs(),
1155                                            "Interrupting quiet PTY command early because recent output looks like an error"
1156                                        );
1157                                    }
1158                                    StallSignal::None => {}
1159                                }
1160
1161                                timed_out = true;
1162                                interrupted = true;
1163                                interrupt_started_at = Some(Instant::now());
1164                                self.set_state(ShellSessionState::Interrupting)?;
1165                                self.emit_session_lifecycle();
1166                                let session = self.clone();
1167                                tokio::spawn(async move {
1168                                    if let Err(e) = session.interrupt().await {
1169                                        tracing::debug!("interrupt error: {e}");
1170                                    }
1171                                });
1172                                continue;
1173                            }
1174
1175                            if start.elapsed() >= timeout {
1176                                if idle_for < stall_timeout {
1177                                    let sleep_time = stall_timeout
1178                                        .saturating_sub(idle_for)
1179                                        .min(Duration::from_millis(50));
1180                                    tokio::time::sleep(sleep_time).await;
1181                                    continue;
1182                                }
1183
1184                                runtime_failure_kind = stall_signal.runtime_failure_kind();
1185                                send_status_chunk_best_effort(
1186                                    &tx,
1187                                    stall_signal.status_message(&self.shell_session_id, command),
1188                                )
1189                                .await;
1190
1191                                match stall_signal {
1192                                    StallSignal::None => {
1193                                        if continued_wait_cycles
1194                                            >= MAX_QUIET_WAIT_CYCLES_WITHOUT_SIGNAL
1195                                        {
1196                                            tracing::info!(
1197                                                shell_session_id = %self.shell_session_id,
1198                                                process_id = %process_id,
1199                                                command = %command,
1200                                                timeout_secs,
1201                                                stall_timeout_secs = stall_timeout.as_secs(),
1202                                                "Interrupting quiet PTY command after repeated quiet periods without prompt/error indicators"
1203                                            );
1204                                        } else {
1205                                            continued_wait_anchor_at = Some(Instant::now());
1206                                            continued_wait_cycles += 1;
1207                                            tracing::debug!(
1208                                                shell_session_id = %self.shell_session_id,
1209                                                process_id = %process_id,
1210                                                command = %command,
1211                                                timeout_secs,
1212                                                stall_timeout_secs = stall_timeout.as_secs(),
1213                                                "Long-running PTY command is quiet with no prompt/error indicator; continuing to wait"
1214                                            );
1215                                            continue;
1216                                        }
1217                                    }
1218                                    StallSignal::InteractivePrompt => {
1219                                        tracing::info!(
1220                                            shell_session_id = %self.shell_session_id,
1221                                            process_id = %process_id,
1222                                            command = %command,
1223                                            "Interrupting quiet PTY command because recent output looks interactive"
1224                                        );
1225                                    }
1226                                    StallSignal::ErrorOutput => {
1227                                        tracing::info!(
1228                                            shell_session_id = %self.shell_session_id,
1229                                            process_id = %process_id,
1230                                            command = %command,
1231                                            "Interrupting quiet PTY command because recent output looks like an error"
1232                                        );
1233                                    }
1234                                }
1235
1236                                timed_out = true;
1237                                interrupted = true;
1238                                interrupt_started_at = Some(Instant::now());
1239                                self.set_state(ShellSessionState::Interrupting)?;
1240                                self.emit_session_lifecycle();
1241                                let session = self.clone();
1242                                tokio::spawn(async move {
1243                                    if let Err(e) = session.interrupt().await {
1244                                        tracing::debug!("interrupt error: {e}");
1245                                    }
1246                                });
1247                                continue;
1248                            }
1249                        }
1250
1251                        timed_out = true;
1252                        interrupted = true;
1253                        interrupt_started_at = Some(Instant::now());
1254                        self.set_state(ShellSessionState::Interrupting)?;
1255                        self.emit_session_lifecycle();
1256                        let session = self.clone();
1257                        tokio::spawn(async move {
1258                            if let Err(e) = session.interrupt().await {
1259                                tracing::debug!("interrupt error: {e}");
1260                            }
1261                        });
1262                    }
1263                    Err(_) => {
1264                        self.set_state(ShellSessionState::Failed)?;
1265                        self.emit_session_lifecycle();
1266                        self.terminate().await?;
1267                        let flushed = parser.finish();
1268                        if !flushed.output.is_empty() {
1269                            stdout.push_str(&flushed.output);
1270                            let output_chunk = StreamChunk::ShellOutput {
1271                                process_id: process_id.clone(),
1272                                shell_session_id: Some(self.shell_session_id.clone()),
1273                                stream: ShellOutputStream::Stdout,
1274                                data: flushed.output,
1275                            };
1276                            self.emit_broadcast(output_chunk.clone());
1277                            send_shell_output_chunk_best_effort(&tx, output_chunk).await;
1278                        }
1279
1280                        let duration_ms = start.elapsed().as_millis() as u64;
1281                        return self
1282                            .finish_command(
1283                                &tx,
1284                                command,
1285                                command_cwd,
1286                                CommandCompletion {
1287                                    process_id,
1288                                    stdout,
1289                                    exit_code: 124,
1290                                    process_state: ShellProcessState::Failed,
1291                                    duration_ms,
1292                                    session_state: ShellSessionState::Failed,
1293                                    failure_kind: runtime_failure_kind
1294                                        .or(Some(ShellRuntimeFailureKind::TimedOut)),
1295                                },
1296                            )
1297                            .await;
1298                    }
1299                }
1300            }
1301        }
1302
1303        async fn finish_command(
1304            &self,
1305            tx: &mpsc::Sender<StreamChunk>,
1306            command: &str,
1307            command_cwd: Option<&str>,
1308            completion: CommandCompletion,
1309        ) -> Result<StreamingCommandResult> {
1310            manager().unregister_process(&completion.process_id).await;
1311            self.set_active_sender(None)?;
1312            self.set_active_command(None, None)?;
1313            self.set_state(completion.session_state)?;
1314            if matches!(completion.session_state, ShellSessionState::Idle) && command_cwd.is_some()
1315            {
1316                self.update_working_directory(command_cwd.map(ToOwned::to_owned))?;
1317            }
1318
1319            let lifecycle_chunk = StreamChunk::ShellLifecycle {
1320                process_id: completion.process_id.clone(),
1321                shell_session_id: Some(self.shell_session_id.clone()),
1322                state: completion.process_state,
1323                exit_code: Some(completion.exit_code),
1324                duration_ms: Some(completion.duration_ms),
1325                command: command.to_string(),
1326                cwd: command_cwd.map(ToOwned::to_owned),
1327            };
1328            self.emit_broadcast(lifecycle_chunk.clone());
1329            let _ = tx.send(lifecycle_chunk).await;
1330            self.emit_session_lifecycle_to(tx).await;
1331
1332            Ok(StreamingCommandResult {
1333                process_id: completion.process_id,
1334                command: command.to_string(),
1335                stdout: completion.stdout,
1336                stderr: String::new(),
1337                exit_code: completion.exit_code,
1338                success: matches!(completion.process_state, ShellProcessState::Completed),
1339                duration_ms: completion.duration_ms,
1340                failure_kind: completion.failure_kind,
1341            })
1342        }
1343
1344        async fn request_stop(&self) -> Result<()> {
1345            if self.current_active_process_id().is_none() || self.is_closed() {
1346                return Ok(());
1347            }
1348            self.user_stop_requested.store(true, Ordering::SeqCst);
1349            self.set_state(ShellSessionState::Interrupting)?;
1350            self.emit_session_lifecycle();
1351            if self.interrupt().await.is_err() {
1352                self.set_state(ShellSessionState::Stopping)?;
1353                self.emit_session_lifecycle();
1354                return self.terminate().await;
1355            }
1356
1357            tokio::time::sleep(Duration::from_millis(STOP_ESCALATION_MILLIS)).await;
1358            if self.current_active_process_id().is_some() && !self.is_closed() {
1359                self.set_state(ShellSessionState::Stopping)?;
1360                self.emit_session_lifecycle();
1361                self.terminate().await?;
1362            }
1363            Ok(())
1364        }
1365
1366        async fn interrupt(&self) -> Result<()> {
1367            let mut writer_lock = self.writer.lock().await;
1368            let mut writer = writer_lock.take().ok_or_else(Self::closed_session_error)?;
1369            let (tx, rx) = tokio::sync::oneshot::channel();
1370            std::thread::spawn(move || {
1371                let res = writer.write_all(&[3]).and_then(|_| writer.flush());
1372                let _ = tx.send((res, writer));
1373            });
1374            let (res, writer) = tokio::time::timeout(Duration::from_secs(2), rx)
1375                .await
1376                .map_err(|_| AppError::Session("interrupt write timed out".to_string()))?
1377                .map_err(|_| AppError::Session("interrupt write thread died".to_string()))?;
1378            *writer_lock = Some(writer);
1379            res.map_err(AppError::Io)?;
1380            Ok(())
1381        }
1382
        /// Forcefully tears the session down: marks it closed, releases the PTY
        /// handles, kills the child process and briefly waits for it to exit.
        ///
        /// Calling this on a session whose `closed` flag is already set is a no-op
        /// that returns `Ok(())`.
        ///
        /// # Errors
        /// Returns an error when clearing the active sender fails, when the child
        /// mutex is poisoned, or when `kill` fails with anything other than
        /// `ErrorKind::NotFound`.
        async fn terminate(&self) -> Result<()> {
            // `swap` returns the previous value, so only the caller that flips the
            // flag from false to true continues past this point.
            if self.closed.swap(true, Ordering::SeqCst) {
                return Ok(());
            }
            self.set_active_sender(None)?;

            #[cfg(not(windows))]
            {
                // On Unix, drop the PTY handles first so the blocking reader can observe
                // closure promptly.
                let mut writer = self.writer.lock().await;
                writer.take();
                let mut master = self.master.lock().await;
                master.take();
            }

            // Take the child out of its slot inside a short scope so the std mutex
            // guard is released before any `.await` below.
            let child = {
                let mut child_guard = self
                    .child
                    .lock()
                    .map_err(|_| AppError::Session("failed to lock PTY child".to_string()))?;
                child_guard.take()
            };

            // No child left to kill: on Windows the PTY handles still have to be
            // released (off-thread, see the ConPTY note further down).
            let Some(mut child) = child else {
                #[cfg(windows)]
                {
                    let mut writer_lock = self.writer.lock().await;
                    let writer_to_drop = writer_lock.take();
                    let mut master_lock = self.master.lock().await;
                    let master_to_drop = master_lock.take();
                    std::thread::spawn(move || {
                        drop(writer_to_drop);
                        drop(master_to_drop);
                    });
                }
                return Ok(());
            };

            let kill_result = {
                #[cfg(windows)]
                {
                    // Best effort: `taskkill /F /T` force-kills the whole process tree
                    // rooted at the shell; its outcome is deliberately ignored because
                    // `child.kill()` below is the authoritative result.
                    if let Some(pid) = child.process_id() {
                        let _ = tokio::task::spawn_blocking(move || {
                            let mut cmd = std::process::Command::new("taskkill");
                            cmd.arg("/F")
                                .arg("/T")
                                .arg("/PID")
                                .arg(pid.to_string())
                                .stdout(std::process::Stdio::null())
                                .stderr(std::process::Stdio::null());
                            let _ = cmd.status();
                        })
                        .await;
                    }
                }
                child.kill()
            };

            #[cfg(windows)]
            {
                // On Windows ConPTY, dropping MasterPty calls ClosePseudoConsole which
                // blocks synchronously if any child process is still attached.
                // We drop them in a detached thread to prevent stalling the async executor
                // or preventing Tokio runtime shutdown if ClosePseudoConsole hangs.
                let mut writer_lock = self.writer.lock().await;
                let writer_to_drop = writer_lock.take();
                let mut master_lock = self.master.lock().await;
                let master_to_drop = master_lock.take();
                std::thread::spawn(move || {
                    drop(writer_to_drop);
                    drop(master_to_drop);
                });
            }

            // `NotFound` from `kill` is treated like success here and in the final
            // match below.
            let should_observe_exit = match &kill_result {
                Ok(()) => true,
                Err(error) if error.kind() == ErrorKind::NotFound => true,
                Err(_) => false,
            };

            if should_observe_exit {
                // Poll for the exit status: up to 10 attempts, 50 ms apart.
                for _ in 0..10 {
                    match child.try_wait() {
                        Ok(Some(_)) => break,
                        Err(error) if error.kind() == ErrorKind::NotFound => break,
                        Ok(None) => tokio::time::sleep(Duration::from_millis(50)).await,
                        Err(error) => {
                            tracing::debug!(
                                shell_session_id = %self.shell_session_id,
                                error = %error,
                                "PTY shell try_wait after kill failed"
                            );
                            break;
                        }
                    }
                }
            }

            match kill_result {
                Ok(()) => Ok(()),
                Err(error) if error.kind() == ErrorKind::NotFound => Ok(()),
                Err(error) => Err(AppError::Io(error)),
            }
        }
1488    }
1489
1490    fn prune_pool_locked(state: &mut ManagerState, pool_key: &str) {
1491        let Some(session_ids) = state.pools.get(pool_key).cloned() else {
1492            return;
1493        };
1494
1495        let mut kept = Vec::with_capacity(session_ids.len());
1496        for shell_session_id in session_ids {
1497            let keep = state
1498                .sessions
1499                .get(&shell_session_id)
1500                .is_some_and(|session| !session.is_closed());
1501            if keep {
1502                kept.push(shell_session_id);
1503            } else {
1504                state.sessions.remove(&shell_session_id);
1505                state
1506                    .process_index
1507                    .retain(|_, sid| sid != &shell_session_id);
1508            }
1509        }
1510
1511        if kept.is_empty() {
1512            state.pools.remove(pool_key);
1513        } else {
1514            state.pools.insert(pool_key.to_string(), kept);
1515        }
1516    }
1517
1518    fn build_shell_command(mode: SessionMode) -> CommandBuilder {
1519        match mode {
1520            SessionMode::Interactive => CommandBuilder::new_default_prog(),
1521            SessionMode::Automation => automation_shell_builder(),
1522        }
1523    }
1524
1525    fn automation_shell_builder() -> CommandBuilder {
1526        #[cfg(windows)]
1527        {
1528            let mut builder = CommandBuilder::new("cmd.exe");
1529            builder.arg("/Q");
1530            builder
1531        }
1532
1533        #[cfg(not(windows))]
1534        {
1535            // Prefer bash for PTY automation sessions when available.
1536            // Ubuntu typically maps /bin/sh to dash, and that shell has proven
1537            // less reliable for our marker-wrapped PTY command protocol than the
1538            // macOS /bin/sh environment that CI was already passing under.
1539            // Fall back to plain sh so we still work on minimal systems.
1540            if std::path::Path::new("/bin/bash").exists() {
1541                let mut builder = CommandBuilder::new("/bin/bash");
1542                builder.arg("--noprofile");
1543                builder.arg("--norc");
1544                builder
1545            } else {
1546                CommandBuilder::new("sh")
1547            }
1548        }
1549    }
1550
1551    fn prepare_shell(mode: SessionMode, writer: &mut Box<dyn Write + Send>) -> Result<()> {
1552        if matches!(mode, SessionMode::Interactive) {
1553            writer.flush().map_err(AppError::Io)?;
1554            return Ok(());
1555        }
1556
1557        #[cfg(windows)]
1558        {
1559            writer.write_all(b"prompt $G\r\n").map_err(AppError::Io)?;
1560        }
1561
1562        #[cfg(not(windows))]
1563        {
1564            writer
1565                .write_all(b"export PS1=''\nunset PROMPT_COMMAND\nstty -echo\n")
1566                .map_err(AppError::Io)?;
1567        }
1568
1569        writer.flush().map_err(AppError::Io)
1570    }
1571
    /// Shared handles and flags handed to the background PTY reader loop
    /// (`spawn_reader_loop`).
    struct ReaderLoopContext {
        /// Channel that receives every output chunk while it is `Some`; the reader
        /// clears it when the PTY reaches EOF or a read fails.
        active_sender: Arc<StdMutex<Option<mpsc::Sender<String>>>>,
        /// Process id attached to emitted `ShellOutput` events when present; also
        /// cleared by the reader on EOF or read failure.
        active_process_id: Arc<StdMutex<Option<String>>>,
        /// Set to `true` by the reader when the PTY reaches EOF or a read fails.
        closed: Arc<AtomicBool>,
        /// When `true` and no process id is active, output is broadcast as raw
        /// `ShellOutput` even if `emit_raw_output` is `false`.
        claimed_by_user: Arc<AtomicBool>,
        /// Broadcast channel on which `StreamChunk::ShellOutput` events are sent.
        event_tx: broadcast::Sender<StreamChunk>,
        /// Session id stamped on emitted events; also used as the event's
        /// `process_id` when no process id is active.
        shell_session_id: String,
        /// When `true`, every chunk read from the PTY is broadcast as `ShellOutput`.
        emit_raw_output: bool,
    }
1581
1582    fn spawn_reader_loop(mut reader: Box<dyn Read + Send>, context: ReaderLoopContext) {
1583        let ReaderLoopContext {
1584            active_sender,
1585            active_process_id,
1586            closed,
1587            claimed_by_user,
1588            event_tx,
1589            shell_session_id,
1590            emit_raw_output,
1591        } = context;
1592
1593        tokio::task::spawn_blocking(move || {
1594            let mut buffer = [0_u8; 4096];
1595            loop {
1596                match reader.read(&mut buffer) {
1597                    Ok(0) => {
1598                        closed.store(true, Ordering::SeqCst);
1599                        let _ = active_sender.lock().map(|mut guard| guard.take());
1600                        let _ = active_process_id.lock().map(|mut guard| guard.take());
1601                        return;
1602                    }
1603                    Ok(read) => {
1604                        let chunk = String::from_utf8_lossy(&buffer[..read]).into_owned();
1605                        let sender = active_sender.lock().ok().and_then(|guard| guard.clone());
1606                        if let Some(sender) = sender {
1607                            let _ = sender.blocking_send(chunk.clone());
1608                        }
1609                        let process_id = active_process_id
1610                            .lock()
1611                            .ok()
1612                            .and_then(|guard| guard.clone());
1613                        let should_emit_raw_output = emit_raw_output
1614                            || (claimed_by_user.load(Ordering::SeqCst) && process_id.is_none());
1615                        if should_emit_raw_output {
1616                            let _ = event_tx.send(StreamChunk::ShellOutput {
1617                                process_id: process_id.unwrap_or_else(|| shell_session_id.clone()),
1618                                shell_session_id: Some(shell_session_id.clone()),
1619                                stream: ShellOutputStream::Stdout,
1620                                data: chunk,
1621                            });
1622                        }
1623                    }
1624                    Err(err) => {
1625                        tracing::error!("PTY reader read failed: {:?}", err);
1626                        println!("PTY reader read failed: {:?}", err);
1627                        closed.store(true, Ordering::SeqCst);
1628                        let _ = active_sender.lock().map(|mut guard| guard.take());
1629                        let _ = active_process_id.lock().map(|mut guard| guard.take());
1630                        return;
1631                    }
1632                }
1633            }
1634        });
1635    }
1636
1637    fn spawn_child_monitor(
1638        child: Arc<StdMutex<Option<Box<dyn Child + Send + Sync>>>>,
1639        closed: Arc<AtomicBool>,
1640        shell_session_id: String,
1641    ) {
1642        tokio::spawn(async move {
1643            loop {
1644                if closed.load(Ordering::SeqCst) {
1645                    return;
1646                }
1647
1648                let exit_result = {
1649                    let mut child = match child.lock() {
1650                        Ok(child) => child,
1651                        Err(_) => {
1652                            closed.store(true, Ordering::SeqCst);
1653                            return;
1654                        }
1655                    };
1656
1657                    let poll_result = match child.as_mut() {
1658                        Some(child) => child.try_wait().map(|status| status.is_some()),
1659                        None => return,
1660                    };
1661
1662                    match poll_result {
1663                        Ok(true) => {
1664                            child.take();
1665                            Some(Ok(()))
1666                        }
1667                        Ok(false) => None,
1668                        Err(error) => {
1669                            child.take();
1670                            Some(Err(error))
1671                        }
1672                    }
1673                };
1674
1675                match exit_result {
1676                    Some(Ok(())) => {
1677                        closed.store(true, Ordering::SeqCst);
1678                        return;
1679                    }
1680                    Some(Err(error)) => {
1681                        tracing::warn!(
1682                            shell_session_id = %shell_session_id,
1683                            error = %error,
1684                            "PTY shell exit poll failed"
1685                        );
1686                        closed.store(true, Ordering::SeqCst);
1687                        return;
1688                    }
1689                    None => tokio::time::sleep(CHILD_EXIT_POLL_INTERVAL).await,
1690                }
1691            }
1692        });
1693    }
1694
1695    impl SessionOutputParser {
1696        fn new(start_marker: String, done_prefix: String) -> Self {
1697            Self {
1698                pending: String::new(),
1699                started: false,
1700                start_marker,
1701                done_prefix,
1702            }
1703        }
1704
1705        fn push(&mut self, chunk: &str) -> ParsedChunk {
1706            self.pending.push_str(chunk);
1707
1708            if !self.started {
1709                if let Some(consume_end) = find_start_marker(&self.pending, &self.start_marker) {
1710                    let rest = self.pending[consume_end..].to_string();
1711                    self.pending = rest;
1712                    self.started = true;
1713                } else {
1714                    trim_to_tail(&mut self.pending, self.start_marker.len() + 8);
1715                    return ParsedChunk {
1716                        output: String::new(),
1717                        exit_code: None,
1718                    };
1719                }
1720            }
1721
1722            if let Some((output_end, consume_end, exit_code)) =
1723                find_done_marker(&self.pending, &self.done_prefix)
1724            {
1725                let output = self.pending[..output_end].to_string();
1726                self.pending = self.pending[consume_end..].to_string();
1727                return ParsedChunk {
1728                    output,
1729                    exit_code: Some(exit_code),
1730                };
1731            }
1732
1733            let keep = self.done_prefix.len() + 32;
1734            if self.pending.len() > keep {
1735                let flush_len = floor_char_boundary(&self.pending, self.pending.len() - keep);
1736                let output = self.pending[..flush_len].to_string();
1737                self.pending = self.pending[flush_len..].to_string();
1738                ParsedChunk {
1739                    output,
1740                    exit_code: None,
1741                }
1742            } else {
1743                ParsedChunk {
1744                    output: String::new(),
1745                    exit_code: None,
1746                }
1747            }
1748        }
1749
1750        fn finish(&mut self) -> ParsedChunk {
1751            if !self.started {
1752                self.pending.clear();
1753                return ParsedChunk {
1754                    output: String::new(),
1755                    exit_code: None,
1756                };
1757            }
1758
1759            if let Some((output_end, _, exit_code)) =
1760                find_done_marker(&self.pending, &self.done_prefix)
1761                    .or_else(|| find_done_marker_allow_eof(&self.pending, &self.done_prefix))
1762            {
1763                let output = self.pending[..output_end].to_string();
1764                self.pending.clear();
1765                return ParsedChunk {
1766                    output,
1767                    exit_code: Some(exit_code),
1768                };
1769            }
1770
1771            ParsedChunk {
1772                output: std::mem::take(&mut self.pending),
1773                exit_code: None,
1774            }
1775        }
1776
1777        fn buffered_output(&self) -> &str {
1778            if self.started {
1779                self.pending.as_str()
1780            } else {
1781                ""
1782            }
1783        }
1784    }
1785
1786    fn append_recent_output_tail(buffer: &mut String, chunk: &str) {
1787        if chunk.is_empty() {
1788            return;
1789        }
1790        buffer.push_str(chunk);
1791        trim_to_tail(buffer, STALL_SIGNAL_TAIL_BYTES);
1792    }
1793
1794    fn inspect_stall_signal(flushed_output_tail: &str, buffered_output_tail: &str) -> StallSignal {
1795        let mut combined = flushed_output_tail.to_string();
1796        append_recent_output_tail(&mut combined, buffered_output_tail);
1797        if combined.trim().is_empty() {
1798            return StallSignal::None;
1799        }
1800
1801        let normalized = combined.to_ascii_lowercase();
1802        if INTERACTIVE_PROMPT_PATTERNS
1803            .iter()
1804            .any(|needle| normalized.contains(needle))
1805        {
1806            return StallSignal::InteractivePrompt;
1807        }
1808
1809        if normalized
1810            .lines()
1811            .map(str::trim)
1812            .any(|line| line.starts_with("error:"))
1813            || ERROR_OUTPUT_PATTERNS
1814                .iter()
1815                .any(|needle| normalized.contains(needle))
1816        {
1817            return StallSignal::ErrorOutput;
1818        }
1819
1820        StallSignal::None
1821    }
1822
1823    fn find_done_marker(buffer: &str, done_prefix: &str) -> Option<(usize, usize, i32)> {
1824        let marker = format!("\n{done_prefix}");
1825        let idx = buffer.find(&marker)?;
1826        let code_start = idx + marker.len();
1827        let line_end_rel = buffer[code_start..].find('\n')?;
1828        let consume_end = code_start + line_end_rel + 1;
1829        let exit_code = buffer[code_start..code_start + line_end_rel]
1830            .trim_end_matches('\r')
1831            .parse()
1832            .ok()?;
1833        let output_end = if idx > 0 && buffer.as_bytes()[idx - 1] == b'\r' {
1834            idx - 1
1835        } else {
1836            idx
1837        };
1838        Some((output_end, consume_end, exit_code))
1839    }
1840
1841    fn find_done_marker_allow_eof(buffer: &str, done_prefix: &str) -> Option<(usize, usize, i32)> {
1842        let marker = format!("\n{done_prefix}");
1843        let idx = buffer.find(&marker)?;
1844        let code_start = idx + marker.len();
1845        let exit_code = buffer[code_start..].trim_end_matches('\r').parse().ok()?;
1846        let output_end = if idx > 0 && buffer.as_bytes()[idx - 1] == b'\r' {
1847            idx - 1
1848        } else {
1849            idx
1850        };
1851        Some((output_end, buffer.len(), exit_code))
1852    }
1853
1854    fn find_start_marker(buffer: &str, start_marker: &str) -> Option<usize> {
1855        let mut search_from = 0;
1856        while let Some(rel_idx) = buffer[search_from..].find(start_marker) {
1857            let idx = search_from + rel_idx;
1858            let before_ok = idx == 0 || buffer.as_bytes()[idx - 1] == b'\n';
1859            if !before_ok {
1860                search_from = idx + start_marker.len();
1861                continue;
1862            }
1863
1864            let after = idx + start_marker.len();
1865            let consumed = if buffer[after..].starts_with("\r\n") {
1866                after + 2
1867            } else if buffer[after..].starts_with('\n') || buffer[after..].starts_with('\r') {
1868                after + 1
1869            } else {
1870                search_from = idx + start_marker.len();
1871                continue;
1872            };
1873
1874            return Some(consumed);
1875        }
1876
1877        None
1878    }
1879
1880    fn trim_to_tail(value: &mut String, keep: usize) {
1881        if value.len() <= keep {
1882            return;
1883        }
1884        let start = ceil_char_boundary(value, value.len() - keep);
1885        *value = value[start..].to_string();
1886    }
1887
1888    fn floor_char_boundary(value: &str, index: usize) -> usize {
1889        let mut index = index.min(value.len());
1890        while index > 0 && !value.is_char_boundary(index) {
1891            index -= 1;
1892        }
1893        index
1894    }
1895
1896    fn ceil_char_boundary(value: &str, index: usize) -> usize {
1897        let mut index = index.min(value.len());
1898        while index < value.len() && !value.is_char_boundary(index) {
1899            index += 1;
1900        }
1901        index
1902    }
1903
1904    fn wrap_command(
1905        command: &str,
1906        command_cwd: Option<&str>,
1907        start_marker: &str,
1908        done_prefix: &str,
1909    ) -> String {
1910        #[cfg(windows)]
1911        {
1912            wrap_command_windows(command, command_cwd, start_marker, done_prefix)
1913        }
1914
1915        #[cfg(not(windows))]
1916        {
1917            wrap_command_posix(command, command_cwd, start_marker, done_prefix)
1918        }
1919    }
1920
1921    #[cfg(not(windows))]
1922    fn wrap_command_posix(
1923        command: &str,
1924        command_cwd: Option<&str>,
1925        start_marker: &str,
1926        done_prefix: &str,
1927    ) -> String {
1928        let mut script = format!("printf '{start_marker}\\n'; _GESTURA_STATUS=0; ");
1929        if let Some(cwd) = command_cwd {
1930            script.push_str("cd ");
1931            script.push_str(&quote_posix(cwd));
1932            script.push_str(" || _GESTURA_STATUS=$?; ");
1933        }
1934        script.push_str("if [ \"$_GESTURA_STATUS\" -eq 0 ]; then eval ");
1935        script.push_str(&quote_posix(command));
1936        script.push_str("; _GESTURA_STATUS=$?; fi; ");
1937        script.push_str(&format!(
1938            "printf '\\n{done_prefix}%s\\n' \"$_GESTURA_STATUS\"; unset _GESTURA_STATUS\n"
1939        ));
1940        script
1941    }
1942
1943    #[cfg(windows)]
1944    fn wrap_command_windows(
1945        command: &str,
1946        command_cwd: Option<&str>,
1947        start_marker: &str,
1948        done_prefix: &str,
1949    ) -> String {
1950        let mut script = format!("echo {start_marker}\r\nset GESTURA_STATUS=0\r\n");
1951        if let Some(cwd) = command_cwd {
1952            script.push_str("cd /d ");
1953            script.push_str(&quote_cmd_arg(cwd));
1954            script.push_str(" || set GESTURA_STATUS=%ERRORLEVEL%\r\n");
1955        }
1956        script.push_str("if \"%GESTURA_STATUS%\"==\"0\" ");
1957        script.push_str(command);
1958        script.push_str("\r\n");
1959        script.push_str("if \"%GESTURA_STATUS%\"==\"0\" set GESTURA_STATUS=%ERRORLEVEL%\r\n");
1960        script.push_str(&format!(
1961            "echo {done_prefix}%GESTURA_STATUS%\r\nset GESTURA_STATUS=\r\n"
1962        ));
1963        script
1964    }
1965
1966    #[cfg(not(windows))]
1967    fn quote_posix(value: &str) -> String {
1968        format!("'{}'", value.replace('\'', "'\\''"))
1969    }
1970
1971    #[cfg(windows)]
1972    fn quote_cmd_arg(value: &str) -> String {
1973        format!("\"{}\"", value.replace('"', "\"\""))
1974    }
1975
1976    #[cfg(test)]
1977    mod tests {
1978        use super::*;
1979
1980        #[cfg(not(target_os = "windows"))]
1981        lazy_static::lazy_static! {
1982            static ref PTY_TEST_SEMAPHORE: tokio::sync::Semaphore = tokio::sync::Semaphore::new(1);
1983        }
1984
1985        #[cfg(not(target_os = "windows"))]
1986        const PTY_TEST_TIMEOUT_SECS: u64 = 30;
1987        #[cfg(not(target_os = "windows"))]
1988        const PTY_EVENT_TIMEOUT_SECS: u64 = 20;
1989
1990        #[cfg(not(target_os = "windows"))]
1991        async fn shutdown_session_for_test(pool_key: &str) {
1992            let _ = tokio::time::timeout(Duration::from_secs(2), shutdown_session(pool_key)).await;
1993        }
1994
1995        #[cfg(not(target_os = "windows"))]
1996        async fn run_pty_test<F>(pool_key: &'static str, future: F)
1997        where
1998            F: std::future::Future<Output = ()> + Send + 'static,
1999        {
2000            let _permit = PTY_TEST_SEMAPHORE.acquire().await.unwrap();
2001            shutdown_session_for_test(pool_key).await;
2002            let mut handle = tokio::spawn(future);
2003            tokio::select! {
2004                result = &mut handle => {
2005                    shutdown_session_for_test(pool_key).await;
2006                    match result {
2007                        Ok(()) => {}
2008                        Err(error) if error.is_panic() => std::panic::resume_unwind(error.into_panic()),
2009                        Err(error) => panic!("PTY test task for pool '{pool_key}' failed to join: {error}"),
2010                    }
2011                }
2012                _ = tokio::time::sleep(Duration::from_secs(PTY_TEST_TIMEOUT_SECS)) => {
2013                    handle.abort();
2014                    let _ = handle.await;
2015                    shutdown_session_for_test(pool_key).await;
2016                    panic!(
2017                        "PTY test timed out after {}s for pool '{pool_key}'",
2018                        PTY_TEST_TIMEOUT_SECS
2019                    );
2020                }
2021            }
2022        }
2023
2024        #[cfg(not(target_os = "windows"))]
2025        async fn recv_session_lifecycle(
2026            rx: &mut mpsc::Receiver<StreamChunk>,
2027        ) -> (String, ShellSessionState, bool, bool) {
2028            loop {
2029                if let StreamChunk::ShellSessionLifecycle {
2030                    shell_session_id,
2031                    state,
2032                    interactive,
2033                    user_managed,
2034                    ..
2035                } = tokio::time::timeout(Duration::from_secs(PTY_EVENT_TIMEOUT_SECS), rx.recv())
2036                    .await
2037                    .expect("timed out waiting for shell event")
2038                    .expect("channel closed while waiting for shell event")
2039                {
2040                    return (shell_session_id, state, interactive, user_managed);
2041                }
2042            }
2043        }
2044
2045        #[cfg(not(target_os = "windows"))]
2046        async fn recv_command_started(rx: &mut mpsc::Receiver<StreamChunk>) -> (String, String) {
2047            loop {
2048                if let StreamChunk::ShellLifecycle {
2049                    process_id,
2050                    shell_session_id,
2051                    state: ShellProcessState::Started,
2052                    ..
2053                } = tokio::time::timeout(Duration::from_secs(PTY_EVENT_TIMEOUT_SECS), rx.recv())
2054                    .await
2055                    .expect("timed out waiting for command start")
2056                    .expect("channel closed while waiting for command start")
2057                {
2058                    return (
2059                        process_id,
2060                        shell_session_id.expect("PTY-managed commands should carry session id"),
2061                    );
2062                }
2063            }
2064        }
2065
2066        #[cfg(not(target_os = "windows"))]
2067        fn spawn_chunk_collector(
2068            mut rx: mpsc::Receiver<StreamChunk>,
2069        ) -> tokio::task::JoinHandle<Vec<StreamChunk>> {
2070            tokio::spawn(async move {
2071                let mut chunks = Vec::new();
2072                while let Some(chunk) = rx.recv().await {
2073                    chunks.push(chunk);
2074                }
2075                chunks
2076            })
2077        }
2078
2079        #[cfg(not(target_os = "windows"))]
2080        fn simple_output_command(text: &str) -> String {
2081            format!("printf {text}")
2082        }
2083
2084        #[cfg(not(target_os = "windows"))]
2085        fn delayed_output_command(text: &str) -> String {
2086            format!("sleep 2; printf {text}")
2087        }
2088
2089        #[cfg(not(target_os = "windows"))]
2090        fn interactive_input_command(text: &str) -> String {
2091            format!("printf {text}\n")
2092        }
2093
2094        #[test]
2095        fn parser_extracts_output_and_exit_code() {
2096            let mut parser = SessionOutputParser::new(
2097                "__GESTURA_START_abc__".to_string(),
2098                "__GESTURA_DONE_abc__:".to_string(),
2099            );
2100
2101            let first =
2102                parser.push("printf '__GESTURA_START_abc__\\n'\r\n__GESTURA_START_abc__\r\nhello");
2103            assert_eq!(first.output, "");
2104            assert_eq!(first.exit_code, None);
2105
2106            let second = parser.push(" world\n__GESTURA_DONE_abc__:0\r\n");
2107            assert_eq!(second.output, "hello world");
2108            assert_eq!(second.exit_code, Some(0));
2109        }
2110
2111        #[test]
2112        fn parser_finish_extracts_trailing_done_marker_after_channel_close() {
2113            let mut parser = SessionOutputParser::new(
2114                "__GESTURA_START_abc__".to_string(),
2115                "__GESTURA_DONE_abc__:".to_string(),
2116            );
2117
2118            let first = parser.push(
2119                "printf '__GESTURA_START_abc__\\n'\r\n__GESTURA_START_abc__\r\nhello world\n__GESTURA_DONE_abc__:0\r",
2120            );
2121            assert_eq!(first.output, "");
2122            assert_eq!(first.exit_code, None);
2123
2124            let finished = parser.finish();
2125            assert_eq!(finished.output, "hello world");
2126            assert_eq!(finished.exit_code, Some(0));
2127        }
2128
2129        #[test]
2130        #[cfg(not(windows))]
2131        fn wrap_command_changes_directory_before_execution() {
2132            let script = wrap_command(
2133                "pwd",
2134                Some("/tmp/example dir"),
2135                "__GESTURA_START__",
2136                "__GESTURA_DONE__:",
2137            );
2138
2139            assert!(script.contains("cd '/tmp/example dir'"));
2140            assert!(script.contains("printf '__GESTURA_START__\\n'"));
2141            assert!(script.contains("printf '\\n__GESTURA_DONE__:%s\\n'"));
2142        }
2143
2144        #[cfg(not(target_os = "windows"))]
2145        #[tokio::test]
2146        async fn create_session_starts_idle_shell_without_command() {
2147            run_pty_test("pty-create-session", async move {
2148                let (tx, mut rx) = mpsc::channel(64);
2149                let handle = create_session(
2150                    "pty-create-session",
2151                    std::env::current_dir()
2152                        .ok()
2153                        .and_then(|p| p.to_str().map(ToOwned::to_owned))
2154                        .as_deref(),
2155                    Some(tx),
2156                )
2157                .await
2158                .expect("create PTY session");
2159
2160                let (shell_session_id, state, interactive, user_managed) =
2161                    recv_session_lifecycle(&mut rx).await;
2162                assert_eq!(shell_session_id, handle.shell_session_id);
2163                assert_eq!(state, ShellSessionState::Idle);
2164                assert!(interactive);
2165                assert!(user_managed);
2166
2167                stop_session(&handle.shell_session_id)
2168                    .await
2169                    .expect("stop PTY session");
2170            })
2171            .await;
2172        }
2173
2174        #[cfg(not(target_os = "windows"))]
2175        #[tokio::test]
2176        async fn reuses_idle_session_within_same_pool() {
2177            run_pty_test("pty-reuse-pool", async move {
2178                let (tx, rx) = mpsc::channel(128);
2179                let collector = spawn_chunk_collector(rx);
2180                let first_command = simple_output_command("first");
2181                let first = execute_in_session(
2182                    "pty-reuse-pool",
2183                    std::env::current_dir()
2184                        .ok()
2185                        .and_then(|p| p.to_str().map(ToOwned::to_owned))
2186                        .as_deref(),
2187                    first_command.as_str(),
2188                    None,
2189                    Some(10),
2190                    tx.clone(),
2191                )
2192                .await
2193                .expect("first command result");
2194                assert!(first.stdout.contains("first"));
2195
2196                let second_command = simple_output_command("second");
2197                let second = execute_in_session(
2198                    "pty-reuse-pool",
2199                    std::env::current_dir()
2200                        .ok()
2201                        .and_then(|p| p.to_str().map(ToOwned::to_owned))
2202                        .as_deref(),
2203                    second_command.as_str(),
2204                    None,
2205                    Some(10),
2206                    tx.clone(),
2207                )
2208                .await
2209                .expect("second command result");
2210                assert!(second.stdout.contains("second"));
2211
2212                shutdown_session("pty-reuse-pool")
2213                    .await
2214                    .expect("shutdown PTY session pool");
2215
2216                drop(tx);
2217                let chunks = tokio::time::timeout(Duration::from_secs(5), collector)
2218                    .await
2219                    .expect("timed out collecting reuse-pool shell chunks")
2220                    .expect("reuse-pool collector should join");
2221
2222                let started_session_ids = chunks
2223                    .iter()
2224                    .filter_map(|chunk| match chunk {
2225                        StreamChunk::ShellLifecycle {
2226                            shell_session_id: Some(shell_session_id),
2227                            state: ShellProcessState::Started,
2228                            ..
2229                        } => Some(shell_session_id.clone()),
2230                        _ => None,
2231                    })
2232                    .collect::<Vec<_>>();
2233
2234                assert!(
2235                    started_session_ids.len() >= 2,
2236                    "expected at least two started shell lifecycle events, got {chunks:?}"
2237                );
2238                assert_eq!(started_session_ids[0], started_session_ids[1]);
2239            })
2240            .await;
2241        }
2242
2243        #[cfg(not(target_os = "windows"))]
2244        #[tokio::test]
2245        async fn reused_session_emits_busy_before_started_for_follow_up_command() {
2246            run_pty_test("pty-reuse-order-pool", async move {
2247                let (tx, rx) = mpsc::channel(128);
2248                let collector = spawn_chunk_collector(rx);
2249
2250                let first_command = simple_output_command("first");
2251                execute_in_session(
2252                    "pty-reuse-order-pool",
2253                    std::env::current_dir()
2254                        .ok()
2255                        .and_then(|p| p.to_str().map(ToOwned::to_owned))
2256                        .as_deref(),
2257                    first_command.as_str(),
2258                    None,
2259                    Some(10),
2260                    tx.clone(),
2261                )
2262                .await
2263                .expect("first command result");
2264
2265                let second_command = simple_output_command("second");
2266                execute_in_session(
2267                    "pty-reuse-order-pool",
2268                    std::env::current_dir()
2269                        .ok()
2270                        .and_then(|p| p.to_str().map(ToOwned::to_owned))
2271                        .as_deref(),
2272                    second_command.as_str(),
2273                    None,
2274                    Some(10),
2275                    tx.clone(),
2276                )
2277                .await
2278                .expect("second command result");
2279
2280                shutdown_session("pty-reuse-order-pool")
2281                    .await
2282                    .expect("shutdown PTY reuse-order session pool");
2283
2284                drop(tx);
2285                let chunks = tokio::time::timeout(Duration::from_secs(5), collector)
2286                    .await
2287                    .expect("timed out collecting reuse-order shell chunks")
2288                    .expect("reuse-order collector should join");
2289
2290                let reused_session_id = chunks
2291                    .iter()
2292                    .find_map(|chunk| match chunk {
2293                        StreamChunk::ShellLifecycle {
2294                            shell_session_id: Some(shell_session_id),
2295                            state: ShellProcessState::Started,
2296                            ..
2297                        } => Some(shell_session_id.clone()),
2298                        _ => None,
2299                    })
2300                    .expect("expected started shell lifecycle event for reused session");
2301
2302                let busy_indices = chunks
2303                    .iter()
2304                    .enumerate()
2305                    .filter_map(|(index, chunk)| match chunk {
2306                        StreamChunk::ShellSessionLifecycle {
2307                            shell_session_id,
2308                            state: ShellSessionState::Busy,
2309                            interactive: false,
2310                            user_managed: false,
2311                            ..
2312                        } if shell_session_id == &reused_session_id => Some(index),
2313                        _ => None,
2314                    })
2315                    .collect::<Vec<_>>();
2316
2317                let started_indices = chunks
2318                    .iter()
2319                    .enumerate()
2320                    .filter_map(|(index, chunk)| match chunk {
2321                        StreamChunk::ShellLifecycle {
2322                            shell_session_id: Some(shell_session_id),
2323                            state: ShellProcessState::Started,
2324                            ..
2325                        } if shell_session_id == &reused_session_id => Some(index),
2326                        _ => None,
2327                    })
2328                    .collect::<Vec<_>>();
2329
2330                assert!(
2331                    busy_indices.len() >= 2,
2332                    "expected two busy shell session lifecycle events for reused session, got {chunks:?}"
2333                );
2334                assert!(
2335                    started_indices.len() >= 2,
2336                    "expected two started shell lifecycle events for reused session, got {chunks:?}"
2337                );
2338                assert!(busy_indices[1] < started_indices[1]);
2339            })
2340            .await;
2341        }
2342
2343        #[cfg(not(target_os = "windows"))]
2344        #[tokio::test]
2345        async fn execute_in_session_emits_session_lifecycle_to_caller_stream() {
2346            run_pty_test("pty-session-lifecycle-stream", async move {
2347                let (tx, rx) = mpsc::channel(128);
2348                let collector = spawn_chunk_collector(rx);
2349                let command = simple_output_command("streamed");
2350                let result = execute_in_session(
2351                    "pty-session-lifecycle-stream",
2352                    std::env::current_dir()
2353                        .ok()
2354                        .and_then(|p| p.to_str().map(ToOwned::to_owned))
2355                        .as_deref(),
2356                    command.as_str(),
2357                    None,
2358                    Some(10),
2359                    tx.clone(),
2360                )
2361                .await
2362                .expect("command result");
2363                assert!(result.stdout.contains("streamed"));
2364
2365                shutdown_session("pty-session-lifecycle-stream")
2366                    .await
2367                    .expect("shutdown PTY session pool");
2368
2369                drop(tx);
2370                let chunks = tokio::time::timeout(Duration::from_secs(5), collector)
2371                    .await
2372                    .expect("timed out collecting lifecycle stream chunks")
2373                    .expect("lifecycle collector should join");
2374
2375                let (busy_index, busy_session_id) = chunks
2376                    .iter()
2377                    .enumerate()
2378                    .find_map(|(index, chunk)| match chunk {
2379                        StreamChunk::ShellSessionLifecycle {
2380                            shell_session_id,
2381                            state: ShellSessionState::Busy,
2382                            interactive: false,
2383                            user_managed: false,
2384                            ..
2385                        } => Some((index, shell_session_id.clone())),
2386                        _ => None,
2387                    })
2388                    .expect("expected busy shell session lifecycle event");
2389
2390                let started_index = chunks
2391                    .iter()
2392                    .enumerate()
2393                    .find_map(|(index, chunk)| match chunk {
2394                        StreamChunk::ShellLifecycle {
2395                            shell_session_id: Some(shell_session_id),
2396                            state: ShellProcessState::Started,
2397                            ..
2398                        } if shell_session_id == &busy_session_id => Some(index),
2399                        _ => None,
2400                    })
2401                    .expect("expected started shell lifecycle event for busy session");
2402
2403                let idle_index = chunks
2404                    .iter()
2405                    .enumerate()
2406                    .find_map(|(index, chunk)| match chunk {
2407                        StreamChunk::ShellSessionLifecycle {
2408                            shell_session_id,
2409                            state: ShellSessionState::Idle,
2410                            interactive: false,
2411                            user_managed: false,
2412                            ..
2413                        } if shell_session_id == &busy_session_id => Some(index),
2414                        _ => None,
2415                    })
2416                    .expect("expected idle shell session lifecycle event for busy session");
2417
2418                assert!(busy_index < started_index);
2419                assert!(started_index < idle_index);
2420            })
2421            .await;
2422        }
2423
2424        #[cfg(not(target_os = "windows"))]
2425        #[tokio::test]
2426        async fn allocates_new_session_when_existing_one_is_busy() {
2427            run_pty_test("pty-busy-pool", async move {
2428                let (tx_one, mut rx_one) = mpsc::channel(128);
2429                let (tx_two, mut rx_two) = mpsc::channel(128);
2430                let first_command = delayed_output_command("one");
2431
2432                let first = tokio::spawn(async move {
2433                    execute_in_session(
2434                        "pty-busy-pool",
2435                        std::env::current_dir()
2436                            .ok()
2437                            .and_then(|p| p.to_str().map(ToOwned::to_owned))
2438                            .as_deref(),
2439                        first_command.as_str(),
2440                        None,
2441                        Some(10),
2442                        tx_one,
2443                    )
2444                    .await
2445                });
2446
2447                let (_, first_session_id) = recv_command_started(&mut rx_one).await;
2448                let second_command = simple_output_command("two");
2449
2450                let second = tokio::spawn(async move {
2451                    execute_in_session(
2452                        "pty-busy-pool",
2453                        std::env::current_dir()
2454                            .ok()
2455                            .and_then(|p| p.to_str().map(ToOwned::to_owned))
2456                            .as_deref(),
2457                        second_command.as_str(),
2458                        None,
2459                        Some(10),
2460                        tx_two,
2461                    )
2462                    .await
2463                });
2464
2465                let (_, second_session_id) = recv_command_started(&mut rx_two).await;
2466                assert_ne!(first_session_id, second_session_id);
2467
2468                first.await.expect("first join").expect("first result");
2469                second.await.expect("second join").expect("second result");
2470
2471                shutdown_session("pty-busy-pool")
2472                    .await
2473                    .expect("shutdown PTY session pool");
2474            })
2475            .await;
2476        }
2477
2478        #[cfg(not(target_os = "windows"))]
2479        #[tokio::test]
2480        async fn activity_aware_execution_allows_recently_active_long_running_commands() {
2481            run_pty_test("pty-activity-aware-pool", async move {
2482                let (tx, _rx) = mpsc::channel(128);
2483                let command =
2484                    "printf warmup && sleep 2 && printf progress && sleep 2 && printf done";
2485
2486                let result = execute_in_session_with_options(
2487                    "pty-activity-aware-pool",
2488                    std::env::current_dir()
2489                        .ok()
2490                        .and_then(|p| p.to_str().map(ToOwned::to_owned))
2491                        .as_deref(),
2492                    command,
2493                    None,
2494                    ShellExecutionOptions {
2495                        timeout_secs: Some(1),
2496                        allow_long_running: true,
2497                        stall_timeout_secs: Some(8),
2498                    },
2499                    tx,
2500                )
2501                .await
2502                .expect("activity-aware PTY command result");
2503
2504                assert!(result.success);
2505                assert!(result.stdout.contains("done"));
2506
2507                shutdown_session("pty-activity-aware-pool")
2508                    .await
2509                    .expect("shutdown activity-aware PTY session pool");
2510            })
2511            .await;
2512        }
2513
2514        #[cfg(not(target_os = "windows"))]
2515        #[tokio::test]
2516        async fn activity_aware_execution_reports_continue_wait_before_timing_out_when_quiet_output_has_no_indicator()
2517         {
2518            run_pty_test("pty-quiet-activity-aware-pool", async move {
2519                let (tx, mut rx) = mpsc::channel(128);
2520                let command = "sleep 4 && printf done";
2521
2522                let result = execute_in_session_with_options(
2523                    "pty-quiet-activity-aware-pool",
2524                    std::env::current_dir()
2525                        .ok()
2526                        .and_then(|p| p.to_str().map(ToOwned::to_owned))
2527                        .as_deref(),
2528                    command,
2529                    None,
2530                    ShellExecutionOptions {
2531                        timeout_secs: Some(1),
2532                        allow_long_running: true,
2533                        stall_timeout_secs: Some(1),
2534                    },
2535                    tx,
2536                )
2537                .await
2538                .expect("quiet activity-aware PTY command result");
2539
2540                shutdown_session_for_test("pty-quiet-activity-aware-pool").await;
2541
2542                assert!(!result.success);
2543                assert_eq!(result.exit_code, 124);
2544                assert_eq!(result.failure_kind, Some(ShellRuntimeFailureKind::TimedOut));
2545
2546                let mut saw_continue_wait_status = false;
2547                while let Ok(chunk) = rx.try_recv() {
2548                    if let StreamChunk::Status { message } = chunk
2549                        && message.contains("saw no prompt or error indicator")
2550                        && message.contains("will continue waiting")
2551                    {
2552                        saw_continue_wait_status = true;
2553                        break;
2554                    }
2555                }
2556                assert!(saw_continue_wait_status);
2557            })
2558            .await;
2559        }
2560
2561        #[cfg(not(target_os = "windows"))]
2562        #[tokio::test]
2563        async fn activity_aware_execution_times_out_after_repeated_quiet_periods_without_signal() {
2564            run_pty_test("pty-quiet-timeout-pool", async move {
2565                let (tx, _rx) = mpsc::channel(128);
2566                let command = "sleep 4 && printf done";
2567
2568                let result = tokio::time::timeout(
2569                    Duration::from_secs(8),
2570                    execute_in_session_with_options(
2571                        "pty-quiet-timeout-pool",
2572                        std::env::current_dir()
2573                            .ok()
2574                            .and_then(|p| p.to_str().map(ToOwned::to_owned))
2575                            .as_deref(),
2576                        command,
2577                        None,
2578                        ShellExecutionOptions {
2579                            timeout_secs: Some(1),
2580                            allow_long_running: true,
2581                            stall_timeout_secs: Some(1),
2582                        },
2583                        tx,
2584                    ),
2585                )
2586                .await
2587                .expect("quiet PTY command should not hang")
2588                .expect("quiet PTY timeout result");
2589
2590                shutdown_session_for_test("pty-quiet-timeout-pool").await;
2591
2592                assert!(!result.success);
2593                assert_eq!(result.exit_code, 124);
2594                assert_eq!(result.failure_kind, Some(ShellRuntimeFailureKind::TimedOut));
2595            })
2596            .await;
2597        }
2598
2599        #[test]
2600        fn error_output_stalls_use_the_shorter_early_signal_timeout() {
2601            assert_eq!(
2602                early_signal_stall_timeout(Duration::from_secs(30), StallSignal::ErrorOutput),
2603                Some(Duration::from_secs(EARLY_SIGNAL_STALL_TIMEOUT_SECS))
2604            );
2605            assert_eq!(
2606                early_signal_stall_timeout(Duration::from_secs(5), StallSignal::ErrorOutput),
2607                Some(Duration::from_secs(5))
2608            );
2609        }
2610
2611        #[test]
2612        fn generic_quiet_stalls_do_not_use_the_early_signal_timeout() {
2613            assert_eq!(
2614                early_signal_stall_timeout(Duration::from_secs(30), StallSignal::None),
2615                None
2616            );
2617        }
2618
2619        #[test]
2620        fn stall_signal_maps_interactive_prompt_to_waiting_for_input_failure_kind() {
2621            assert_eq!(
2622                StallSignal::InteractivePrompt.runtime_failure_kind(),
2623                Some(ShellRuntimeFailureKind::WaitingForInput)
2624            );
2625            assert!(
2626                StallSignal::InteractivePrompt
2627                    .status_message("shell-123", "pnpm add vite")
2628                    .contains("waiting_for_input")
2629            );
2630        }
2631
2632        #[test]
2633        fn stall_signal_detects_interactive_prompt_in_buffered_output() {
2634            assert_eq!(
2635                inspect_stall_signal(
2636                    "",
2637                    "Need to install the following packages:\ncreate-app\nProceed? (y/n)"
2638                ),
2639                StallSignal::InteractivePrompt
2640            );
2641        }
2642
2643        #[test]
2644        fn stall_signal_detects_recent_error_output() {
2645            assert_eq!(
2646                inspect_stall_signal(
2647                    "Compiling dependencies\n",
2648                    "error: no such file or directory"
2649                ),
2650                StallSignal::ErrorOutput
2651            );
2652        }
2653
2654        #[test]
2655        fn stall_signal_returns_none_when_output_has_no_clear_indicator() {
2656            assert_eq!(
2657                inspect_stall_signal("Compiling 24 crates\n", "Still working on build graph"),
2658                StallSignal::None
2659            );
2660        }
2661
2662        #[cfg(not(target_os = "windows"))]
2663        #[tokio::test]
2664        async fn claiming_automation_session_removes_it_from_reuse_pool() {
2665            run_pty_test("pty-claim-pool", async move {
2666                let (tx, mut rx) = mpsc::channel(128);
2667                let first_command = simple_output_command("first");
2668                let first = execute_in_session(
2669                    "pty-claim-pool",
2670                    std::env::current_dir()
2671                        .ok()
2672                        .and_then(|p| p.to_str().map(ToOwned::to_owned))
2673                        .as_deref(),
2674                    first_command.as_str(),
2675                    None,
2676                    Some(10),
2677                    tx.clone(),
2678                )
2679                .await
2680                .expect("first command result");
2681                assert!(first.stdout.contains("first"));
2682
2683                let (_, first_session_id) = recv_command_started(&mut rx).await;
2684
2685                claim_session(&first_session_id)
2686                    .await
2687                    .expect("claim session")
2688                    .expect("claimed metadata");
2689
2690                let metadata = describe_session(&first_session_id)
2691                    .await
2692                    .expect("describe claimed session")
2693                    .expect("session metadata");
2694                assert!(metadata.user_managed);
2695                assert!(!metadata.available_for_reuse);
2696
2697                let second_command = simple_output_command("second");
2698                let second = execute_in_session(
2699                    "pty-claim-pool",
2700                    std::env::current_dir()
2701                        .ok()
2702                        .and_then(|p| p.to_str().map(ToOwned::to_owned))
2703                        .as_deref(),
2704                    second_command.as_str(),
2705                    None,
2706                    Some(10),
2707                    tx.clone(),
2708                )
2709                .await
2710                .expect("second command result");
2711                assert!(second.stdout.contains("second"));
2712
2713                let (_, second_session_id) = recv_command_started(&mut rx).await;
2714                assert_ne!(first_session_id, second_session_id);
2715
2716                shutdown_session("pty-claim-pool")
2717                    .await
2718                    .expect("shutdown PTY session pool");
2719            })
2720            .await;
2721        }
2722
2723        #[cfg(not(target_os = "windows"))]
2724        #[tokio::test]
2725        async fn claimed_automation_session_streams_interactive_output_after_attach() {
2726            run_pty_test("pty-attach-pool", async move {
2727                let (tx, mut rx) = mpsc::channel(128);
2728                let seed_command = simple_output_command("base");
2729                execute_in_session(
2730                    "pty-attach-pool",
2731                    std::env::current_dir()
2732                        .ok()
2733                        .and_then(|p| p.to_str().map(ToOwned::to_owned))
2734                        .as_deref(),
2735                    seed_command.as_str(),
2736                    None,
2737                    Some(10),
2738                    tx,
2739                )
2740                .await
2741                .expect("seed command result");
2742
2743                let (_, shell_session_id) = recv_command_started(&mut rx).await;
2744
2745                let (attach_tx, mut attach_rx) = mpsc::channel(128);
2746                subscribe_session(&shell_session_id, attach_tx)
2747                    .await
2748                    .expect("subscribe to shell session")
2749                    .expect("session metadata after subscribe");
2750
2751                claim_session(&shell_session_id)
2752                    .await
2753                    .expect("claim session")
2754                    .expect("claimed session metadata");
2755
2756                let attached_input = interactive_input_command("attached");
2757                send_input(&shell_session_id, attached_input.as_str())
2758                    .await
2759                    .expect("send interactive input");
2760
2761                let mut saw_attached_output = false;
2762                let deadline =
2763                    tokio::time::Instant::now() + Duration::from_secs(PTY_EVENT_TIMEOUT_SECS);
2764                while tokio::time::Instant::now() < deadline {
2765                    let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
2766                    let chunk = tokio::time::timeout(remaining, attach_rx.recv())
2767                        .await
2768                        .expect("timed out waiting for attached shell output")
2769                        .expect("attach channel closed unexpectedly");
2770
2771                    if let StreamChunk::ShellOutput { data, .. } = chunk
2772                        && data.contains("attached")
2773                    {
2774                        saw_attached_output = true;
2775                        break;
2776                    }
2777                }
2778
2779                assert!(
2780                    saw_attached_output,
2781                    "claimed automation session did not stream attached shell output"
2782                );
2783
2784                shutdown_session("pty-attach-pool")
2785                    .await
2786                    .expect("shutdown attached PTY session pool");
2787            })
2788            .await;
2789        }
2790    }
2791}
2792
2793/// Start a long-lived interactive PTY shell session immediately.
2794pub async fn create_session(
2795    pool_key: &str,
2796    initial_cwd: Option<&str>,
2797    tx: Option<mpsc::Sender<StreamChunk>>,
2798) -> Result<ShellSessionHandle> {
2799    imp::create_session(pool_key, initial_cwd, tx).await
2800}
2801
2802/// Execute a shell command inside a reusable PTY-backed session.
2803pub async fn execute_in_session(
2804    pool_key: &str,
2805    initial_cwd: Option<&str>,
2806    command: &str,
2807    command_cwd: Option<&str>,
2808    timeout_secs: Option<u64>,
2809    tx: mpsc::Sender<StreamChunk>,
2810) -> Result<StreamingCommandResult> {
2811    execute_in_session_with_options(
2812        pool_key,
2813        initial_cwd,
2814        command,
2815        command_cwd,
2816        ShellExecutionOptions {
2817            timeout_secs,
2818            ..ShellExecutionOptions::default()
2819        },
2820        tx,
2821    )
2822    .await
2823}
2824
2825/// Execute a shell command inside a reusable PTY-backed session with an explicit policy.
2826pub async fn execute_in_session_with_options(
2827    pool_key: &str,
2828    initial_cwd: Option<&str>,
2829    command: &str,
2830    command_cwd: Option<&str>,
2831    options: ShellExecutionOptions,
2832    tx: mpsc::Sender<StreamChunk>,
2833) -> Result<StreamingCommandResult> {
2834    imp::execute_in_session_with_options(pool_key, initial_cwd, command, command_cwd, options, tx)
2835        .await
2836}
2837
2838/// Stop an active PTY-managed command by its command run id.
2839pub async fn stop_process(process_id: &str) -> Result<Option<ShellSessionHandle>> {
2840    imp::stop_process(process_id).await
2841}
2842
2843/// Stop and remove a long-lived PTY shell session.
2844pub async fn stop_session(shell_session_id: &str) -> Result<Option<ShellSessionHandle>> {
2845    imp::stop_session(shell_session_id).await
2846}
2847
2848/// Shut down all PTY-backed shell sessions associated with a pool key.
2849pub async fn shutdown_session(pool_key: &str) -> Result<()> {
2850    imp::shutdown_session(pool_key).await
2851}
2852
2853/// Send raw input bytes to a PTY shell session.
2854pub async fn send_input(shell_session_id: &str, data: &str) -> Result<()> {
2855    imp::send_input(shell_session_id, data).await
2856}
2857
2858/// Resize a PTY shell session to the given column and row dimensions.
2859pub async fn resize_session(shell_session_id: &str, cols: u16, rows: u16) -> Result<()> {
2860    imp::resize_session(shell_session_id, cols, rows).await
2861}
2862
2863/// Describe a PTY shell session for frontend/UI enrichment.
2864pub async fn describe_session(shell_session_id: &str) -> Result<Option<ShellSessionMetadata>> {
2865    imp::describe_session(shell_session_id).await
2866}
2867
2868/// Claim a PTY shell session for direct user management.
2869pub async fn claim_session(shell_session_id: &str) -> Result<Option<ShellSessionMetadata>> {
2870    imp::claim_session(shell_session_id).await
2871}
2872
2873/// Subscribe to lifecycle/output events for an existing PTY shell session.
2874pub async fn subscribe_session(
2875    shell_session_id: &str,
2876    tx: mpsc::Sender<StreamChunk>,
2877) -> Result<Option<ShellSessionMetadata>> {
2878    imp::subscribe_session(shell_session_id, tx).await
2879}