1pub mod registry;
17pub mod schemas;
18
19pub use gestura_core_tools::{
22 code, file, git, mcp_manager, permissions, policy, screen, shell, web,
23};
24
25pub use code::CodeTools;
26pub use file::FileTools;
27pub use git::GitTools;
28pub use permissions::PermissionManager;
29pub use registry::{
30 ToolDefinition, all_tools, find_tool, looks_like_capabilities_question,
31 looks_like_tools_question, render_capabilities, render_tool_detail, render_tools_overview,
32};
33pub use screen::ScreenTools;
34pub use shell::ShellTools;
35pub use web::WebTools;
36
37pub use gestura_core_tools::{code_async, file_async, git_async, screen_async, shell_async};
39
40pub mod shell_sessions;
42
43pub mod shell_streaming {
51 use crate::streaming::{ShellOutputStream, ShellProcessState, StreamChunk};
52 #[cfg(unix)]
53 use nix::libc;
54 use std::collections::HashMap;
55 use std::time::Duration;
56 use tokio::io::{AsyncBufReadExt, BufReader};
57 use tokio::process::Command;
58 use tokio::sync::mpsc;
59
/// Upper bound on how long a single shell output chunk may wait for channel
/// capacity; `send_shell_output_chunk_best_effort` drops the chunk once this elapses.
const SHELL_OUTPUT_SEND_TIMEOUT: Duration = Duration::from_millis(100);
61
62 async fn send_shell_output_chunk_best_effort(
63 tx: &mpsc::Sender<StreamChunk>,
64 chunk: StreamChunk,
65 ) {
66 match tokio::time::timeout(SHELL_OUTPUT_SEND_TIMEOUT, tx.send(chunk)).await {
67 Ok(Ok(())) | Ok(Err(_)) => {}
68 Err(_) => {
69 tracing::debug!(
70 timeout_ms = SHELL_OUTPUT_SEND_TIMEOUT.as_millis(),
71 "Dropping shell output chunk because the stream receiver is not draining fast enough"
72 );
73 }
74 }
75 }
76
77 pub async fn execute_streaming(
82 command: &str,
83 cwd: Option<&str>,
84 env: Option<&HashMap<String, String>>,
85 timeout_secs: Option<u64>,
86 tx: mpsc::Sender<StreamChunk>,
87 ) -> crate::error::Result<StreamingCommandResult> {
88 let process_id = uuid::Uuid::new_v4().to_string();
89 let timeout = std::time::Duration::from_secs(timeout_secs.unwrap_or(300));
90
91 let mut cmd = Command::new("sh");
92 cmd.arg("-c")
93 .arg(command)
94 .stdin(std::process::Stdio::null())
95 .stdout(std::process::Stdio::piped())
96 .stderr(std::process::Stdio::piped());
97
98 if let Some(dir) = cwd {
99 cmd.current_dir(dir);
100 }
101 if let Some(env_map) = env {
102 cmd.envs(env_map);
103 }
104
105 let cwd_owned = cwd.map(String::from);
106 let cmd_str = command.to_string();
107 let start = std::time::Instant::now();
108
109 let mut child = cmd.spawn().map_err(crate::error::AppError::Io)?;
110
111 let os_pid = child.id().unwrap_or(0);
113 register_process(&process_id, os_pid, command, cwd, env, timeout_secs).await;
114
115 let _ = tx
117 .send(StreamChunk::ShellLifecycle {
118 process_id: process_id.clone(),
119 shell_session_id: None,
120 state: ShellProcessState::Started,
121 exit_code: None,
122 duration_ms: None,
123 command: cmd_str.clone(),
124 cwd: cwd_owned.clone(),
125 })
126 .await;
127
128 let stdout = child
129 .stdout
130 .take()
131 .ok_or_else(|| crate::error::AppError::Io(std::io::Error::other("no stdout")))?;
132 let stderr = child
133 .stderr
134 .take()
135 .ok_or_else(|| crate::error::AppError::Io(std::io::Error::other("no stderr")))?;
136
137 let stdout_reader = BufReader::new(stdout);
138 let stderr_reader = BufReader::new(stderr);
139
140 let tx_out = tx.clone();
141 let pid_out = process_id.clone();
142 let stdout_task = tokio::spawn(async move {
143 let mut lines = stdout_reader.lines();
144 let mut collected = String::new();
145 while let Ok(Some(line)) = lines.next_line().await {
146 collected.push_str(&line);
147 collected.push('\n');
148 send_shell_output_chunk_best_effort(
149 &tx_out,
150 StreamChunk::ShellOutput {
151 process_id: pid_out.clone(),
152 shell_session_id: None,
153 stream: ShellOutputStream::Stdout,
154 data: format!("{line}\n"),
155 },
156 )
157 .await;
158 }
159 collected
160 });
161
162 let tx_err = tx.clone();
163 let pid_err = process_id.clone();
164 let stderr_task = tokio::spawn(async move {
165 let mut lines = stderr_reader.lines();
166 let mut collected = String::new();
167 while let Ok(Some(line)) = lines.next_line().await {
168 collected.push_str(&line);
169 collected.push('\n');
170 send_shell_output_chunk_best_effort(
171 &tx_err,
172 StreamChunk::ShellOutput {
173 process_id: pid_err.clone(),
174 shell_session_id: None,
175 stream: ShellOutputStream::Stderr,
176 data: format!("{line}\n"),
177 },
178 )
179 .await;
180 }
181 collected
182 });
183
184 let timed_out;
186 let status = tokio::select! {
187 result = child.wait() => {
188 timed_out = false;
189 result.map_err(crate::error::AppError::Io)?
190 }
191 _ = tokio::time::sleep(timeout) => {
192 timed_out = true;
193 let _ = child.kill().await;
195 child.wait().await.map_err(crate::error::AppError::Io)?
196 }
197 };
198
199 let stdout_text = stdout_task.await.unwrap_or_default();
201 let stderr_text = stderr_task.await.unwrap_or_default();
202
203 let duration_ms = start.elapsed().as_millis() as u64;
204 let exit_code = if timed_out {
205 124
206 } else {
207 status.code().unwrap_or(-1)
208 };
209 let success = !timed_out && status.success();
210
211 let final_state = if success {
212 ShellProcessState::Completed
213 } else {
214 ShellProcessState::Failed
215 };
216
217 unregister_process(&process_id).await;
219
220 let _ = tx
222 .send(StreamChunk::ShellLifecycle {
223 process_id: process_id.clone(),
224 shell_session_id: None,
225 state: final_state,
226 exit_code: Some(exit_code),
227 duration_ms: Some(duration_ms),
228 command: cmd_str.clone(),
229 cwd: cwd_owned,
230 })
231 .await;
232
233 Ok(StreamingCommandResult {
234 process_id,
235 command: cmd_str,
236 stdout: stdout_text,
237 stderr: stderr_text,
238 exit_code,
239 success,
240 duration_ms,
241 failure_kind: timed_out.then_some(ShellRuntimeFailureKind::TimedOut),
242 })
243 }
244
/// Classifies why a streamed shell command did not finish successfully.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ShellRuntimeFailureKind {
    /// NOTE(review): presumably the command stalled waiting for input; nothing
    /// in this module produces this variant — confirm against callers.
    WaitingForInput,
    /// NOTE(review): presumably the command reported an error through its
    /// output; nothing in this module produces this variant — confirm.
    ErrorOutput,
    /// The command ran past its timeout and was killed (set by `execute_streaming`).
    TimedOut,
}
255
/// Final outcome of a command run by `execute_streaming`.
#[derive(Debug, Clone)]
pub struct StreamingCommandResult {
    /// Identifier generated for this run (a UUID string); also the registry key.
    pub process_id: String,
    /// The command line that was passed to `sh -c`.
    pub command: String,
    /// Everything captured from stdout, one newline-terminated line at a time.
    pub stdout: String,
    /// Everything captured from stderr, one newline-terminated line at a time.
    pub stderr: String,
    /// Exit code of the child; `124` on timeout, `-1` when the status has no code.
    pub exit_code: i32,
    /// `true` only when the command did not time out and exited successfully.
    pub success: bool,
    /// Wall-clock time from just before spawn until output collection finished.
    pub duration_ms: u64,
    /// Failure classification; `execute_streaming` sets `TimedOut` or `None`.
    pub failure_kind: Option<ShellRuntimeFailureKind>,
}
268
269 use std::sync::OnceLock;
274 use tokio::sync::Mutex;
275
/// Registry record for one running streamed command: the OS pid used for
/// signalling plus the launch parameters returned by `get_rerun_info`.
struct ProcessEntry {
    /// OS process id of the spawned `sh` child.
    pid: u32,
    /// Command line the process was started with.
    command: String,
    /// Working directory requested at launch, if any.
    cwd: Option<String>,
    /// Extra environment variables requested at launch, if any.
    env: Option<HashMap<String, String>>,
    /// Timeout requested at launch, if any.
    timeout_secs: Option<u64>,
}
289
290 static PROCESS_MANAGER: OnceLock<Mutex<HashMap<String, ProcessEntry>>> = OnceLock::new();
292
293 fn process_map() -> &'static Mutex<HashMap<String, ProcessEntry>> {
294 PROCESS_MANAGER.get_or_init(|| Mutex::new(HashMap::new()))
295 }
296
297 pub(crate) async fn register_process(
299 process_id: &str,
300 pid: u32,
301 command: &str,
302 cwd: Option<&str>,
303 env: Option<&HashMap<String, String>>,
304 timeout_secs: Option<u64>,
305 ) {
306 let mut map = process_map().lock().await;
307 map.insert(
308 process_id.to_string(),
309 ProcessEntry {
310 pid,
311 command: command.to_string(),
312 cwd: cwd.map(String::from),
313 env: env.cloned(),
314 timeout_secs,
315 },
316 );
317 }
318
319 pub(crate) async fn unregister_process(process_id: &str) {
321 let mut map = process_map().lock().await;
322 map.remove(process_id);
323 }
324
325 pub async fn stop_process(process_id: &str) -> crate::error::Result<()> {
327 let pid = {
328 let map = process_map().lock().await;
329 map.get(process_id).map(|e| e.pid).ok_or_else(|| {
330 crate::error::AppError::Io(std::io::Error::other(format!(
331 "No running process with id {process_id}"
332 )))
333 })?
334 };
335
336 #[cfg(unix)]
338 {
339 unsafe { libc::kill(pid as i32, libc::SIGTERM) };
340 tokio::spawn(async move {
342 tokio::time::sleep(std::time::Duration::from_secs(3)).await;
343 unsafe { libc::kill(pid as i32, libc::SIGKILL) };
344 });
345 Ok(())
346 }
347 #[cfg(not(unix))]
348 {
349 let _ = pid;
350 Err(crate::error::AppError::Io(std::io::Error::other(
351 "Process signals not supported on this platform",
352 )))
353 }
354 }
355
356 #[cfg(unix)]
358 pub async fn pause_process(process_id: &str) -> crate::error::Result<()> {
359 let pid = {
360 let map = process_map().lock().await;
361 map.get(process_id).map(|e| e.pid).ok_or_else(|| {
362 crate::error::AppError::Io(std::io::Error::other(format!(
363 "No running process with id {process_id}"
364 )))
365 })?
366 };
367 unsafe { libc::kill(pid as i32, libc::SIGSTOP) };
368 Ok(())
369 }
370
371 #[cfg(unix)]
373 pub async fn resume_process(process_id: &str) -> crate::error::Result<()> {
374 let pid = {
375 let map = process_map().lock().await;
376 map.get(process_id).map(|e| e.pid).ok_or_else(|| {
377 crate::error::AppError::Io(std::io::Error::other(format!(
378 "No running process with id {process_id}"
379 )))
380 })?
381 };
382 unsafe { libc::kill(pid as i32, libc::SIGCONT) };
383 Ok(())
384 }
385
386 pub async fn get_rerun_info(
389 process_id: &str,
390 ) -> Option<(
391 String,
392 Option<String>,
393 Option<HashMap<String, String>>,
394 Option<u64>,
395 )> {
396 let map = process_map().lock().await;
397 map.get(process_id).map(|e| {
398 (
399 e.command.clone(),
400 e.cwd.clone(),
401 e.env.clone(),
402 e.timeout_secs,
403 )
404 })
405 }
406}
407
408