1use crate::error::{AppError, Result};
8use crate::streaming::{ShellOutputStream, ShellProcessState, ShellSessionState, StreamChunk};
9use std::time::{Duration, Instant};
10use tokio::sync::mpsc;
11
12use super::shell_streaming::{ShellRuntimeFailureKind, StreamingCommandResult};
13
14#[derive(Debug, Clone)]
16pub struct ShellSessionHandle {
17 pub shell_session_id: String,
19 pub cwd: Option<String>,
21}
22
23#[derive(Debug, Clone)]
25pub struct ShellSessionMetadata {
26 pub shell_session_id: String,
28 pub cwd: Option<String>,
30 pub interactive: bool,
32 pub user_managed: bool,
34 pub available_for_reuse: bool,
36}
37
38#[derive(Debug, Clone, Copy, Default)]
40pub struct ShellExecutionOptions {
41 pub timeout_secs: Option<u64>,
43 pub allow_long_running: bool,
45 pub stall_timeout_secs: Option<u64>,
47}
48
49mod imp {
50 use super::*;
51 use portable_pty::{Child, CommandBuilder, MasterPty, PtySize, native_pty_system};
52 use std::collections::HashMap;
53 use std::io::{ErrorKind, Read, Write};
54 use std::sync::{
55 Arc, Mutex as StdMutex, OnceLock,
56 atomic::{AtomicBool, Ordering},
57 };
58 use tokio::sync::{Mutex, broadcast, mpsc};
59
60 const DEFAULT_ROWS: u16 = 24;
61 const DEFAULT_COLS: u16 = 120;
62 const INTERRUPT_GRACE_SECS: u64 = 2;
63 const STOP_ESCALATION_MILLIS: u64 = 300;
64 const ACTIVE_CHUNK_BUFFER: usize = 256;
65 const SESSION_EVENT_BUFFER: usize = 512;
66 const DEFAULT_EXECUTION_TIMEOUT_SECS: u64 = 300;
67 const MIN_STALL_TIMEOUT_SECS: u64 = 30;
68 const MAX_STALL_TIMEOUT_SECS: u64 = 300;
69 const EARLY_SIGNAL_STALL_TIMEOUT_SECS: u64 = 15;
70 const MAX_QUIET_WAIT_CYCLES_WITHOUT_SIGNAL: u8 = 2;
71 const SHELL_OUTPUT_SEND_TIMEOUT: Duration = Duration::from_millis(100);
72 const STATUS_CHUNK_SEND_TIMEOUT: Duration = Duration::from_millis(100);
73 const STALL_SIGNAL_TAIL_BYTES: usize = 4096;
74 const CHILD_EXIT_POLL_INTERVAL: Duration = Duration::from_millis(100);
75
76 const INTERACTIVE_PROMPT_PATTERNS: &[&str] = &[
77 "ok to proceed?",
78 "need to install the following packages",
79 "would you like to continue",
80 "press enter to continue",
81 "press any key to continue",
82 "select an option",
83 "(y/n)",
84 "[y/n]",
85 "yes/no",
86 "enter password",
87 "enter passphrase",
88 "password:",
89 "passphrase:",
90 ];
91
92 const ERROR_OUTPUT_PATTERNS: &[&str] = &[
93 "command not found",
94 "no such file or directory",
95 "permission denied",
96 "not recognized as an internal or external command",
97 "is not recognized as an internal or external command",
98 "npm err!",
99 "traceback (most recent call last)",
100 "syntax error",
101 "fatal:",
102 "panic:",
103 "exception:",
104 ];
105
106 async fn send_shell_output_chunk_best_effort(
107 tx: &mpsc::Sender<StreamChunk>,
108 chunk: StreamChunk,
109 ) {
110 match tokio::time::timeout(SHELL_OUTPUT_SEND_TIMEOUT, tx.send(chunk)).await {
111 Ok(Ok(())) | Ok(Err(_)) => {}
112 Err(_) => {
113 tracing::debug!(
114 timeout_ms = SHELL_OUTPUT_SEND_TIMEOUT.as_millis(),
115 "Dropping PTY shell output chunk because the stream receiver is not draining fast enough"
116 );
117 }
118 }
119 }
120
121 async fn send_status_chunk_best_effort(tx: &mpsc::Sender<StreamChunk>, message: String) {
122 match tokio::time::timeout(
123 STATUS_CHUNK_SEND_TIMEOUT,
124 tx.send(StreamChunk::Status { message }),
125 )
126 .await
127 {
128 Ok(Ok(())) | Ok(Err(_)) => {}
129 Err(_) => {
130 tracing::debug!(
131 timeout_ms = STATUS_CHUNK_SEND_TIMEOUT.as_millis(),
132 "Dropping PTY status chunk because the stream receiver is not draining fast enough"
133 );
134 }
135 }
136 }
137
138 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
139 enum SessionMode {
140 Automation,
141 Interactive,
142 }
143
144 #[derive(Default)]
145 struct ManagerState {
146 sessions: HashMap<String, Arc<ShellSession>>,
147 pools: HashMap<String, Vec<String>>,
148 process_index: HashMap<String, String>,
149 }
150
151 pub(super) struct ShellSessionManager {
152 state: Mutex<ManagerState>,
153 }
154
155 struct ShellSession {
156 shell_session_id: String,
157 pool_key: String,
158 mode: SessionMode,
159 master: Mutex<Option<Box<dyn MasterPty + Send>>>,
160 writer: Mutex<Option<Box<dyn Write + Send>>>,
161 command_lock: Mutex<()>,
162 active_sender: Arc<StdMutex<Option<mpsc::Sender<String>>>>,
163 event_tx: broadcast::Sender<StreamChunk>,
164 child: Arc<StdMutex<Option<Box<dyn Child + Send + Sync>>>>,
165 closed: Arc<AtomicBool>,
166 claimed_by_user: Arc<AtomicBool>,
167 user_stop_requested: Arc<AtomicBool>,
168 state: Arc<StdMutex<ShellSessionState>>,
169 working_directory: Arc<StdMutex<Option<String>>>,
170 active_process_id: Arc<StdMutex<Option<String>>>,
171 active_command: Arc<StdMutex<Option<String>>>,
172 }
173
174 struct ParsedChunk {
175 output: String,
176 exit_code: Option<i32>,
177 }
178
179 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
180 enum StallSignal {
181 None,
182 InteractivePrompt,
183 ErrorOutput,
184 }
185
186 impl StallSignal {
187 fn runtime_failure_kind(self) -> Option<ShellRuntimeFailureKind> {
188 match self {
189 Self::None => None,
190 Self::InteractivePrompt => Some(ShellRuntimeFailureKind::WaitingForInput),
191 Self::ErrorOutput => Some(ShellRuntimeFailureKind::ErrorOutput),
192 }
193 }
194
195 fn status_message(self, shell_session_id: &str, command: &str) -> String {
196 match self {
197 Self::None => format!(
198 "Shell runtime status: session `{shell_session_id}` saw no prompt or error indicator during a quiet period for `{command}` and will continue waiting."
199 ),
200 Self::InteractivePrompt => format!(
201 "Shell runtime status: session `{shell_session_id}` classified quiet command `{command}` as waiting_for_input and is interrupting it."
202 ),
203 Self::ErrorOutput => format!(
204 "Shell runtime status: session `{shell_session_id}` classified quiet command `{command}` as error_output and is interrupting it."
205 ),
206 }
207 }
208 }
209
210 struct CommandCompletion {
211 process_id: String,
212 stdout: String,
213 exit_code: i32,
214 process_state: ShellProcessState,
215 duration_ms: u64,
216 session_state: ShellSessionState,
217 failure_kind: Option<ShellRuntimeFailureKind>,
218 }
219
220 struct SessionOutputParser {
221 pending: String,
222 started: bool,
223 start_marker: String,
224 done_prefix: String,
225 }
226
227 static MANAGER: OnceLock<ShellSessionManager> = OnceLock::new();
228
229 fn manager() -> &'static ShellSessionManager {
230 MANAGER.get_or_init(|| ShellSessionManager {
231 state: Mutex::new(ManagerState::default()),
232 })
233 }
234
235 pub(super) async fn create_session(
236 pool_key: &str,
237 initial_cwd: Option<&str>,
238 tx: Option<mpsc::Sender<StreamChunk>>,
239 ) -> Result<ShellSessionHandle> {
240 let session = spawn_session(pool_key, initial_cwd, SessionMode::Interactive).await?;
241 manager().insert_session(session.clone()).await;
242 session.set_state(ShellSessionState::Idle)?;
243 if let Some(tx) = tx {
244 session.subscribe(tx);
245 }
246 session.emit_session_lifecycle();
247 Ok(session.handle())
248 }
249
250 #[allow(dead_code)]
251 pub(super) async fn execute_in_session(
252 pool_key: &str,
253 initial_cwd: Option<&str>,
254 command: &str,
255 command_cwd: Option<&str>,
256 timeout_secs: Option<u64>,
257 tx: mpsc::Sender<StreamChunk>,
258 ) -> Result<StreamingCommandResult> {
259 execute_in_session_with_options(
260 pool_key,
261 initial_cwd,
262 command,
263 command_cwd,
264 ShellExecutionOptions {
265 timeout_secs,
266 ..ShellExecutionOptions::default()
267 },
268 tx,
269 )
270 .await
271 }
272
273 pub(super) async fn execute_in_session_with_options(
274 pool_key: &str,
275 initial_cwd: Option<&str>,
276 command: &str,
277 command_cwd: Option<&str>,
278 options: ShellExecutionOptions,
279 tx: mpsc::Sender<StreamChunk>,
280 ) -> Result<StreamingCommandResult> {
281 let session = acquire_execution_session(pool_key, initial_cwd).await?;
282 session.execute(command, command_cwd, options, tx).await
283 }
284
285 fn default_stall_timeout_secs(timeout_secs: u64) -> u64 {
286 timeout_secs.clamp(MIN_STALL_TIMEOUT_SECS, MAX_STALL_TIMEOUT_SECS)
287 }
288
289 fn early_signal_stall_timeout(
290 stall_timeout: Duration,
291 stall_signal: StallSignal,
292 ) -> Option<Duration> {
293 match stall_signal {
294 StallSignal::InteractivePrompt | StallSignal::ErrorOutput => {
295 Some(stall_timeout.min(Duration::from_secs(EARLY_SIGNAL_STALL_TIMEOUT_SECS)))
296 }
297 StallSignal::None => None,
298 }
299 }
300
301 pub(super) async fn stop_process(process_id: &str) -> Result<Option<ShellSessionHandle>> {
302 let session = manager().find_session_by_process(process_id).await;
303 if let Some(session) = session {
304 session.request_stop().await?;
305 return Ok(Some(session.handle()));
306 }
307 Ok(None)
308 }
309
310 pub(super) async fn stop_session(shell_session_id: &str) -> Result<Option<ShellSessionHandle>> {
311 let session = manager().remove_session(shell_session_id).await;
312 if let Some(session) = session {
313 session.user_stop_requested.store(true, Ordering::SeqCst);
314 session.set_state(ShellSessionState::Stopping)?;
315 session.emit_session_lifecycle();
316 session.terminate().await?;
317 session.set_active_command(None, None)?;
318 session.set_state(ShellSessionState::Stopped)?;
319 session.emit_session_lifecycle();
320 return Ok(Some(session.handle()));
321 }
322 Ok(None)
323 }
324
325 pub(super) async fn shutdown_session(pool_key: &str) -> Result<()> {
326 let sessions = manager().remove_pool(pool_key).await;
327 for session in sessions {
328 session.user_stop_requested.store(true, Ordering::SeqCst);
329 session.set_state(ShellSessionState::Stopping)?;
330 session.emit_session_lifecycle();
331 session.terminate().await?;
332 session.set_active_command(None, None)?;
333 session.set_state(ShellSessionState::Stopped)?;
334 session.emit_session_lifecycle();
335 }
336 Ok(())
337 }
338
339 pub(super) async fn send_input(shell_session_id: &str, data: &str) -> Result<()> {
340 let Some(session) = manager().find_session(shell_session_id).await else {
341 return Err(AppError::Session(format!(
342 "unknown shell session: {shell_session_id}"
343 )));
344 };
345 session.send_input(data).await
346 }
347
348 pub(super) async fn resize_session(shell_session_id: &str, cols: u16, rows: u16) -> Result<()> {
349 let Some(session) = manager().find_session(shell_session_id).await else {
350 return Err(AppError::Session(format!(
351 "unknown shell session: {shell_session_id}"
352 )));
353 };
354 session.resize(cols, rows).await
355 }
356
357 pub(super) async fn describe_session(
358 shell_session_id: &str,
359 ) -> Result<Option<ShellSessionMetadata>> {
360 Ok(manager()
361 .find_session(shell_session_id)
362 .await
363 .map(|session| session.metadata()))
364 }
365
366 pub(super) async fn claim_session(
367 shell_session_id: &str,
368 ) -> Result<Option<ShellSessionMetadata>> {
369 let Some(session) = manager().find_session(shell_session_id).await else {
370 return Ok(None);
371 };
372
373 session.claim_for_user();
374 Ok(Some(session.metadata()))
375 }
376
377 pub(super) async fn subscribe_session(
378 shell_session_id: &str,
379 tx: mpsc::Sender<StreamChunk>,
380 ) -> Result<Option<ShellSessionMetadata>> {
381 let Some(session) = manager().find_session(shell_session_id).await else {
382 return Ok(None);
383 };
384
385 session.subscribe(tx);
386 Ok(Some(session.metadata()))
387 }
388
389 async fn spawn_session(
390 pool_key: &str,
391 initial_cwd: Option<&str>,
392 mode: SessionMode,
393 ) -> Result<Arc<ShellSession>> {
394 let pool_key_owned = pool_key.to_string();
395 let initial_cwd_owned = initial_cwd.map(ToOwned::to_owned);
396 tokio::task::spawn_blocking(move || {
397 ShellSession::spawn(&pool_key_owned, initial_cwd_owned.as_deref(), mode)
398 })
399 .await
400 .map_err(|error| AppError::Session(format!("failed to spawn PTY session task: {error}")))?
401 }
402
403 async fn acquire_execution_session(
404 pool_key: &str,
405 initial_cwd: Option<&str>,
406 ) -> Result<Arc<ShellSession>> {
407 if let Some(session) = manager().acquire_idle_session(pool_key).await? {
408 return Ok(session);
409 }
410
411 let session = spawn_session(pool_key, initial_cwd, SessionMode::Automation).await?;
412 manager().insert_session(session.clone()).await;
413 session.set_state(ShellSessionState::Idle)?;
414 session.mark_busy_if_idle()?;
415 Ok(session)
416 }
417
418 impl ShellSessionManager {
419 async fn insert_session(&self, session: Arc<ShellSession>) {
420 let mut state = self.state.lock().await;
421 let shell_session_id = session.shell_session_id.clone();
422 state
423 .pools
424 .entry(session.pool_key.clone())
425 .or_default()
426 .push(shell_session_id.clone());
427 state.sessions.insert(shell_session_id, session);
428 }
429
430 async fn acquire_idle_session(&self, pool_key: &str) -> Result<Option<Arc<ShellSession>>> {
431 let mut state = self.state.lock().await;
432 prune_pool_locked(&mut state, pool_key);
433 let session_ids = state.pools.get(pool_key).cloned().unwrap_or_default();
434 for shell_session_id in session_ids {
435 if let Some(session) = state.sessions.get(&shell_session_id).cloned()
436 && session.mark_busy_if_idle()?
437 {
438 return Ok(Some(session));
439 }
440 }
441 Ok(None)
442 }
443
444 async fn register_process(&self, process_id: String, shell_session_id: String) {
445 let mut state = self.state.lock().await;
446 state.process_index.insert(process_id, shell_session_id);
447 }
448
449 async fn unregister_process(&self, process_id: &str) {
450 let mut state = self.state.lock().await;
451 state.process_index.remove(process_id);
452 }
453
454 async fn find_session(&self, shell_session_id: &str) -> Option<Arc<ShellSession>> {
455 let state = self.state.lock().await;
456 state.sessions.get(shell_session_id).cloned()
457 }
458
459 async fn find_session_by_process(&self, process_id: &str) -> Option<Arc<ShellSession>> {
460 let state = self.state.lock().await;
461 let shell_session_id = state.process_index.get(process_id)?.clone();
462 state.sessions.get(&shell_session_id).cloned()
463 }
464
465 async fn remove_session(&self, shell_session_id: &str) -> Option<Arc<ShellSession>> {
466 let mut state = self.state.lock().await;
467 let session = state.sessions.remove(shell_session_id)?;
468 state.process_index.retain(|_, sid| sid != shell_session_id);
469 if let Some(pool) = state.pools.get_mut(&session.pool_key) {
470 pool.retain(|sid| sid != shell_session_id);
471 if pool.is_empty() {
472 state.pools.remove(&session.pool_key);
473 }
474 }
475 Some(session)
476 }
477
478 async fn remove_pool(&self, pool_key: &str) -> Vec<Arc<ShellSession>> {
479 let mut state = self.state.lock().await;
480 let session_ids = state.pools.remove(pool_key).unwrap_or_default();
481 let mut removed = Vec::with_capacity(session_ids.len());
482 for shell_session_id in session_ids {
483 if let Some(session) = state.sessions.remove(&shell_session_id) {
484 state
485 .process_index
486 .retain(|_, sid| sid != &shell_session_id);
487 removed.push(session);
488 }
489 }
490 removed
491 }
492 }
493
494 impl ShellSession {
495 fn closed_session_error() -> AppError {
496 AppError::Session(
497 "PTY shell session closed unexpectedly; retry to create a fresh shell".to_string(),
498 )
499 }
500
501 fn spawn(
502 pool_key: &str,
503 initial_cwd: Option<&str>,
504 mode: SessionMode,
505 ) -> Result<Arc<Self>> {
506 let pty_system = native_pty_system();
507 let pair = pty_system
508 .openpty(PtySize {
509 rows: DEFAULT_ROWS,
510 cols: DEFAULT_COLS,
511 pixel_width: 0,
512 pixel_height: 0,
513 })
514 .map_err(|error| AppError::Session(format!("failed to open PTY: {error:#}")))?;
515
516 let mut builder = build_shell_command(mode);
517 if let Some(dir) = initial_cwd {
518 builder.cwd(dir);
519 }
520 builder.env("TERM", "xterm-256color");
521 builder.env("GESTURA_PTY", "1");
522
523 let child = pair.slave.spawn_command(builder).map_err(|error| {
524 AppError::Session(format!("failed to spawn PTY shell: {error:#}"))
525 })?;
526
527 let reader = pair.master.try_clone_reader().map_err(|error| {
528 AppError::Session(format!("failed to clone PTY reader: {error:#}"))
529 })?;
530 let mut writer = pair.master.take_writer().map_err(|error| {
531 AppError::Session(format!("failed to take PTY writer: {error:#}"))
532 })?;
533
534 prepare_shell(mode, &mut writer)?;
535
536 let shell_session_id = format!("shell-{}", uuid::Uuid::new_v4());
537 let active_sender = Arc::new(StdMutex::new(None));
538 let active_process_id = Arc::new(StdMutex::new(None));
539 let closed = Arc::new(AtomicBool::new(false));
540 let claimed_by_user = Arc::new(AtomicBool::new(false));
541 let child = Arc::new(StdMutex::new(Some(child)));
542 let (event_tx, _) = broadcast::channel(SESSION_EVENT_BUFFER);
543
544 spawn_reader_loop(
545 reader,
546 ReaderLoopContext {
547 active_sender: active_sender.clone(),
548 active_process_id: active_process_id.clone(),
549 closed: closed.clone(),
550 claimed_by_user: claimed_by_user.clone(),
551 event_tx: event_tx.clone(),
552 shell_session_id: shell_session_id.clone(),
553 emit_raw_output: matches!(mode, SessionMode::Interactive),
554 },
555 );
556 spawn_child_monitor(child.clone(), closed.clone(), shell_session_id.clone());
557
558 Ok(Arc::new(Self {
559 shell_session_id,
560 pool_key: pool_key.to_string(),
561 mode,
562 master: Mutex::new(Some(pair.master)),
563 writer: Mutex::new(Some(writer)),
564 command_lock: Mutex::new(()),
565 active_sender,
566 event_tx,
567 child,
568 closed,
569 claimed_by_user,
570 user_stop_requested: Arc::new(AtomicBool::new(false)),
571 state: Arc::new(StdMutex::new(ShellSessionState::Starting)),
572 working_directory: Arc::new(StdMutex::new(initial_cwd.map(ToOwned::to_owned))),
573 active_process_id,
574 active_command: Arc::new(StdMutex::new(None)),
575 }))
576 }
577
578 fn handle(&self) -> ShellSessionHandle {
579 ShellSessionHandle {
580 shell_session_id: self.shell_session_id.clone(),
581 cwd: self.current_working_directory(),
582 }
583 }
584
585 fn metadata(&self) -> ShellSessionMetadata {
586 ShellSessionMetadata {
587 shell_session_id: self.shell_session_id.clone(),
588 cwd: self.current_working_directory(),
589 interactive: true,
590 user_managed: self.is_user_managed(),
591 available_for_reuse: self.is_available_for_reuse(),
592 }
593 }
594
595 fn is_interactive(&self) -> bool {
596 matches!(self.mode, SessionMode::Interactive)
597 }
598
599 fn is_user_managed(&self) -> bool {
600 self.is_interactive() || self.claimed_by_user.load(Ordering::SeqCst)
601 }
602
603 fn is_closed(&self) -> bool {
604 self.closed.load(Ordering::SeqCst)
605 }
606
607 fn current_working_directory(&self) -> Option<String> {
608 self.working_directory
609 .lock()
610 .ok()
611 .and_then(|guard| guard.clone())
612 }
613
614 fn current_active_process_id(&self) -> Option<String> {
615 self.active_process_id
616 .lock()
617 .ok()
618 .and_then(|guard| guard.clone())
619 }
620
621 fn current_active_command(&self) -> Option<String> {
622 self.active_command
623 .lock()
624 .ok()
625 .and_then(|guard| guard.clone())
626 }
627
628 fn state_value(&self) -> ShellSessionState {
629 self.state
630 .lock()
631 .ok()
632 .map(|guard| *guard)
633 .unwrap_or(ShellSessionState::Failed)
634 }
635
636 fn is_available_for_reuse(&self) -> bool {
637 matches!(self.mode, SessionMode::Automation)
638 && !self.is_user_managed()
639 && matches!(self.state_value(), ShellSessionState::Idle)
640 && !self.is_closed()
641 }
642
643 fn set_state(&self, state: ShellSessionState) -> Result<()> {
644 let mut guard = self
645 .state
646 .lock()
647 .map_err(|_| AppError::Session("failed to lock shell session state".to_string()))?;
648 *guard = state;
649 Ok(())
650 }
651
652 fn mark_busy_if_idle(&self) -> Result<bool> {
653 if self.is_closed()
654 || !matches!(self.mode, SessionMode::Automation)
655 || self.is_user_managed()
656 {
657 return Ok(false);
658 }
659 let mut guard = self
660 .state
661 .lock()
662 .map_err(|_| AppError::Session("failed to lock shell session state".to_string()))?;
663 if !matches!(*guard, ShellSessionState::Idle) {
664 return Ok(false);
665 }
666 *guard = ShellSessionState::Busy;
667 Ok(true)
668 }
669
670 fn claim_for_user(&self) {
671 if self.is_interactive() {
672 return;
673 }
674
675 if self
676 .claimed_by_user
677 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
678 .is_ok()
679 {
680 self.emit_session_lifecycle();
681 }
682 }
683
684 fn set_active_command(
685 &self,
686 process_id: Option<String>,
687 command: Option<String>,
688 ) -> Result<()> {
689 let mut process_guard = self.active_process_id.lock().map_err(|_| {
690 AppError::Session("failed to lock active shell process id".to_string())
691 })?;
692 *process_guard = process_id;
693
694 let mut command_guard = self.active_command.lock().map_err(|_| {
695 AppError::Session("failed to lock active shell command".to_string())
696 })?;
697 *command_guard = command;
698 Ok(())
699 }
700
701 fn set_active_sender(&self, sender: Option<mpsc::Sender<String>>) -> Result<()> {
702 let mut guard = self.active_sender.lock().map_err(|_| {
703 AppError::Session("failed to lock active PTY command sender".to_string())
704 })?;
705 *guard = sender;
706 Ok(())
707 }
708
709 fn update_working_directory(&self, cwd: Option<String>) -> Result<()> {
710 let mut guard = self.working_directory.lock().map_err(|_| {
711 AppError::Session("failed to lock shell session working directory".to_string())
712 })?;
713 *guard = cwd;
714 Ok(())
715 }
716
717 fn subscribe(&self, tx: mpsc::Sender<StreamChunk>) {
718 let mut rx = self.event_tx.subscribe();
719 tokio::spawn(async move {
720 loop {
721 match rx.recv().await {
722 Ok(chunk) => {
723 if tx.send(chunk).await.is_err() {
724 return;
725 }
726 }
727 Err(broadcast::error::RecvError::Lagged(_)) => continue,
728 Err(broadcast::error::RecvError::Closed) => return,
729 }
730 }
731 });
732 }
733
734 fn emit_broadcast(&self, chunk: StreamChunk) {
735 let _ = self.event_tx.send(chunk);
736 }
737
738 fn session_lifecycle_chunk(&self) -> StreamChunk {
739 StreamChunk::ShellSessionLifecycle {
740 shell_session_id: self.shell_session_id.clone(),
741 state: self.state_value(),
742 cwd: self.current_working_directory(),
743 active_process_id: self.current_active_process_id(),
744 active_command: self.current_active_command(),
745 available_for_reuse: self.is_available_for_reuse(),
746 interactive: self.is_interactive(),
747 user_managed: self.is_user_managed(),
748 }
749 }
750
751 fn emit_session_lifecycle(&self) {
752 self.emit_broadcast(self.session_lifecycle_chunk());
753 }
754
755 async fn emit_session_lifecycle_to(&self, tx: &mpsc::Sender<StreamChunk>) {
756 let chunk = self.session_lifecycle_chunk();
757 self.emit_broadcast(chunk.clone());
758 let _ = tx.send(chunk).await;
759 }
760
761 async fn send_input(&self, data: &str) -> Result<()> {
762 if self.is_closed() {
763 return Err(Self::closed_session_error());
764 }
765
766 self.claim_for_user();
767
768 if data.is_empty() {
769 return Ok(());
770 }
771
772 let mut writer = self.writer.lock().await;
773 let writer = writer.as_mut().ok_or_else(Self::closed_session_error)?;
774 writer.write_all(data.as_bytes()).map_err(AppError::Io)?;
775 writer.flush().map_err(AppError::Io)
776 }
777
778 async fn resize(&self, cols: u16, rows: u16) -> Result<()> {
779 let cols = cols.max(1);
780 let rows = rows.max(1);
781 let master = self.master.lock().await;
782 let master = master.as_ref().ok_or_else(Self::closed_session_error)?;
783 master
784 .resize(PtySize {
785 rows,
786 cols,
787 pixel_width: 0,
788 pixel_height: 0,
789 })
790 .map_err(|error| AppError::Session(format!("failed to resize PTY: {error:#}")))
791 }
792
793 async fn execute(
794 self: Arc<Self>,
795 command: &str,
796 command_cwd: Option<&str>,
797 options: ShellExecutionOptions,
798 tx: mpsc::Sender<StreamChunk>,
799 ) -> Result<StreamingCommandResult> {
800 let _command_guard = self.command_lock.lock().await;
801 if self.is_closed() {
802 return Err(Self::closed_session_error());
803 }
804
805 let process_id = uuid::Uuid::new_v4().to_string();
806 let start_marker = format!("__GESTURA_START_{process_id}__");
807 let done_prefix = format!("__GESTURA_DONE_{process_id}__:");
808 let wrapped = wrap_command(command, command_cwd, &start_marker, &done_prefix);
809 let timeout_secs = options
810 .timeout_secs
811 .unwrap_or(DEFAULT_EXECUTION_TIMEOUT_SECS);
812 let timeout = Duration::from_secs(timeout_secs);
813 let stall_timeout = Duration::from_secs(
814 options
815 .stall_timeout_secs
816 .unwrap_or_else(|| default_stall_timeout_secs(timeout_secs)),
817 );
818 let start = Instant::now();
819
820 let (chunk_tx, mut chunk_rx) = mpsc::channel::<String>(ACTIVE_CHUNK_BUFFER);
821 self.user_stop_requested.store(false, Ordering::SeqCst);
822 self.set_active_sender(Some(chunk_tx))?;
823 self.set_active_command(Some(process_id.clone()), Some(command.to_string()))?;
824 self.set_state(ShellSessionState::Busy)?;
825 manager()
826 .register_process(process_id.clone(), self.shell_session_id.clone())
827 .await;
828 self.emit_session_lifecycle_to(&tx).await;
829
830 {
831 let mut writer = self.writer.lock().await;
832 let writer = writer.as_mut().ok_or_else(Self::closed_session_error)?;
833 if let Err(error) = writer.write_all(wrapped.as_bytes()) {
834 self.set_state(ShellSessionState::Failed)?;
835 self.set_active_sender(None)?;
836 self.set_active_command(None, None)?;
837 manager().unregister_process(&process_id).await;
838 self.emit_session_lifecycle_to(&tx).await;
839 return Err(AppError::Io(error));
840 }
841 writer.flush().map_err(AppError::Io)?;
842 }
843
844 let start_chunk = StreamChunk::ShellLifecycle {
845 process_id: process_id.clone(),
846 shell_session_id: Some(self.shell_session_id.clone()),
847 state: ShellProcessState::Started,
848 exit_code: None,
849 duration_ms: None,
850 command: command.to_string(),
851 cwd: command_cwd.map(ToOwned::to_owned),
852 };
853 self.emit_broadcast(start_chunk.clone());
854 let _ = tx.send(start_chunk).await;
855
856 let mut parser = SessionOutputParser::new(start_marker, done_prefix);
857 let mut stdout = String::new();
858 let mut timed_out = false;
859 let mut interrupted = false;
860 let mut interrupt_started_at: Option<Instant> = None;
861 let mut last_activity_at: Option<Instant> = None;
862 let mut continued_wait_anchor_at: Option<Instant> = None;
863 let mut continued_wait_cycles = 0_u8;
864 let mut recent_output_tail = String::new();
865 let mut runtime_failure_kind: Option<ShellRuntimeFailureKind> = None;
866
867 loop {
868 if interrupted
869 && interrupt_started_at
870 .expect("interrupt timestamp must be set")
871 .elapsed()
872 >= Duration::from_secs(INTERRUPT_GRACE_SECS)
873 {
874 self.set_state(ShellSessionState::Failed)?;
875 self.emit_session_lifecycle();
876 self.terminate().await?;
877 let flushed = parser.finish();
878 if !flushed.output.is_empty() {
879 stdout.push_str(&flushed.output);
880 let output_chunk = StreamChunk::ShellOutput {
881 process_id: process_id.clone(),
882 shell_session_id: Some(self.shell_session_id.clone()),
883 stream: ShellOutputStream::Stdout,
884 data: flushed.output,
885 };
886 self.emit_broadcast(output_chunk.clone());
887 send_shell_output_chunk_best_effort(&tx, output_chunk).await;
888 }
889
890 let duration_ms = start.elapsed().as_millis() as u64;
891 return self
892 .finish_command(
893 &tx,
894 command,
895 command_cwd,
896 CommandCompletion {
897 process_id,
898 stdout,
899 exit_code: 124,
900 process_state: ShellProcessState::Failed,
901 duration_ms,
902 session_state: ShellSessionState::Failed,
903 failure_kind: runtime_failure_kind
904 .or(Some(ShellRuntimeFailureKind::TimedOut)),
905 },
906 )
907 .await;
908 }
909
910 let wait_for = if interrupted {
911 Duration::from_secs(INTERRUPT_GRACE_SECS)
912 .checked_sub(
913 interrupt_started_at
914 .expect("interrupt timestamp must be set")
915 .elapsed(),
916 )
917 .unwrap_or(Duration::from_secs(0))
918 } else if options.allow_long_running && start.elapsed() >= timeout {
919 let idle_for = last_activity_at
920 .or(continued_wait_anchor_at)
921 .map(|timestamp| timestamp.elapsed())
922 .unwrap_or_else(|| start.elapsed());
923 stall_timeout
924 .checked_sub(idle_for)
925 .unwrap_or(Duration::from_secs(0))
926 } else if options.allow_long_running {
927 let idle_for = last_activity_at
928 .or(continued_wait_anchor_at)
929 .map(|timestamp| timestamp.elapsed())
930 .unwrap_or_else(|| start.elapsed());
931 let stall_signal =
932 inspect_stall_signal(&recent_output_tail, parser.buffered_output());
933 if let Some(signal_timeout) =
934 early_signal_stall_timeout(stall_timeout, stall_signal)
935 {
936 timeout
937 .checked_sub(start.elapsed())
938 .unwrap_or(Duration::from_secs(0))
939 .min(
940 signal_timeout
941 .checked_sub(idle_for)
942 .unwrap_or(Duration::from_secs(0)),
943 )
944 } else {
945 timeout
946 .checked_sub(start.elapsed())
947 .unwrap_or(Duration::from_secs(0))
948 }
949 } else {
950 timeout
951 .checked_sub(start.elapsed())
952 .unwrap_or(Duration::from_secs(0))
953 };
954
955 match tokio::time::timeout(wait_for, chunk_rx.recv()).await {
956 Ok(Some(chunk)) => {
957 last_activity_at = Some(Instant::now());
958 continued_wait_anchor_at = None;
959 continued_wait_cycles = 0;
960 let parsed = parser.push(&chunk);
961 if !parsed.output.is_empty() {
962 stdout.push_str(&parsed.output);
963 append_recent_output_tail(&mut recent_output_tail, &parsed.output);
964 let output_chunk = StreamChunk::ShellOutput {
965 process_id: process_id.clone(),
966 shell_session_id: Some(self.shell_session_id.clone()),
967 stream: ShellOutputStream::Stdout,
968 data: parsed.output,
969 };
970 self.emit_broadcast(output_chunk.clone());
971 send_shell_output_chunk_best_effort(&tx, output_chunk).await;
972 }
973
974 if let Some(exit_code) = parsed.exit_code {
975 let user_stopped =
976 self.user_stop_requested.swap(false, Ordering::SeqCst);
977 let duration_ms = start.elapsed().as_millis() as u64;
978 let process_state = if timed_out {
979 ShellProcessState::Failed
980 } else if user_stopped {
981 ShellProcessState::Stopped
982 } else if exit_code == 0 {
983 ShellProcessState::Completed
984 } else {
985 ShellProcessState::Failed
986 };
987 let reported_exit = if timed_out {
988 124
989 } else if user_stopped && exit_code == 0 {
990 130
991 } else {
992 exit_code
993 };
994 let session_state = if self.is_closed() {
995 if user_stopped {
996 ShellSessionState::Stopped
997 } else {
998 ShellSessionState::Failed
999 }
1000 } else {
1001 ShellSessionState::Idle
1002 };
1003
1004 return self
1005 .finish_command(
1006 &tx,
1007 command,
1008 command_cwd,
1009 CommandCompletion {
1010 process_id,
1011 stdout,
1012 exit_code: reported_exit,
1013 process_state,
1014 duration_ms,
1015 session_state,
1016 failure_kind: if timed_out {
1017 runtime_failure_kind
1018 .or(Some(ShellRuntimeFailureKind::TimedOut))
1019 } else {
1020 runtime_failure_kind
1021 },
1022 },
1023 )
1024 .await;
1025 }
1026 }
1027 Ok(None) => {
1028 let flushed = parser.finish();
1029 if !flushed.output.is_empty() {
1030 stdout.push_str(&flushed.output);
1031 let output_chunk = StreamChunk::ShellOutput {
1032 process_id: process_id.clone(),
1033 shell_session_id: Some(self.shell_session_id.clone()),
1034 stream: ShellOutputStream::Stdout,
1035 data: flushed.output.clone(),
1036 };
1037 self.emit_broadcast(output_chunk.clone());
1038 send_shell_output_chunk_best_effort(&tx, output_chunk).await;
1039 }
1040
1041 if let Some(exit_code) = flushed.exit_code {
1042 let user_stopped =
1043 self.user_stop_requested.swap(false, Ordering::SeqCst);
1044 let duration_ms = start.elapsed().as_millis() as u64;
1045 let process_state = if user_stopped {
1046 ShellProcessState::Stopped
1047 } else if exit_code == 0 {
1048 ShellProcessState::Completed
1049 } else {
1050 ShellProcessState::Failed
1051 };
1052 let reported_exit = if user_stopped && exit_code == 0 {
1053 130
1054 } else {
1055 exit_code
1056 };
1057 let session_state = if user_stopped {
1058 ShellSessionState::Stopped
1059 } else if self.is_closed() {
1060 ShellSessionState::Failed
1061 } else {
1062 ShellSessionState::Idle
1063 };
1064
1065 return self
1066 .finish_command(
1067 &tx,
1068 command,
1069 command_cwd,
1070 CommandCompletion {
1071 process_id,
1072 stdout,
1073 exit_code: reported_exit,
1074 process_state,
1075 duration_ms,
1076 session_state,
1077 failure_kind: runtime_failure_kind,
1078 },
1079 )
1080 .await;
1081 }
1082
1083 let user_stopped = self.user_stop_requested.swap(false, Ordering::SeqCst);
1084 let duration_ms = start.elapsed().as_millis() as u64;
1085 let exit_code = if user_stopped { 130 } else { -1 };
1086 let process_state = if user_stopped {
1087 ShellProcessState::Stopped
1088 } else {
1089 ShellProcessState::Failed
1090 };
1091 let session_state = if user_stopped {
1092 ShellSessionState::Stopped
1093 } else {
1094 ShellSessionState::Failed
1095 };
1096
1097 return self
1098 .finish_command(
1099 &tx,
1100 command,
1101 command_cwd,
1102 CommandCompletion {
1103 process_id,
1104 stdout,
1105 exit_code,
1106 process_state,
1107 duration_ms,
1108 session_state,
1109 failure_kind: if timed_out {
1110 runtime_failure_kind
1111 .or(Some(ShellRuntimeFailureKind::TimedOut))
1112 } else {
1113 runtime_failure_kind
1114 },
1115 },
1116 )
1117 .await;
1118 }
1119 Err(_) if !interrupted => {
1120 if options.allow_long_running {
1121 let idle_for = last_activity_at
1122 .or(continued_wait_anchor_at)
1123 .map(|timestamp| timestamp.elapsed())
1124 .unwrap_or_else(|| start.elapsed());
1125 let stall_signal =
1126 inspect_stall_signal(&recent_output_tail, parser.buffered_output());
1127
1128 if let Some(signal_timeout) =
1129 early_signal_stall_timeout(stall_timeout, stall_signal)
1130 && idle_for >= signal_timeout
1131 {
1132 runtime_failure_kind = stall_signal.runtime_failure_kind();
1133 send_status_chunk_best_effort(
1134 &tx,
1135 stall_signal.status_message(&self.shell_session_id, command),
1136 )
1137 .await;
1138
1139 match stall_signal {
1140 StallSignal::InteractivePrompt => {
1141 tracing::info!(
1142 shell_session_id = %self.shell_session_id,
1143 process_id = %process_id,
1144 command = %command,
1145 stall_timeout_secs = signal_timeout.as_secs(),
1146 "Interrupting quiet PTY command early because recent output looks interactive"
1147 );
1148 }
1149 StallSignal::ErrorOutput => {
1150 tracing::info!(
1151 shell_session_id = %self.shell_session_id,
1152 process_id = %process_id,
1153 command = %command,
1154 stall_timeout_secs = signal_timeout.as_secs(),
1155 "Interrupting quiet PTY command early because recent output looks like an error"
1156 );
1157 }
1158 StallSignal::None => {}
1159 }
1160
1161 timed_out = true;
1162 interrupted = true;
1163 interrupt_started_at = Some(Instant::now());
1164 self.set_state(ShellSessionState::Interrupting)?;
1165 self.emit_session_lifecycle();
1166 let session = self.clone();
1167 tokio::spawn(async move {
1168 if let Err(e) = session.interrupt().await {
1169 tracing::debug!("interrupt error: {e}");
1170 }
1171 });
1172 continue;
1173 }
1174
1175 if start.elapsed() >= timeout {
1176 if idle_for < stall_timeout {
1177 let sleep_time = stall_timeout
1178 .saturating_sub(idle_for)
1179 .min(Duration::from_millis(50));
1180 tokio::time::sleep(sleep_time).await;
1181 continue;
1182 }
1183
1184 runtime_failure_kind = stall_signal.runtime_failure_kind();
1185 send_status_chunk_best_effort(
1186 &tx,
1187 stall_signal.status_message(&self.shell_session_id, command),
1188 )
1189 .await;
1190
1191 match stall_signal {
1192 StallSignal::None => {
1193 if continued_wait_cycles
1194 >= MAX_QUIET_WAIT_CYCLES_WITHOUT_SIGNAL
1195 {
1196 tracing::info!(
1197 shell_session_id = %self.shell_session_id,
1198 process_id = %process_id,
1199 command = %command,
1200 timeout_secs,
1201 stall_timeout_secs = stall_timeout.as_secs(),
1202 "Interrupting quiet PTY command after repeated quiet periods without prompt/error indicators"
1203 );
1204 } else {
1205 continued_wait_anchor_at = Some(Instant::now());
1206 continued_wait_cycles += 1;
1207 tracing::debug!(
1208 shell_session_id = %self.shell_session_id,
1209 process_id = %process_id,
1210 command = %command,
1211 timeout_secs,
1212 stall_timeout_secs = stall_timeout.as_secs(),
1213 "Long-running PTY command is quiet with no prompt/error indicator; continuing to wait"
1214 );
1215 continue;
1216 }
1217 }
1218 StallSignal::InteractivePrompt => {
1219 tracing::info!(
1220 shell_session_id = %self.shell_session_id,
1221 process_id = %process_id,
1222 command = %command,
1223 "Interrupting quiet PTY command because recent output looks interactive"
1224 );
1225 }
1226 StallSignal::ErrorOutput => {
1227 tracing::info!(
1228 shell_session_id = %self.shell_session_id,
1229 process_id = %process_id,
1230 command = %command,
1231 "Interrupting quiet PTY command because recent output looks like an error"
1232 );
1233 }
1234 }
1235
1236 timed_out = true;
1237 interrupted = true;
1238 interrupt_started_at = Some(Instant::now());
1239 self.set_state(ShellSessionState::Interrupting)?;
1240 self.emit_session_lifecycle();
1241 let session = self.clone();
1242 tokio::spawn(async move {
1243 if let Err(e) = session.interrupt().await {
1244 tracing::debug!("interrupt error: {e}");
1245 }
1246 });
1247 continue;
1248 }
1249 }
1250
1251 timed_out = true;
1252 interrupted = true;
1253 interrupt_started_at = Some(Instant::now());
1254 self.set_state(ShellSessionState::Interrupting)?;
1255 self.emit_session_lifecycle();
1256 let session = self.clone();
1257 tokio::spawn(async move {
1258 if let Err(e) = session.interrupt().await {
1259 tracing::debug!("interrupt error: {e}");
1260 }
1261 });
1262 }
1263 Err(_) => {
1264 self.set_state(ShellSessionState::Failed)?;
1265 self.emit_session_lifecycle();
1266 self.terminate().await?;
1267 let flushed = parser.finish();
1268 if !flushed.output.is_empty() {
1269 stdout.push_str(&flushed.output);
1270 let output_chunk = StreamChunk::ShellOutput {
1271 process_id: process_id.clone(),
1272 shell_session_id: Some(self.shell_session_id.clone()),
1273 stream: ShellOutputStream::Stdout,
1274 data: flushed.output,
1275 };
1276 self.emit_broadcast(output_chunk.clone());
1277 send_shell_output_chunk_best_effort(&tx, output_chunk).await;
1278 }
1279
1280 let duration_ms = start.elapsed().as_millis() as u64;
1281 return self
1282 .finish_command(
1283 &tx,
1284 command,
1285 command_cwd,
1286 CommandCompletion {
1287 process_id,
1288 stdout,
1289 exit_code: 124,
1290 process_state: ShellProcessState::Failed,
1291 duration_ms,
1292 session_state: ShellSessionState::Failed,
1293 failure_kind: runtime_failure_kind
1294 .or(Some(ShellRuntimeFailureKind::TimedOut)),
1295 },
1296 )
1297 .await;
1298 }
1299 }
1300 }
1301 }
1302
1303 async fn finish_command(
1304 &self,
1305 tx: &mpsc::Sender<StreamChunk>,
1306 command: &str,
1307 command_cwd: Option<&str>,
1308 completion: CommandCompletion,
1309 ) -> Result<StreamingCommandResult> {
1310 manager().unregister_process(&completion.process_id).await;
1311 self.set_active_sender(None)?;
1312 self.set_active_command(None, None)?;
1313 self.set_state(completion.session_state)?;
1314 if matches!(completion.session_state, ShellSessionState::Idle) && command_cwd.is_some()
1315 {
1316 self.update_working_directory(command_cwd.map(ToOwned::to_owned))?;
1317 }
1318
1319 let lifecycle_chunk = StreamChunk::ShellLifecycle {
1320 process_id: completion.process_id.clone(),
1321 shell_session_id: Some(self.shell_session_id.clone()),
1322 state: completion.process_state,
1323 exit_code: Some(completion.exit_code),
1324 duration_ms: Some(completion.duration_ms),
1325 command: command.to_string(),
1326 cwd: command_cwd.map(ToOwned::to_owned),
1327 };
1328 self.emit_broadcast(lifecycle_chunk.clone());
1329 let _ = tx.send(lifecycle_chunk).await;
1330 self.emit_session_lifecycle_to(tx).await;
1331
1332 Ok(StreamingCommandResult {
1333 process_id: completion.process_id,
1334 command: command.to_string(),
1335 stdout: completion.stdout,
1336 stderr: String::new(),
1337 exit_code: completion.exit_code,
1338 success: matches!(completion.process_state, ShellProcessState::Completed),
1339 duration_ms: completion.duration_ms,
1340 failure_kind: completion.failure_kind,
1341 })
1342 }
1343
1344 async fn request_stop(&self) -> Result<()> {
1345 if self.current_active_process_id().is_none() || self.is_closed() {
1346 return Ok(());
1347 }
1348 self.user_stop_requested.store(true, Ordering::SeqCst);
1349 self.set_state(ShellSessionState::Interrupting)?;
1350 self.emit_session_lifecycle();
1351 if self.interrupt().await.is_err() {
1352 self.set_state(ShellSessionState::Stopping)?;
1353 self.emit_session_lifecycle();
1354 return self.terminate().await;
1355 }
1356
1357 tokio::time::sleep(Duration::from_millis(STOP_ESCALATION_MILLIS)).await;
1358 if self.current_active_process_id().is_some() && !self.is_closed() {
1359 self.set_state(ShellSessionState::Stopping)?;
1360 self.emit_session_lifecycle();
1361 self.terminate().await?;
1362 }
1363 Ok(())
1364 }
1365
1366 async fn interrupt(&self) -> Result<()> {
1367 let mut writer_lock = self.writer.lock().await;
1368 let mut writer = writer_lock.take().ok_or_else(Self::closed_session_error)?;
1369 let (tx, rx) = tokio::sync::oneshot::channel();
1370 std::thread::spawn(move || {
1371 let res = writer.write_all(&[3]).and_then(|_| writer.flush());
1372 let _ = tx.send((res, writer));
1373 });
1374 let (res, writer) = tokio::time::timeout(Duration::from_secs(2), rx)
1375 .await
1376 .map_err(|_| AppError::Session("interrupt write timed out".to_string()))?
1377 .map_err(|_| AppError::Session("interrupt write thread died".to_string()))?;
1378 *writer_lock = Some(writer);
1379 res.map_err(AppError::Io)?;
1380 Ok(())
1381 }
1382
1383 async fn terminate(&self) -> Result<()> {
1384 if self.closed.swap(true, Ordering::SeqCst) {
1385 return Ok(());
1386 }
1387 self.set_active_sender(None)?;
1388
1389 #[cfg(not(windows))]
1390 {
1391 let mut writer = self.writer.lock().await;
1394 writer.take();
1395 let mut master = self.master.lock().await;
1396 master.take();
1397 }
1398
1399 let child = {
1400 let mut child_guard = self
1401 .child
1402 .lock()
1403 .map_err(|_| AppError::Session("failed to lock PTY child".to_string()))?;
1404 child_guard.take()
1405 };
1406
1407 let Some(mut child) = child else {
1408 #[cfg(windows)]
1409 {
1410 let mut writer_lock = self.writer.lock().await;
1411 let writer_to_drop = writer_lock.take();
1412 let mut master_lock = self.master.lock().await;
1413 let master_to_drop = master_lock.take();
1414 std::thread::spawn(move || {
1415 drop(writer_to_drop);
1416 drop(master_to_drop);
1417 });
1418 }
1419 return Ok(());
1420 };
1421
1422 let kill_result = {
1423 #[cfg(windows)]
1424 {
1425 if let Some(pid) = child.process_id() {
1426 let _ = tokio::task::spawn_blocking(move || {
1427 let mut cmd = std::process::Command::new("taskkill");
1428 cmd.arg("/F")
1429 .arg("/T")
1430 .arg("/PID")
1431 .arg(pid.to_string())
1432 .stdout(std::process::Stdio::null())
1433 .stderr(std::process::Stdio::null());
1434 let _ = cmd.status();
1435 })
1436 .await;
1437 }
1438 }
1439 child.kill()
1440 };
1441
1442 #[cfg(windows)]
1443 {
1444 let mut writer_lock = self.writer.lock().await;
1449 let writer_to_drop = writer_lock.take();
1450 let mut master_lock = self.master.lock().await;
1451 let master_to_drop = master_lock.take();
1452 std::thread::spawn(move || {
1453 drop(writer_to_drop);
1454 drop(master_to_drop);
1455 });
1456 }
1457
1458 let should_observe_exit = match &kill_result {
1459 Ok(()) => true,
1460 Err(error) if error.kind() == ErrorKind::NotFound => true,
1461 Err(_) => false,
1462 };
1463
1464 if should_observe_exit {
1465 for _ in 0..10 {
1466 match child.try_wait() {
1467 Ok(Some(_)) => break,
1468 Err(error) if error.kind() == ErrorKind::NotFound => break,
1469 Ok(None) => tokio::time::sleep(Duration::from_millis(50)).await,
1470 Err(error) => {
1471 tracing::debug!(
1472 shell_session_id = %self.shell_session_id,
1473 error = %error,
1474 "PTY shell try_wait after kill failed"
1475 );
1476 break;
1477 }
1478 }
1479 }
1480 }
1481
1482 match kill_result {
1483 Ok(()) => Ok(()),
1484 Err(error) if error.kind() == ErrorKind::NotFound => Ok(()),
1485 Err(error) => Err(AppError::Io(error)),
1486 }
1487 }
1488 }
1489
1490 fn prune_pool_locked(state: &mut ManagerState, pool_key: &str) {
1491 let Some(session_ids) = state.pools.get(pool_key).cloned() else {
1492 return;
1493 };
1494
1495 let mut kept = Vec::with_capacity(session_ids.len());
1496 for shell_session_id in session_ids {
1497 let keep = state
1498 .sessions
1499 .get(&shell_session_id)
1500 .is_some_and(|session| !session.is_closed());
1501 if keep {
1502 kept.push(shell_session_id);
1503 } else {
1504 state.sessions.remove(&shell_session_id);
1505 state
1506 .process_index
1507 .retain(|_, sid| sid != &shell_session_id);
1508 }
1509 }
1510
1511 if kept.is_empty() {
1512 state.pools.remove(pool_key);
1513 } else {
1514 state.pools.insert(pool_key.to_string(), kept);
1515 }
1516 }
1517
1518 fn build_shell_command(mode: SessionMode) -> CommandBuilder {
1519 match mode {
1520 SessionMode::Interactive => CommandBuilder::new_default_prog(),
1521 SessionMode::Automation => automation_shell_builder(),
1522 }
1523 }
1524
1525 fn automation_shell_builder() -> CommandBuilder {
1526 #[cfg(windows)]
1527 {
1528 let mut builder = CommandBuilder::new("cmd.exe");
1529 builder.arg("/Q");
1530 builder
1531 }
1532
1533 #[cfg(not(windows))]
1534 {
1535 if std::path::Path::new("/bin/bash").exists() {
1541 let mut builder = CommandBuilder::new("/bin/bash");
1542 builder.arg("--noprofile");
1543 builder.arg("--norc");
1544 builder
1545 } else {
1546 CommandBuilder::new("sh")
1547 }
1548 }
1549 }
1550
1551 fn prepare_shell(mode: SessionMode, writer: &mut Box<dyn Write + Send>) -> Result<()> {
1552 if matches!(mode, SessionMode::Interactive) {
1553 writer.flush().map_err(AppError::Io)?;
1554 return Ok(());
1555 }
1556
1557 #[cfg(windows)]
1558 {
1559 writer.write_all(b"prompt $G\r\n").map_err(AppError::Io)?;
1560 }
1561
1562 #[cfg(not(windows))]
1563 {
1564 writer
1565 .write_all(b"export PS1=''\nunset PROMPT_COMMAND\nstty -echo\n")
1566 .map_err(AppError::Io)?;
1567 }
1568
1569 writer.flush().map_err(AppError::Io)
1570 }
1571
1572 struct ReaderLoopContext {
1573 active_sender: Arc<StdMutex<Option<mpsc::Sender<String>>>>,
1574 active_process_id: Arc<StdMutex<Option<String>>>,
1575 closed: Arc<AtomicBool>,
1576 claimed_by_user: Arc<AtomicBool>,
1577 event_tx: broadcast::Sender<StreamChunk>,
1578 shell_session_id: String,
1579 emit_raw_output: bool,
1580 }
1581
1582 fn spawn_reader_loop(mut reader: Box<dyn Read + Send>, context: ReaderLoopContext) {
1583 let ReaderLoopContext {
1584 active_sender,
1585 active_process_id,
1586 closed,
1587 claimed_by_user,
1588 event_tx,
1589 shell_session_id,
1590 emit_raw_output,
1591 } = context;
1592
1593 tokio::task::spawn_blocking(move || {
1594 let mut buffer = [0_u8; 4096];
1595 loop {
1596 match reader.read(&mut buffer) {
1597 Ok(0) => {
1598 closed.store(true, Ordering::SeqCst);
1599 let _ = active_sender.lock().map(|mut guard| guard.take());
1600 let _ = active_process_id.lock().map(|mut guard| guard.take());
1601 return;
1602 }
1603 Ok(read) => {
1604 let chunk = String::from_utf8_lossy(&buffer[..read]).into_owned();
1605 let sender = active_sender.lock().ok().and_then(|guard| guard.clone());
1606 if let Some(sender) = sender {
1607 let _ = sender.blocking_send(chunk.clone());
1608 }
1609 let process_id = active_process_id
1610 .lock()
1611 .ok()
1612 .and_then(|guard| guard.clone());
1613 let should_emit_raw_output = emit_raw_output
1614 || (claimed_by_user.load(Ordering::SeqCst) && process_id.is_none());
1615 if should_emit_raw_output {
1616 let _ = event_tx.send(StreamChunk::ShellOutput {
1617 process_id: process_id.unwrap_or_else(|| shell_session_id.clone()),
1618 shell_session_id: Some(shell_session_id.clone()),
1619 stream: ShellOutputStream::Stdout,
1620 data: chunk,
1621 });
1622 }
1623 }
1624 Err(err) => {
1625 tracing::error!("PTY reader read failed: {:?}", err);
1626 println!("PTY reader read failed: {:?}", err);
1627 closed.store(true, Ordering::SeqCst);
1628 let _ = active_sender.lock().map(|mut guard| guard.take());
1629 let _ = active_process_id.lock().map(|mut guard| guard.take());
1630 return;
1631 }
1632 }
1633 }
1634 });
1635 }
1636
1637 fn spawn_child_monitor(
1638 child: Arc<StdMutex<Option<Box<dyn Child + Send + Sync>>>>,
1639 closed: Arc<AtomicBool>,
1640 shell_session_id: String,
1641 ) {
1642 tokio::spawn(async move {
1643 loop {
1644 if closed.load(Ordering::SeqCst) {
1645 return;
1646 }
1647
1648 let exit_result = {
1649 let mut child = match child.lock() {
1650 Ok(child) => child,
1651 Err(_) => {
1652 closed.store(true, Ordering::SeqCst);
1653 return;
1654 }
1655 };
1656
1657 let poll_result = match child.as_mut() {
1658 Some(child) => child.try_wait().map(|status| status.is_some()),
1659 None => return,
1660 };
1661
1662 match poll_result {
1663 Ok(true) => {
1664 child.take();
1665 Some(Ok(()))
1666 }
1667 Ok(false) => None,
1668 Err(error) => {
1669 child.take();
1670 Some(Err(error))
1671 }
1672 }
1673 };
1674
1675 match exit_result {
1676 Some(Ok(())) => {
1677 closed.store(true, Ordering::SeqCst);
1678 return;
1679 }
1680 Some(Err(error)) => {
1681 tracing::warn!(
1682 shell_session_id = %shell_session_id,
1683 error = %error,
1684 "PTY shell exit poll failed"
1685 );
1686 closed.store(true, Ordering::SeqCst);
1687 return;
1688 }
1689 None => tokio::time::sleep(CHILD_EXIT_POLL_INTERVAL).await,
1690 }
1691 }
1692 });
1693 }
1694
1695 impl SessionOutputParser {
1696 fn new(start_marker: String, done_prefix: String) -> Self {
1697 Self {
1698 pending: String::new(),
1699 started: false,
1700 start_marker,
1701 done_prefix,
1702 }
1703 }
1704
1705 fn push(&mut self, chunk: &str) -> ParsedChunk {
1706 self.pending.push_str(chunk);
1707
1708 if !self.started {
1709 if let Some(consume_end) = find_start_marker(&self.pending, &self.start_marker) {
1710 let rest = self.pending[consume_end..].to_string();
1711 self.pending = rest;
1712 self.started = true;
1713 } else {
1714 trim_to_tail(&mut self.pending, self.start_marker.len() + 8);
1715 return ParsedChunk {
1716 output: String::new(),
1717 exit_code: None,
1718 };
1719 }
1720 }
1721
1722 if let Some((output_end, consume_end, exit_code)) =
1723 find_done_marker(&self.pending, &self.done_prefix)
1724 {
1725 let output = self.pending[..output_end].to_string();
1726 self.pending = self.pending[consume_end..].to_string();
1727 return ParsedChunk {
1728 output,
1729 exit_code: Some(exit_code),
1730 };
1731 }
1732
1733 let keep = self.done_prefix.len() + 32;
1734 if self.pending.len() > keep {
1735 let flush_len = floor_char_boundary(&self.pending, self.pending.len() - keep);
1736 let output = self.pending[..flush_len].to_string();
1737 self.pending = self.pending[flush_len..].to_string();
1738 ParsedChunk {
1739 output,
1740 exit_code: None,
1741 }
1742 } else {
1743 ParsedChunk {
1744 output: String::new(),
1745 exit_code: None,
1746 }
1747 }
1748 }
1749
1750 fn finish(&mut self) -> ParsedChunk {
1751 if !self.started {
1752 self.pending.clear();
1753 return ParsedChunk {
1754 output: String::new(),
1755 exit_code: None,
1756 };
1757 }
1758
1759 if let Some((output_end, _, exit_code)) =
1760 find_done_marker(&self.pending, &self.done_prefix)
1761 .or_else(|| find_done_marker_allow_eof(&self.pending, &self.done_prefix))
1762 {
1763 let output = self.pending[..output_end].to_string();
1764 self.pending.clear();
1765 return ParsedChunk {
1766 output,
1767 exit_code: Some(exit_code),
1768 };
1769 }
1770
1771 ParsedChunk {
1772 output: std::mem::take(&mut self.pending),
1773 exit_code: None,
1774 }
1775 }
1776
1777 fn buffered_output(&self) -> &str {
1778 if self.started {
1779 self.pending.as_str()
1780 } else {
1781 ""
1782 }
1783 }
1784 }
1785
1786 fn append_recent_output_tail(buffer: &mut String, chunk: &str) {
1787 if chunk.is_empty() {
1788 return;
1789 }
1790 buffer.push_str(chunk);
1791 trim_to_tail(buffer, STALL_SIGNAL_TAIL_BYTES);
1792 }
1793
1794 fn inspect_stall_signal(flushed_output_tail: &str, buffered_output_tail: &str) -> StallSignal {
1795 let mut combined = flushed_output_tail.to_string();
1796 append_recent_output_tail(&mut combined, buffered_output_tail);
1797 if combined.trim().is_empty() {
1798 return StallSignal::None;
1799 }
1800
1801 let normalized = combined.to_ascii_lowercase();
1802 if INTERACTIVE_PROMPT_PATTERNS
1803 .iter()
1804 .any(|needle| normalized.contains(needle))
1805 {
1806 return StallSignal::InteractivePrompt;
1807 }
1808
1809 if normalized
1810 .lines()
1811 .map(str::trim)
1812 .any(|line| line.starts_with("error:"))
1813 || ERROR_OUTPUT_PATTERNS
1814 .iter()
1815 .any(|needle| normalized.contains(needle))
1816 {
1817 return StallSignal::ErrorOutput;
1818 }
1819
1820 StallSignal::None
1821 }
1822
1823 fn find_done_marker(buffer: &str, done_prefix: &str) -> Option<(usize, usize, i32)> {
1824 let marker = format!("\n{done_prefix}");
1825 let idx = buffer.find(&marker)?;
1826 let code_start = idx + marker.len();
1827 let line_end_rel = buffer[code_start..].find('\n')?;
1828 let consume_end = code_start + line_end_rel + 1;
1829 let exit_code = buffer[code_start..code_start + line_end_rel]
1830 .trim_end_matches('\r')
1831 .parse()
1832 .ok()?;
1833 let output_end = if idx > 0 && buffer.as_bytes()[idx - 1] == b'\r' {
1834 idx - 1
1835 } else {
1836 idx
1837 };
1838 Some((output_end, consume_end, exit_code))
1839 }
1840
1841 fn find_done_marker_allow_eof(buffer: &str, done_prefix: &str) -> Option<(usize, usize, i32)> {
1842 let marker = format!("\n{done_prefix}");
1843 let idx = buffer.find(&marker)?;
1844 let code_start = idx + marker.len();
1845 let exit_code = buffer[code_start..].trim_end_matches('\r').parse().ok()?;
1846 let output_end = if idx > 0 && buffer.as_bytes()[idx - 1] == b'\r' {
1847 idx - 1
1848 } else {
1849 idx
1850 };
1851 Some((output_end, buffer.len(), exit_code))
1852 }
1853
1854 fn find_start_marker(buffer: &str, start_marker: &str) -> Option<usize> {
1855 let mut search_from = 0;
1856 while let Some(rel_idx) = buffer[search_from..].find(start_marker) {
1857 let idx = search_from + rel_idx;
1858 let before_ok = idx == 0 || buffer.as_bytes()[idx - 1] == b'\n';
1859 if !before_ok {
1860 search_from = idx + start_marker.len();
1861 continue;
1862 }
1863
1864 let after = idx + start_marker.len();
1865 let consumed = if buffer[after..].starts_with("\r\n") {
1866 after + 2
1867 } else if buffer[after..].starts_with('\n') || buffer[after..].starts_with('\r') {
1868 after + 1
1869 } else {
1870 search_from = idx + start_marker.len();
1871 continue;
1872 };
1873
1874 return Some(consumed);
1875 }
1876
1877 None
1878 }
1879
1880 fn trim_to_tail(value: &mut String, keep: usize) {
1881 if value.len() <= keep {
1882 return;
1883 }
1884 let start = ceil_char_boundary(value, value.len() - keep);
1885 *value = value[start..].to_string();
1886 }
1887
1888 fn floor_char_boundary(value: &str, index: usize) -> usize {
1889 let mut index = index.min(value.len());
1890 while index > 0 && !value.is_char_boundary(index) {
1891 index -= 1;
1892 }
1893 index
1894 }
1895
1896 fn ceil_char_boundary(value: &str, index: usize) -> usize {
1897 let mut index = index.min(value.len());
1898 while index < value.len() && !value.is_char_boundary(index) {
1899 index += 1;
1900 }
1901 index
1902 }
1903
1904 fn wrap_command(
1905 command: &str,
1906 command_cwd: Option<&str>,
1907 start_marker: &str,
1908 done_prefix: &str,
1909 ) -> String {
1910 #[cfg(windows)]
1911 {
1912 wrap_command_windows(command, command_cwd, start_marker, done_prefix)
1913 }
1914
1915 #[cfg(not(windows))]
1916 {
1917 wrap_command_posix(command, command_cwd, start_marker, done_prefix)
1918 }
1919 }
1920
1921 #[cfg(not(windows))]
1922 fn wrap_command_posix(
1923 command: &str,
1924 command_cwd: Option<&str>,
1925 start_marker: &str,
1926 done_prefix: &str,
1927 ) -> String {
1928 let mut script = format!("printf '{start_marker}\\n'; _GESTURA_STATUS=0; ");
1929 if let Some(cwd) = command_cwd {
1930 script.push_str("cd ");
1931 script.push_str("e_posix(cwd));
1932 script.push_str(" || _GESTURA_STATUS=$?; ");
1933 }
1934 script.push_str("if [ \"$_GESTURA_STATUS\" -eq 0 ]; then eval ");
1935 script.push_str("e_posix(command));
1936 script.push_str("; _GESTURA_STATUS=$?; fi; ");
1937 script.push_str(&format!(
1938 "printf '\\n{done_prefix}%s\\n' \"$_GESTURA_STATUS\"; unset _GESTURA_STATUS\n"
1939 ));
1940 script
1941 }
1942
1943 #[cfg(windows)]
1944 fn wrap_command_windows(
1945 command: &str,
1946 command_cwd: Option<&str>,
1947 start_marker: &str,
1948 done_prefix: &str,
1949 ) -> String {
1950 let mut script = format!("echo {start_marker}\r\nset GESTURA_STATUS=0\r\n");
1951 if let Some(cwd) = command_cwd {
1952 script.push_str("cd /d ");
1953 script.push_str("e_cmd_arg(cwd));
1954 script.push_str(" || set GESTURA_STATUS=%ERRORLEVEL%\r\n");
1955 }
1956 script.push_str("if \"%GESTURA_STATUS%\"==\"0\" ");
1957 script.push_str(command);
1958 script.push_str("\r\n");
1959 script.push_str("if \"%GESTURA_STATUS%\"==\"0\" set GESTURA_STATUS=%ERRORLEVEL%\r\n");
1960 script.push_str(&format!(
1961 "echo {done_prefix}%GESTURA_STATUS%\r\nset GESTURA_STATUS=\r\n"
1962 ));
1963 script
1964 }
1965
1966 #[cfg(not(windows))]
1967 fn quote_posix(value: &str) -> String {
1968 format!("'{}'", value.replace('\'', "'\\''"))
1969 }
1970
1971 #[cfg(windows)]
1972 fn quote_cmd_arg(value: &str) -> String {
1973 format!("\"{}\"", value.replace('"', "\"\""))
1974 }
1975
1976 #[cfg(test)]
1977 mod tests {
1978 use super::*;
1979
1980 #[cfg(not(target_os = "windows"))]
1981 lazy_static::lazy_static! {
1982 static ref PTY_TEST_SEMAPHORE: tokio::sync::Semaphore = tokio::sync::Semaphore::new(1);
1983 }
1984
1985 #[cfg(not(target_os = "windows"))]
1986 const PTY_TEST_TIMEOUT_SECS: u64 = 30;
1987 #[cfg(not(target_os = "windows"))]
1988 const PTY_EVENT_TIMEOUT_SECS: u64 = 20;
1989
1990 #[cfg(not(target_os = "windows"))]
1991 async fn shutdown_session_for_test(pool_key: &str) {
1992 let _ = tokio::time::timeout(Duration::from_secs(2), shutdown_session(pool_key)).await;
1993 }
1994
1995 #[cfg(not(target_os = "windows"))]
1996 async fn run_pty_test<F>(pool_key: &'static str, future: F)
1997 where
1998 F: std::future::Future<Output = ()> + Send + 'static,
1999 {
2000 let _permit = PTY_TEST_SEMAPHORE.acquire().await.unwrap();
2001 shutdown_session_for_test(pool_key).await;
2002 let mut handle = tokio::spawn(future);
2003 tokio::select! {
2004 result = &mut handle => {
2005 shutdown_session_for_test(pool_key).await;
2006 match result {
2007 Ok(()) => {}
2008 Err(error) if error.is_panic() => std::panic::resume_unwind(error.into_panic()),
2009 Err(error) => panic!("PTY test task for pool '{pool_key}' failed to join: {error}"),
2010 }
2011 }
2012 _ = tokio::time::sleep(Duration::from_secs(PTY_TEST_TIMEOUT_SECS)) => {
2013 handle.abort();
2014 let _ = handle.await;
2015 shutdown_session_for_test(pool_key).await;
2016 panic!(
2017 "PTY test timed out after {}s for pool '{pool_key}'",
2018 PTY_TEST_TIMEOUT_SECS
2019 );
2020 }
2021 }
2022 }
2023
2024 #[cfg(not(target_os = "windows"))]
2025 async fn recv_session_lifecycle(
2026 rx: &mut mpsc::Receiver<StreamChunk>,
2027 ) -> (String, ShellSessionState, bool, bool) {
2028 loop {
2029 if let StreamChunk::ShellSessionLifecycle {
2030 shell_session_id,
2031 state,
2032 interactive,
2033 user_managed,
2034 ..
2035 } = tokio::time::timeout(Duration::from_secs(PTY_EVENT_TIMEOUT_SECS), rx.recv())
2036 .await
2037 .expect("timed out waiting for shell event")
2038 .expect("channel closed while waiting for shell event")
2039 {
2040 return (shell_session_id, state, interactive, user_managed);
2041 }
2042 }
2043 }
2044
2045 #[cfg(not(target_os = "windows"))]
2046 async fn recv_command_started(rx: &mut mpsc::Receiver<StreamChunk>) -> (String, String) {
2047 loop {
2048 if let StreamChunk::ShellLifecycle {
2049 process_id,
2050 shell_session_id,
2051 state: ShellProcessState::Started,
2052 ..
2053 } = tokio::time::timeout(Duration::from_secs(PTY_EVENT_TIMEOUT_SECS), rx.recv())
2054 .await
2055 .expect("timed out waiting for command start")
2056 .expect("channel closed while waiting for command start")
2057 {
2058 return (
2059 process_id,
2060 shell_session_id.expect("PTY-managed commands should carry session id"),
2061 );
2062 }
2063 }
2064 }
2065
2066 #[cfg(not(target_os = "windows"))]
2067 fn spawn_chunk_collector(
2068 mut rx: mpsc::Receiver<StreamChunk>,
2069 ) -> tokio::task::JoinHandle<Vec<StreamChunk>> {
2070 tokio::spawn(async move {
2071 let mut chunks = Vec::new();
2072 while let Some(chunk) = rx.recv().await {
2073 chunks.push(chunk);
2074 }
2075 chunks
2076 })
2077 }
2078
2079 #[cfg(not(target_os = "windows"))]
2080 fn simple_output_command(text: &str) -> String {
2081 format!("printf {text}")
2082 }
2083
2084 #[cfg(not(target_os = "windows"))]
2085 fn delayed_output_command(text: &str) -> String {
2086 format!("sleep 2; printf {text}")
2087 }
2088
2089 #[cfg(not(target_os = "windows"))]
2090 fn interactive_input_command(text: &str) -> String {
2091 format!("printf {text}\n")
2092 }
2093
2094 #[test]
2095 fn parser_extracts_output_and_exit_code() {
2096 let mut parser = SessionOutputParser::new(
2097 "__GESTURA_START_abc__".to_string(),
2098 "__GESTURA_DONE_abc__:".to_string(),
2099 );
2100
2101 let first =
2102 parser.push("printf '__GESTURA_START_abc__\\n'\r\n__GESTURA_START_abc__\r\nhello");
2103 assert_eq!(first.output, "");
2104 assert_eq!(first.exit_code, None);
2105
2106 let second = parser.push(" world\n__GESTURA_DONE_abc__:0\r\n");
2107 assert_eq!(second.output, "hello world");
2108 assert_eq!(second.exit_code, Some(0));
2109 }
2110
2111 #[test]
2112 fn parser_finish_extracts_trailing_done_marker_after_channel_close() {
2113 let mut parser = SessionOutputParser::new(
2114 "__GESTURA_START_abc__".to_string(),
2115 "__GESTURA_DONE_abc__:".to_string(),
2116 );
2117
2118 let first = parser.push(
2119 "printf '__GESTURA_START_abc__\\n'\r\n__GESTURA_START_abc__\r\nhello world\n__GESTURA_DONE_abc__:0\r",
2120 );
2121 assert_eq!(first.output, "");
2122 assert_eq!(first.exit_code, None);
2123
2124 let finished = parser.finish();
2125 assert_eq!(finished.output, "hello world");
2126 assert_eq!(finished.exit_code, Some(0));
2127 }
2128
2129 #[test]
2130 #[cfg(not(windows))]
2131 fn wrap_command_changes_directory_before_execution() {
2132 let script = wrap_command(
2133 "pwd",
2134 Some("/tmp/example dir"),
2135 "__GESTURA_START__",
2136 "__GESTURA_DONE__:",
2137 );
2138
2139 assert!(script.contains("cd '/tmp/example dir'"));
2140 assert!(script.contains("printf '__GESTURA_START__\\n'"));
2141 assert!(script.contains("printf '\\n__GESTURA_DONE__:%s\\n'"));
2142 }
2143
2144 #[cfg(not(target_os = "windows"))]
2145 #[tokio::test]
2146 async fn create_session_starts_idle_shell_without_command() {
2147 run_pty_test("pty-create-session", async move {
2148 let (tx, mut rx) = mpsc::channel(64);
2149 let handle = create_session(
2150 "pty-create-session",
2151 std::env::current_dir()
2152 .ok()
2153 .and_then(|p| p.to_str().map(ToOwned::to_owned))
2154 .as_deref(),
2155 Some(tx),
2156 )
2157 .await
2158 .expect("create PTY session");
2159
2160 let (shell_session_id, state, interactive, user_managed) =
2161 recv_session_lifecycle(&mut rx).await;
2162 assert_eq!(shell_session_id, handle.shell_session_id);
2163 assert_eq!(state, ShellSessionState::Idle);
2164 assert!(interactive);
2165 assert!(user_managed);
2166
2167 stop_session(&handle.shell_session_id)
2168 .await
2169 .expect("stop PTY session");
2170 })
2171 .await;
2172 }
2173
2174 #[cfg(not(target_os = "windows"))]
2175 #[tokio::test]
2176 async fn reuses_idle_session_within_same_pool() {
2177 run_pty_test("pty-reuse-pool", async move {
2178 let (tx, rx) = mpsc::channel(128);
2179 let collector = spawn_chunk_collector(rx);
2180 let first_command = simple_output_command("first");
2181 let first = execute_in_session(
2182 "pty-reuse-pool",
2183 std::env::current_dir()
2184 .ok()
2185 .and_then(|p| p.to_str().map(ToOwned::to_owned))
2186 .as_deref(),
2187 first_command.as_str(),
2188 None,
2189 Some(10),
2190 tx.clone(),
2191 )
2192 .await
2193 .expect("first command result");
2194 assert!(first.stdout.contains("first"));
2195
2196 let second_command = simple_output_command("second");
2197 let second = execute_in_session(
2198 "pty-reuse-pool",
2199 std::env::current_dir()
2200 .ok()
2201 .and_then(|p| p.to_str().map(ToOwned::to_owned))
2202 .as_deref(),
2203 second_command.as_str(),
2204 None,
2205 Some(10),
2206 tx.clone(),
2207 )
2208 .await
2209 .expect("second command result");
2210 assert!(second.stdout.contains("second"));
2211
2212 shutdown_session("pty-reuse-pool")
2213 .await
2214 .expect("shutdown PTY session pool");
2215
2216 drop(tx);
2217 let chunks = tokio::time::timeout(Duration::from_secs(5), collector)
2218 .await
2219 .expect("timed out collecting reuse-pool shell chunks")
2220 .expect("reuse-pool collector should join");
2221
2222 let started_session_ids = chunks
2223 .iter()
2224 .filter_map(|chunk| match chunk {
2225 StreamChunk::ShellLifecycle {
2226 shell_session_id: Some(shell_session_id),
2227 state: ShellProcessState::Started,
2228 ..
2229 } => Some(shell_session_id.clone()),
2230 _ => None,
2231 })
2232 .collect::<Vec<_>>();
2233
2234 assert!(
2235 started_session_ids.len() >= 2,
2236 "expected at least two started shell lifecycle events, got {chunks:?}"
2237 );
2238 assert_eq!(started_session_ids[0], started_session_ids[1]);
2239 })
2240 .await;
2241 }
2242
2243 #[cfg(not(target_os = "windows"))]
2244 #[tokio::test]
2245 async fn reused_session_emits_busy_before_started_for_follow_up_command() {
2246 run_pty_test("pty-reuse-order-pool", async move {
2247 let (tx, rx) = mpsc::channel(128);
2248 let collector = spawn_chunk_collector(rx);
2249
2250 let first_command = simple_output_command("first");
2251 execute_in_session(
2252 "pty-reuse-order-pool",
2253 std::env::current_dir()
2254 .ok()
2255 .and_then(|p| p.to_str().map(ToOwned::to_owned))
2256 .as_deref(),
2257 first_command.as_str(),
2258 None,
2259 Some(10),
2260 tx.clone(),
2261 )
2262 .await
2263 .expect("first command result");
2264
2265 let second_command = simple_output_command("second");
2266 execute_in_session(
2267 "pty-reuse-order-pool",
2268 std::env::current_dir()
2269 .ok()
2270 .and_then(|p| p.to_str().map(ToOwned::to_owned))
2271 .as_deref(),
2272 second_command.as_str(),
2273 None,
2274 Some(10),
2275 tx.clone(),
2276 )
2277 .await
2278 .expect("second command result");
2279
2280 shutdown_session("pty-reuse-order-pool")
2281 .await
2282 .expect("shutdown PTY reuse-order session pool");
2283
2284 drop(tx);
2285 let chunks = tokio::time::timeout(Duration::from_secs(5), collector)
2286 .await
2287 .expect("timed out collecting reuse-order shell chunks")
2288 .expect("reuse-order collector should join");
2289
2290 let reused_session_id = chunks
2291 .iter()
2292 .find_map(|chunk| match chunk {
2293 StreamChunk::ShellLifecycle {
2294 shell_session_id: Some(shell_session_id),
2295 state: ShellProcessState::Started,
2296 ..
2297 } => Some(shell_session_id.clone()),
2298 _ => None,
2299 })
2300 .expect("expected started shell lifecycle event for reused session");
2301
2302 let busy_indices = chunks
2303 .iter()
2304 .enumerate()
2305 .filter_map(|(index, chunk)| match chunk {
2306 StreamChunk::ShellSessionLifecycle {
2307 shell_session_id,
2308 state: ShellSessionState::Busy,
2309 interactive: false,
2310 user_managed: false,
2311 ..
2312 } if shell_session_id == &reused_session_id => Some(index),
2313 _ => None,
2314 })
2315 .collect::<Vec<_>>();
2316
2317 let started_indices = chunks
2318 .iter()
2319 .enumerate()
2320 .filter_map(|(index, chunk)| match chunk {
2321 StreamChunk::ShellLifecycle {
2322 shell_session_id: Some(shell_session_id),
2323 state: ShellProcessState::Started,
2324 ..
2325 } if shell_session_id == &reused_session_id => Some(index),
2326 _ => None,
2327 })
2328 .collect::<Vec<_>>();
2329
2330 assert!(
2331 busy_indices.len() >= 2,
2332 "expected two busy shell session lifecycle events for reused session, got {chunks:?}"
2333 );
2334 assert!(
2335 started_indices.len() >= 2,
2336 "expected two started shell lifecycle events for reused session, got {chunks:?}"
2337 );
2338 assert!(busy_indices[1] < started_indices[1]);
2339 })
2340 .await;
2341 }
2342
2343 #[cfg(not(target_os = "windows"))]
2344 #[tokio::test]
2345 async fn execute_in_session_emits_session_lifecycle_to_caller_stream() {
2346 run_pty_test("pty-session-lifecycle-stream", async move {
2347 let (tx, rx) = mpsc::channel(128);
2348 let collector = spawn_chunk_collector(rx);
2349 let command = simple_output_command("streamed");
2350 let result = execute_in_session(
2351 "pty-session-lifecycle-stream",
2352 std::env::current_dir()
2353 .ok()
2354 .and_then(|p| p.to_str().map(ToOwned::to_owned))
2355 .as_deref(),
2356 command.as_str(),
2357 None,
2358 Some(10),
2359 tx.clone(),
2360 )
2361 .await
2362 .expect("command result");
2363 assert!(result.stdout.contains("streamed"));
2364
2365 shutdown_session("pty-session-lifecycle-stream")
2366 .await
2367 .expect("shutdown PTY session pool");
2368
2369 drop(tx);
2370 let chunks = tokio::time::timeout(Duration::from_secs(5), collector)
2371 .await
2372 .expect("timed out collecting lifecycle stream chunks")
2373 .expect("lifecycle collector should join");
2374
2375 let (busy_index, busy_session_id) = chunks
2376 .iter()
2377 .enumerate()
2378 .find_map(|(index, chunk)| match chunk {
2379 StreamChunk::ShellSessionLifecycle {
2380 shell_session_id,
2381 state: ShellSessionState::Busy,
2382 interactive: false,
2383 user_managed: false,
2384 ..
2385 } => Some((index, shell_session_id.clone())),
2386 _ => None,
2387 })
2388 .expect("expected busy shell session lifecycle event");
2389
2390 let started_index = chunks
2391 .iter()
2392 .enumerate()
2393 .find_map(|(index, chunk)| match chunk {
2394 StreamChunk::ShellLifecycle {
2395 shell_session_id: Some(shell_session_id),
2396 state: ShellProcessState::Started,
2397 ..
2398 } if shell_session_id == &busy_session_id => Some(index),
2399 _ => None,
2400 })
2401 .expect("expected started shell lifecycle event for busy session");
2402
2403 let idle_index = chunks
2404 .iter()
2405 .enumerate()
2406 .find_map(|(index, chunk)| match chunk {
2407 StreamChunk::ShellSessionLifecycle {
2408 shell_session_id,
2409 state: ShellSessionState::Idle,
2410 interactive: false,
2411 user_managed: false,
2412 ..
2413 } if shell_session_id == &busy_session_id => Some(index),
2414 _ => None,
2415 })
2416 .expect("expected idle shell session lifecycle event for busy session");
2417
2418 assert!(busy_index < started_index);
2419 assert!(started_index < idle_index);
2420 })
2421 .await;
2422 }
2423
2424 #[cfg(not(target_os = "windows"))]
2425 #[tokio::test]
2426 async fn allocates_new_session_when_existing_one_is_busy() {
2427 run_pty_test("pty-busy-pool", async move {
2428 let (tx_one, mut rx_one) = mpsc::channel(128);
2429 let (tx_two, mut rx_two) = mpsc::channel(128);
2430 let first_command = delayed_output_command("one");
2431
2432 let first = tokio::spawn(async move {
2433 execute_in_session(
2434 "pty-busy-pool",
2435 std::env::current_dir()
2436 .ok()
2437 .and_then(|p| p.to_str().map(ToOwned::to_owned))
2438 .as_deref(),
2439 first_command.as_str(),
2440 None,
2441 Some(10),
2442 tx_one,
2443 )
2444 .await
2445 });
2446
2447 let (_, first_session_id) = recv_command_started(&mut rx_one).await;
2448 let second_command = simple_output_command("two");
2449
2450 let second = tokio::spawn(async move {
2451 execute_in_session(
2452 "pty-busy-pool",
2453 std::env::current_dir()
2454 .ok()
2455 .and_then(|p| p.to_str().map(ToOwned::to_owned))
2456 .as_deref(),
2457 second_command.as_str(),
2458 None,
2459 Some(10),
2460 tx_two,
2461 )
2462 .await
2463 });
2464
2465 let (_, second_session_id) = recv_command_started(&mut rx_two).await;
2466 assert_ne!(first_session_id, second_session_id);
2467
2468 first.await.expect("first join").expect("first result");
2469 second.await.expect("second join").expect("second result");
2470
2471 shutdown_session("pty-busy-pool")
2472 .await
2473 .expect("shutdown PTY session pool");
2474 })
2475 .await;
2476 }
2477
2478 #[cfg(not(target_os = "windows"))]
2479 #[tokio::test]
2480 async fn activity_aware_execution_allows_recently_active_long_running_commands() {
2481 run_pty_test("pty-activity-aware-pool", async move {
2482 let (tx, _rx) = mpsc::channel(128);
2483 let command =
2484 "printf warmup && sleep 2 && printf progress && sleep 2 && printf done";
2485
2486 let result = execute_in_session_with_options(
2487 "pty-activity-aware-pool",
2488 std::env::current_dir()
2489 .ok()
2490 .and_then(|p| p.to_str().map(ToOwned::to_owned))
2491 .as_deref(),
2492 command,
2493 None,
2494 ShellExecutionOptions {
2495 timeout_secs: Some(1),
2496 allow_long_running: true,
2497 stall_timeout_secs: Some(8),
2498 },
2499 tx,
2500 )
2501 .await
2502 .expect("activity-aware PTY command result");
2503
2504 assert!(result.success);
2505 assert!(result.stdout.contains("done"));
2506
2507 shutdown_session("pty-activity-aware-pool")
2508 .await
2509 .expect("shutdown activity-aware PTY session pool");
2510 })
2511 .await;
2512 }
2513
2514 #[cfg(not(target_os = "windows"))]
2515 #[tokio::test]
2516 async fn activity_aware_execution_reports_continue_wait_before_timing_out_when_quiet_output_has_no_indicator()
2517 {
2518 run_pty_test("pty-quiet-activity-aware-pool", async move {
2519 let (tx, mut rx) = mpsc::channel(128);
2520 let command = "sleep 4 && printf done";
2521
2522 let result = execute_in_session_with_options(
2523 "pty-quiet-activity-aware-pool",
2524 std::env::current_dir()
2525 .ok()
2526 .and_then(|p| p.to_str().map(ToOwned::to_owned))
2527 .as_deref(),
2528 command,
2529 None,
2530 ShellExecutionOptions {
2531 timeout_secs: Some(1),
2532 allow_long_running: true,
2533 stall_timeout_secs: Some(1),
2534 },
2535 tx,
2536 )
2537 .await
2538 .expect("quiet activity-aware PTY command result");
2539
2540 shutdown_session_for_test("pty-quiet-activity-aware-pool").await;
2541
2542 assert!(!result.success);
2543 assert_eq!(result.exit_code, 124);
2544 assert_eq!(result.failure_kind, Some(ShellRuntimeFailureKind::TimedOut));
2545
2546 let mut saw_continue_wait_status = false;
2547 while let Ok(chunk) = rx.try_recv() {
2548 if let StreamChunk::Status { message } = chunk
2549 && message.contains("saw no prompt or error indicator")
2550 && message.contains("will continue waiting")
2551 {
2552 saw_continue_wait_status = true;
2553 break;
2554 }
2555 }
2556 assert!(saw_continue_wait_status);
2557 })
2558 .await;
2559 }
2560
2561 #[cfg(not(target_os = "windows"))]
2562 #[tokio::test]
2563 async fn activity_aware_execution_times_out_after_repeated_quiet_periods_without_signal() {
2564 run_pty_test("pty-quiet-timeout-pool", async move {
2565 let (tx, _rx) = mpsc::channel(128);
2566 let command = "sleep 4 && printf done";
2567
2568 let result = tokio::time::timeout(
2569 Duration::from_secs(8),
2570 execute_in_session_with_options(
2571 "pty-quiet-timeout-pool",
2572 std::env::current_dir()
2573 .ok()
2574 .and_then(|p| p.to_str().map(ToOwned::to_owned))
2575 .as_deref(),
2576 command,
2577 None,
2578 ShellExecutionOptions {
2579 timeout_secs: Some(1),
2580 allow_long_running: true,
2581 stall_timeout_secs: Some(1),
2582 },
2583 tx,
2584 ),
2585 )
2586 .await
2587 .expect("quiet PTY command should not hang")
2588 .expect("quiet PTY timeout result");
2589
2590 shutdown_session_for_test("pty-quiet-timeout-pool").await;
2591
2592 assert!(!result.success);
2593 assert_eq!(result.exit_code, 124);
2594 assert_eq!(result.failure_kind, Some(ShellRuntimeFailureKind::TimedOut));
2595 })
2596 .await;
2597 }
2598
2599 #[test]
2600 fn error_output_stalls_use_the_shorter_early_signal_timeout() {
2601 assert_eq!(
2602 early_signal_stall_timeout(Duration::from_secs(30), StallSignal::ErrorOutput),
2603 Some(Duration::from_secs(EARLY_SIGNAL_STALL_TIMEOUT_SECS))
2604 );
2605 assert_eq!(
2606 early_signal_stall_timeout(Duration::from_secs(5), StallSignal::ErrorOutput),
2607 Some(Duration::from_secs(5))
2608 );
2609 }
2610
2611 #[test]
2612 fn generic_quiet_stalls_do_not_use_the_early_signal_timeout() {
2613 assert_eq!(
2614 early_signal_stall_timeout(Duration::from_secs(30), StallSignal::None),
2615 None
2616 );
2617 }
2618
2619 #[test]
2620 fn stall_signal_maps_interactive_prompt_to_waiting_for_input_failure_kind() {
2621 assert_eq!(
2622 StallSignal::InteractivePrompt.runtime_failure_kind(),
2623 Some(ShellRuntimeFailureKind::WaitingForInput)
2624 );
2625 assert!(
2626 StallSignal::InteractivePrompt
2627 .status_message("shell-123", "pnpm add vite")
2628 .contains("waiting_for_input")
2629 );
2630 }
2631
2632 #[test]
2633 fn stall_signal_detects_interactive_prompt_in_buffered_output() {
2634 assert_eq!(
2635 inspect_stall_signal(
2636 "",
2637 "Need to install the following packages:\ncreate-app\nProceed? (y/n)"
2638 ),
2639 StallSignal::InteractivePrompt
2640 );
2641 }
2642
2643 #[test]
2644 fn stall_signal_detects_recent_error_output() {
2645 assert_eq!(
2646 inspect_stall_signal(
2647 "Compiling dependencies\n",
2648 "error: no such file or directory"
2649 ),
2650 StallSignal::ErrorOutput
2651 );
2652 }
2653
2654 #[test]
2655 fn stall_signal_returns_none_when_output_has_no_clear_indicator() {
2656 assert_eq!(
2657 inspect_stall_signal("Compiling 24 crates\n", "Still working on build graph"),
2658 StallSignal::None
2659 );
2660 }
2661
2662 #[cfg(not(target_os = "windows"))]
2663 #[tokio::test]
2664 async fn claiming_automation_session_removes_it_from_reuse_pool() {
2665 run_pty_test("pty-claim-pool", async move {
2666 let (tx, mut rx) = mpsc::channel(128);
2667 let first_command = simple_output_command("first");
2668 let first = execute_in_session(
2669 "pty-claim-pool",
2670 std::env::current_dir()
2671 .ok()
2672 .and_then(|p| p.to_str().map(ToOwned::to_owned))
2673 .as_deref(),
2674 first_command.as_str(),
2675 None,
2676 Some(10),
2677 tx.clone(),
2678 )
2679 .await
2680 .expect("first command result");
2681 assert!(first.stdout.contains("first"));
2682
2683 let (_, first_session_id) = recv_command_started(&mut rx).await;
2684
2685 claim_session(&first_session_id)
2686 .await
2687 .expect("claim session")
2688 .expect("claimed metadata");
2689
2690 let metadata = describe_session(&first_session_id)
2691 .await
2692 .expect("describe claimed session")
2693 .expect("session metadata");
2694 assert!(metadata.user_managed);
2695 assert!(!metadata.available_for_reuse);
2696
2697 let second_command = simple_output_command("second");
2698 let second = execute_in_session(
2699 "pty-claim-pool",
2700 std::env::current_dir()
2701 .ok()
2702 .and_then(|p| p.to_str().map(ToOwned::to_owned))
2703 .as_deref(),
2704 second_command.as_str(),
2705 None,
2706 Some(10),
2707 tx.clone(),
2708 )
2709 .await
2710 .expect("second command result");
2711 assert!(second.stdout.contains("second"));
2712
2713 let (_, second_session_id) = recv_command_started(&mut rx).await;
2714 assert_ne!(first_session_id, second_session_id);
2715
2716 shutdown_session("pty-claim-pool")
2717 .await
2718 .expect("shutdown PTY session pool");
2719 })
2720 .await;
2721 }
2722
2723 #[cfg(not(target_os = "windows"))]
2724 #[tokio::test]
2725 async fn claimed_automation_session_streams_interactive_output_after_attach() {
2726 run_pty_test("pty-attach-pool", async move {
2727 let (tx, mut rx) = mpsc::channel(128);
2728 let seed_command = simple_output_command("base");
2729 execute_in_session(
2730 "pty-attach-pool",
2731 std::env::current_dir()
2732 .ok()
2733 .and_then(|p| p.to_str().map(ToOwned::to_owned))
2734 .as_deref(),
2735 seed_command.as_str(),
2736 None,
2737 Some(10),
2738 tx,
2739 )
2740 .await
2741 .expect("seed command result");
2742
2743 let (_, shell_session_id) = recv_command_started(&mut rx).await;
2744
2745 let (attach_tx, mut attach_rx) = mpsc::channel(128);
2746 subscribe_session(&shell_session_id, attach_tx)
2747 .await
2748 .expect("subscribe to shell session")
2749 .expect("session metadata after subscribe");
2750
2751 claim_session(&shell_session_id)
2752 .await
2753 .expect("claim session")
2754 .expect("claimed session metadata");
2755
2756 let attached_input = interactive_input_command("attached");
2757 send_input(&shell_session_id, attached_input.as_str())
2758 .await
2759 .expect("send interactive input");
2760
2761 let mut saw_attached_output = false;
2762 let deadline =
2763 tokio::time::Instant::now() + Duration::from_secs(PTY_EVENT_TIMEOUT_SECS);
2764 while tokio::time::Instant::now() < deadline {
2765 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
2766 let chunk = tokio::time::timeout(remaining, attach_rx.recv())
2767 .await
2768 .expect("timed out waiting for attached shell output")
2769 .expect("attach channel closed unexpectedly");
2770
2771 if let StreamChunk::ShellOutput { data, .. } = chunk
2772 && data.contains("attached")
2773 {
2774 saw_attached_output = true;
2775 break;
2776 }
2777 }
2778
2779 assert!(
2780 saw_attached_output,
2781 "claimed automation session did not stream attached shell output"
2782 );
2783
2784 shutdown_session("pty-attach-pool")
2785 .await
2786 .expect("shutdown attached PTY session pool");
2787 })
2788 .await;
2789 }
2790 }
2791}
2792
2793pub async fn create_session(
2795 pool_key: &str,
2796 initial_cwd: Option<&str>,
2797 tx: Option<mpsc::Sender<StreamChunk>>,
2798) -> Result<ShellSessionHandle> {
2799 imp::create_session(pool_key, initial_cwd, tx).await
2800}
2801
2802pub async fn execute_in_session(
2804 pool_key: &str,
2805 initial_cwd: Option<&str>,
2806 command: &str,
2807 command_cwd: Option<&str>,
2808 timeout_secs: Option<u64>,
2809 tx: mpsc::Sender<StreamChunk>,
2810) -> Result<StreamingCommandResult> {
2811 execute_in_session_with_options(
2812 pool_key,
2813 initial_cwd,
2814 command,
2815 command_cwd,
2816 ShellExecutionOptions {
2817 timeout_secs,
2818 ..ShellExecutionOptions::default()
2819 },
2820 tx,
2821 )
2822 .await
2823}
2824
2825pub async fn execute_in_session_with_options(
2827 pool_key: &str,
2828 initial_cwd: Option<&str>,
2829 command: &str,
2830 command_cwd: Option<&str>,
2831 options: ShellExecutionOptions,
2832 tx: mpsc::Sender<StreamChunk>,
2833) -> Result<StreamingCommandResult> {
2834 imp::execute_in_session_with_options(pool_key, initial_cwd, command, command_cwd, options, tx)
2835 .await
2836}
2837
2838pub async fn stop_process(process_id: &str) -> Result<Option<ShellSessionHandle>> {
2840 imp::stop_process(process_id).await
2841}
2842
2843pub async fn stop_session(shell_session_id: &str) -> Result<Option<ShellSessionHandle>> {
2845 imp::stop_session(shell_session_id).await
2846}
2847
2848pub async fn shutdown_session(pool_key: &str) -> Result<()> {
2850 imp::shutdown_session(pool_key).await
2851}
2852
2853pub async fn send_input(shell_session_id: &str, data: &str) -> Result<()> {
2855 imp::send_input(shell_session_id, data).await
2856}
2857
2858pub async fn resize_session(shell_session_id: &str, cols: u16, rows: u16) -> Result<()> {
2860 imp::resize_session(shell_session_id, cols, rows).await
2861}
2862
2863pub async fn describe_session(shell_session_id: &str) -> Result<Option<ShellSessionMetadata>> {
2865 imp::describe_session(shell_session_id).await
2866}
2867
2868pub async fn claim_session(shell_session_id: &str) -> Result<Option<ShellSessionMetadata>> {
2870 imp::claim_session(shell_session_id).await
2871}
2872
2873pub async fn subscribe_session(
2875 shell_session_id: &str,
2876 tx: mpsc::Sender<StreamChunk>,
2877) -> Result<Option<ShellSessionMetadata>> {
2878 imp::subscribe_session(shell_session_id, tx).await
2879}