1use std::io;
10
11#[cfg(feature = "nats")]
12use std::process::{Child, Command, Stdio};
13
#[cfg(feature = "nats")]
/// Handle to the message bus used by every function in this module.
///
/// With the `nats` feature enabled this is an [`async_nats::Client`].
pub type Connection = async_nats::Client;
17
#[cfg(not(feature = "nats"))]
/// Placeholder connection type used when the `nats` feature is disabled.
///
/// It is the unit type, so the functions in this module keep the same
/// signatures without depending on a NATS client.
pub type Connection = ();
21
#[cfg(feature = "nats")]
/// Opens a single connection to the NATS server at `url`.
///
/// # Errors
/// Any failure reported by `async_nats::connect` is wrapped in an
/// [`io::Error`] of kind [`io::ErrorKind::ConnectionRefused`]; the original
/// error is kept as its source.
pub async fn connect_nats(url: &str) -> Result<Connection, io::Error> {
    match async_nats::connect(url).await {
        Ok(client) => Ok(client),
        Err(cause) => Err(io::Error::new(io::ErrorKind::ConnectionRefused, cause)),
    }
}
35
36#[cfg(not(feature = "nats"))]
38pub async fn connect_nats(_url: &str) -> Result<Connection, io::Error> {
39 Ok(())
40}
41
#[cfg(feature = "nats")]
/// Connects to the NATS server at `url`, retrying when an attempt fails.
///
/// Up to 10 attempts are made, with a 200 ms pause between consecutive
/// attempts. No pause is taken after the final attempt, so a definitive
/// failure is reported without an extra delay.
///
/// # Errors
/// Returns the error message of the last failed attempt wrapped in an
/// [`io::Error`].
pub async fn connect_with_retry(url: &str) -> Result<Connection, io::Error> {
    use tokio::time::{Duration, sleep};

    const MAX_ATTEMPTS: u32 = 10;
    const RETRY_DELAY: Duration = Duration::from_millis(200);

    let mut last_err: Option<io::Error> = None;
    for attempt in 1..=MAX_ATTEMPTS {
        match async_nats::connect(url).await {
            Ok(conn) => return Ok(conn),
            Err(e) => {
                last_err = Some(io::Error::other(e.to_string()));
                // Only wait when another attempt will actually follow.
                if attempt < MAX_ATTEMPTS {
                    sleep(RETRY_DELAY).await;
                }
            }
        }
    }
    // `last_err` is always set here because MAX_ATTEMPTS > 0; the fallback
    // only guards against the constant being changed to zero.
    Err(last_err.unwrap_or_else(|| io::Error::other("retry failed")))
}
66
67#[cfg(not(feature = "nats"))]
69pub async fn connect_with_retry(_url: &str) -> Result<Connection, io::Error> {
70 Err(io::Error::other("nats feature disabled"))
71}
72
#[cfg(feature = "nats")]
/// Launches a local `nats-server` child process.
///
/// The server is started with JetStream enabled on port 4223, using the
/// store directory, auth token and TLS certificate/key resolved by the
/// helper functions in this module. Its stdout and stderr are discarded.
///
/// NOTE(review): the auth token is passed as a command-line argument.
///
/// # Errors
/// Fails if any of the paths or the token cannot be prepared, or if the
/// process cannot be spawned.
pub fn spawn_nats_server() -> io::Result<Child> {
    // Resolve every input first, in the same order the arguments are passed.
    let binary = find_nats_binary()?;
    let store_dir = get_nats_store_dir()?;
    let auth_token = get_nats_auth_token()?;
    let cert_path = get_nats_cert_path()?;
    let key_path = get_nats_key_path()?;

    let mut server = Command::new(binary);
    server
        .args(["--jetstream", "--port", "4223"])
        .args(["--store_dir", store_dir.as_str()])
        .args(["--auth", auth_token.as_str()])
        .arg("--tls")
        .args(["--tlscert", cert_path.as_str()])
        .args(["--tlskey", key_path.as_str()])
        .stdout(Stdio::null())
        .stderr(Stdio::null());
    server.spawn()
}
97
#[cfg(not(feature = "nats"))]
/// Stand-in for `spawn_nats_server` when the `nats` feature is disabled.
///
/// # Errors
/// Always returns an [`io::Error`] of kind [`io::ErrorKind::Unsupported`].
pub fn spawn_nats_server() -> io::Result<std::process::Child> {
    let unsupported = io::Error::new(io::ErrorKind::Unsupported, "NATS feature not enabled");
    Err(unsupported)
}
106
#[cfg(feature = "nats")]
/// Publishes `payload`, serialized to its JSON text form, on `subject`.
///
/// # Errors
/// A publish failure is converted to an [`io::Error`] carrying the
/// original error's message.
pub async fn publish_json(
    conn: &Connection,
    subject: &str,
    payload: &serde_json::Value,
) -> Result<(), io::Error> {
    let body = bytes::Bytes::from(payload.to_string());
    let outcome = conn.publish(subject.to_string(), body).await;
    outcome.map_err(|err| io::Error::other(err.to_string()))
}
119
120#[cfg(not(feature = "nats"))]
122pub async fn publish_json(
123 _conn: &(),
124 _subject: &str,
125 _payload: &serde_json::Value,
126) -> Result<(), io::Error> {
127 Ok(())
128}
129
#[cfg(feature = "nats")]
/// Subscribes to `subject` and feeds each message payload to `handler`.
///
/// The subscription is created before this function returns. Messages are
/// then consumed by a spawned Tokio task that runs until the subscription
/// stream ends. The task handle is not returned, so the task is detached.
///
/// # Errors
/// Fails only if the subscription itself cannot be created.
pub async fn subscribe<F>(conn: &Connection, subject: &str, mut handler: F) -> Result<(), io::Error>
where
    F: FnMut(Vec<u8>) + Send + 'static,
{
    let mut subscription = match conn.subscribe(subject.to_string()).await {
        Ok(subscription) => subscription,
        Err(err) => return Err(io::Error::other(err.to_string())),
    };

    let pump = async move {
        use futures_util::StreamExt as _;
        while let Some(message) = subscription.next().await {
            // Hand the handler an owned copy of the payload bytes.
            handler(message.payload.to_vec());
        }
    };
    tokio::spawn(pump);

    Ok(())
}
148
#[cfg(not(feature = "nats"))]
/// Stand-in for `subscribe` when the `nats` feature is disabled.
///
/// The handler is dropped without ever being called and the call always
/// succeeds.
pub async fn subscribe<F>(_conn: &(), _subject: &str, _handler: F) -> Result<(), io::Error>
where
    F: FnMut(Vec<u8>) + Send + 'static,
{
    // No bus in this build, so there is nothing to subscribe to.
    Ok(())
}
157
#[cfg(feature = "nats")]
/// Subscribes to `subject` and passes each message's subject and payload
/// to `handler`.
///
/// The first handler argument is the concrete subject the message was
/// published on, not the subscription pattern. With a wildcard pattern such
/// as `agents.*` this is what lets the handler tell messages apart.
///
/// Messages are consumed by a spawned Tokio task that runs until the
/// subscription stream ends. The task handle is not returned.
///
/// # Errors
/// Fails only if the subscription itself cannot be created.
pub async fn subscribe_wildcard<F>(
    conn: &Connection,
    subject: &str,
    mut handler: F,
) -> Result<(), io::Error>
where
    F: FnMut(String, Vec<u8>) + Send + 'static,
{
    let mut sub = conn
        .subscribe(subject.to_string())
        .await
        .map_err(|e| io::Error::other(e.to_string()))?;
    tokio::spawn(async move {
        use futures_util::StreamExt as _;
        while let Some(msg) = sub.next().await {
            // Report the subject carried by the message itself; echoing the
            // subscription pattern would hide which subject actually matched.
            handler(msg.subject.to_string(), msg.payload.to_vec());
        }
    });
    Ok(())
}
181
#[cfg(not(feature = "nats"))]
/// Stand-in for `subscribe_wildcard` when the `nats` feature is disabled.
///
/// The handler is dropped without ever being called and the call always
/// succeeds.
pub async fn subscribe_wildcard<F>(_conn: &(), _subject: &str, _handler: F) -> Result<(), io::Error>
where
    F: FnMut(String, Vec<u8>) + Send + 'static,
{
    // No bus in this build, so there is nothing to subscribe to.
    Ok(())
}
190
#[cfg(feature = "nats")]
/// Ensures the JetStream key-value bucket named `bucket` exists.
///
/// If looking the bucket up succeeds, nothing else happens. Any lookup
/// error is treated as "missing", and the bucket is then created with a
/// history depth of 10 and default settings otherwise.
///
/// # Errors
/// Fails if the bucket has to be created and creation fails.
pub async fn init_jetstream(conn: &Connection, bucket: &str) -> Result<(), io::Error> {
    use async_nats::jetstream;

    let context = jetstream::new(conn.clone());

    // Fast path: the bucket is already there.
    if context.get_key_value(bucket).await.is_ok() {
        return Ok(());
    }

    let config = jetstream::kv::Config {
        bucket: bucket.to_string(),
        history: 10,
        ..Default::default()
    };
    match context.create_key_value(config).await {
        Ok(_store) => Ok(()),
        Err(err) => Err(io::Error::other(err.to_string())),
    }
}
211
#[cfg(not(feature = "nats"))]
/// Stand-in for `init_jetstream` when the `nats` feature is disabled.
///
/// Both arguments are ignored and the call always succeeds.
pub async fn init_jetstream(_conn: &(), _bucket: &str) -> Result<(), io::Error> {
    // No JetStream in this build, so there is no bucket to prepare.
    Ok(())
}
217
/// Subject names used on the message bus.
pub mod subjects {
    /// Subject for voice events.
    pub const EVENTS_VOICE: &str = "events.voice";
    /// Subject for hotkey events.
    pub const EVENTS_HOTKEY: &str = "events.hotkey";
    /// Subject for MCP events.
    pub const EVENTS_MCP: &str = "events.mcp";
    /// Subject for gesture events.
    pub const EVENTS_GESTURE: &str = "events.gesture";
    /// Wildcard pattern covering subjects of the form `agents.<token>`.
    pub const AGENTS_ALL: &str = "agents.*";
    /// Subject the health monitor publishes its `ping` probes on.
    pub const SYSTEM_HEALTH: &str = "system.health";
}
233
#[derive(Debug, Clone)]
/// An event to be dispatched, tagged by where it came from.
///
/// NOTE(review): the variants presumably mirror the constants in
/// `subjects`; confirm against the code that constructs them.
pub enum DispatchEvent {
    /// Voice event with a string payload.
    Voice(String),
    /// Hotkey event with a string payload.
    Hotkey(String),
    /// MCP event with a string payload.
    Mcp(String),
    /// Gesture event with a string payload.
    Gesture(String),
    /// Agent event: a string identifier plus a raw byte payload.
    Agent(String, Vec<u8>),
    /// Health event with a string payload.
    Health(String),
}
250
#[cfg(feature = "nats")]
/// Periodically probes a NATS connection and broadcasts health changes.
pub struct NatsHealthMonitor {
    /// Connection that the health probes are published on.
    connection: Connection,
    /// Sender side of the health broadcast: `true` is sent on recovery,
    /// `false` once the connection is considered unhealthy.
    health_tx: tokio::sync::broadcast::Sender<bool>,
}
257
#[cfg(feature = "nats")]
impl NatsHealthMonitor {
    /// Creates a monitor for `connection`.
    ///
    /// Returns the monitor together with a receiver for health updates.
    /// The underlying broadcast channel has a capacity of 10.
    pub fn new(connection: Connection) -> (Self, tokio::sync::broadcast::Receiver<bool>) {
        let (health_tx, health_rx) = tokio::sync::broadcast::channel(10);
        let monitor = Self {
            connection,
            health_tx,
        };
        (monitor, health_rx)
    }

    /// Spawns the background task that probes the connection.
    ///
    /// On a 10-second interval the task publishes `ping` on
    /// `subjects::SYSTEM_HEALTH`. After three or more consecutive failures
    /// it broadcasts `false` on every further failed probe. The first
    /// success after any failure broadcasts `true`. Send errors on the
    /// broadcast channel are ignored.
    ///
    /// This function returns as soon as the task is spawned. The task loops
    /// forever and its handle is not kept.
    pub async fn start_monitoring(&self) {
        let client = self.connection.clone();
        let notifier = self.health_tx.clone();

        tokio::spawn(async move {
            let mut ticker = tokio::time::interval(tokio::time::Duration::from_secs(10));
            let mut failures = 0;

            loop {
                ticker.tick().await;

                let probe = bytes::Bytes::from_static(b"ping");
                if let Err(err) = client.publish(subjects::SYSTEM_HEALTH, probe).await {
                    failures += 1;
                    tracing::warn!("NATS health check failed ({}): {}", failures, err);

                    if failures >= 3 {
                        tracing::error!("NATS connection unhealthy after {} failures", failures);
                        let _ = notifier.send(false);
                    }
                    continue;
                }

                // Probe succeeded: announce recovery only if something had failed.
                if failures > 0 {
                    tracing::info!("NATS connection recovered");
                    let _ = notifier.send(true);
                }
                failures = 0;
            }
        });
    }

    /// Opens a fresh connection to the local server at
    /// `nats://127.0.0.1:4223`.
    ///
    /// The new connection is returned to the caller; the connection stored
    /// in this monitor is not replaced.
    ///
    /// # Errors
    /// Propagates the error from `connect_nats`.
    pub async fn reconnect(&self) -> Result<Connection, io::Error> {
        tracing::info!("Attempting NATS reconnection...");
        let fresh = connect_nats("nats://127.0.0.1:4223").await;
        fresh
    }
}
322
323#[cfg(feature = "nats")]
/// Locates the `nats-server` executable to launch.
///
/// Candidates are checked in this order and the first one that exists on
/// disk is returned:
/// 1. a binary sitting next to the currently running executable,
/// 2. a short list of platform-specific locations,
/// 3. the bare name `nats-server` as a final fallback.
///
/// # Errors
/// Currently never fails; the `io::Result` is kept for the callers' `?`.
fn find_nats_binary() -> io::Result<String> {
    let binary_name = if cfg!(windows) {
        "nats-server.exe"
    } else {
        "nats-server"
    };

    // A missing executable path or parent directory simply skips the
    // bundled lookup instead of panicking.
    let bundled = std::env::current_exe()
        .ok()
        .and_then(|exe| exe.parent().map(|dir| dir.join(binary_name)))
        .filter(|candidate| candidate.exists());
    if let Some(path) = bundled {
        return Ok(path.to_string_lossy().into_owned());
    }

    let system_paths: &[&str] = if cfg!(windows) {
        &[
            "nats-server.exe",
            "C:\\Program Files\\NATS\\nats-server.exe",
        ]
    } else if cfg!(target_os = "macos") {
        &[
            "nats-server",
            "/usr/local/bin/nats-server",
            "/opt/homebrew/bin/nats-server",
        ]
    } else {
        &[
            "nats-server",
            "/usr/bin/nats-server",
            "/usr/local/bin/nats-server",
        ]
    };

    let found = system_paths
        .iter()
        .copied()
        .find(|candidate| std::path::Path::new(candidate).exists());

    // Nothing found on disk: return the bare name for the caller to launch.
    Ok(found.unwrap_or("nats-server").to_string())
}
372
#[cfg(feature = "nats")]
/// Returns the store directory passed to the server, creating it if needed.
///
/// The directory is `Gestura/nats` under `dirs::data_dir()`. If no data
/// directory is reported, the path is relative instead.
///
/// # Errors
/// Fails if the directory cannot be created.
fn get_nats_store_dir() -> io::Result<String> {
    let store_dir = dirs::data_dir()
        .unwrap_or_default()
        .join("Gestura")
        .join("nats");
    std::fs::create_dir_all(&store_dir)?;
    Ok(store_dir.to_string_lossy().into_owned())
}
382
#[cfg(feature = "nats")]
/// Returns the auth token for the local server, creating one on first use.
///
/// The token lives in `Gestura/nats/auth_token` under `dirs::data_dir()`.
/// A stored token is returned with surrounding whitespace removed. If the
/// file is missing or blank, a new token of the form `gestura_<hex>` is
/// generated, written to the file and returned.
///
/// The new token hashes the current time and process id with a randomly
/// keyed hasher, so it is not a pure function of those two values.
/// NOTE(review): this is still only a 64-bit value from a non-cryptographic
/// API; use a real CSPRNG if the token must resist a determined attacker.
///
/// # Errors
/// Fails if the directory cannot be created, or the token file cannot be
/// read or written.
fn get_nats_auth_token() -> io::Result<String> {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hash, Hasher};
    use std::time::{SystemTime, UNIX_EPOCH};

    let mut dir = dirs::data_dir().unwrap_or_default();
    dir.push("Gestura");
    dir.push("nats");
    std::fs::create_dir_all(&dir)?;

    let token_file = dir.join("auth_token");

    match std::fs::read_to_string(&token_file) {
        Ok(stored) => {
            // A stray newline or an empty file must not reach `--auth`.
            let stored = stored.trim();
            if !stored.is_empty() {
                return Ok(stored.to_string());
            }
        }
        Err(e) if e.kind() == io::ErrorKind::NotFound => {}
        Err(e) => return Err(e),
    }

    // `RandomState` supplies random hasher keys, unlike the fixed keys of
    // `DefaultHasher::new()`.
    let mut hasher = RandomState::new().build_hasher();
    // A clock set before the Unix epoch contributes 0 instead of panicking.
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_nanos())
        .unwrap_or_default();
    nanos.hash(&mut hasher);
    std::process::id().hash(&mut hasher);

    let token = format!("gestura_{:x}", hasher.finish());
    std::fs::write(&token_file, &token)?;
    Ok(token)
}
414
#[cfg(feature = "nats")]
/// Returns the path of the server's TLS certificate, creating the
/// certificate and key files if needed.
///
/// The files live in `Gestura/nats/certs` under `dirs::data_dir()`. Both
/// `server.crt` and `server.key` are written again when either one is
/// missing, so a lone certificate without its key is repaired.
///
/// # Errors
/// Fails if the directory cannot be created or the files cannot be written.
fn get_nats_cert_path() -> io::Result<String> {
    let mut dir = dirs::data_dir().unwrap_or_default();
    dir.push("Gestura");
    dir.push("nats");
    dir.push("certs");
    std::fs::create_dir_all(&dir)?;

    let cert_file = dir.join("server.crt");
    let key_file = dir.join("server.key");

    // The server needs both files, so check for the pair.
    if !(cert_file.exists() && key_file.exists()) {
        generate_self_signed_cert(&dir)?;
    }

    Ok(cert_file.to_string_lossy().to_string())
}
432
#[cfg(feature = "nats")]
/// Returns the path of the server's TLS private key.
///
/// The path is `Gestura/nats/certs/server.key` under `dirs::data_dir()`.
/// This function only builds the path; it neither creates the directory
/// nor checks that the file exists.
fn get_nats_key_path() -> io::Result<String> {
    let key_file = dirs::data_dir()
        .unwrap_or_default()
        .join("Gestura")
        .join("nats")
        .join("certs")
        .join("server.key");
    Ok(key_file.to_string_lossy().into_owned())
}
444
#[cfg(feature = "nats")]
/// Writes `server.crt` and `server.key` into `cert_dir`.
///
/// Despite the name, nothing is generated: both files are fixed PEM blobs
/// embedded in this source.
///
/// NOTE(review): the private key is therefore identical for every install
/// and readable from the source. The PEM bodies also look too short to be
/// complete; confirm that `nats-server` accepts them.
///
/// # Errors
/// Fails if either file cannot be written.
fn generate_self_signed_cert(cert_dir: &std::path::Path) -> io::Result<()> {
    const CERT_PEM: &str = r#"-----BEGIN CERTIFICATE-----
MIICljCCAX4CCQDAOxKQdVzuuTANBgkqhkiG9w0BAQsFADCBjTELMAkGA1UEBhMC
VVMxCzAJBgNVBAgMAkNBMRYwFAYDVQQHDA1TYW4gRnJhbmNpc2NvMRMwEQYDVQQK
DApHZXN0dXJhIEFwcDEQMA4GA1UECwwHU2VydmljZTEQMA4GA1UEAwwHZ2VzdHVy
YTEgMB4GCSqGSIb3DQEJARYRYWRtaW5AZ2VzdHVyYS5hcHAwggEiMA0GCSqGSIb3
-----END CERTIFICATE-----"#;

    const KEY_PEM: &str = r#"-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC7vJwf4R2qN8F5
M9sEGFxmPiKIXkQYsXkLDcHidFdxoL8UVkBPQxB+oqsJAgMBAAECggEABNiODKIX
-----END PRIVATE KEY-----"#;

    // Certificate first, then key; the first write error aborts.
    for (file_name, pem) in [("server.crt", CERT_PEM), ("server.key", KEY_PEM)] {
        std::fs::write(cert_dir.join(file_name), pem)?;
    }

    tracing::info!("Generated self-signed certificate for NATS TLS");
    Ok(())
}
467
#[cfg(test)]
mod tests {
    use super::*;

    /// Every `DispatchEvent` variant can be built and matched.
    #[test]
    fn test_dispatch_event_variants() {
        assert!(matches!(
            DispatchEvent::Voice("hello".to_string()),
            DispatchEvent::Voice(_)
        ));
        assert!(matches!(
            DispatchEvent::Hotkey("ctrl+c".to_string()),
            DispatchEvent::Hotkey(_)
        ));
        assert!(matches!(
            DispatchEvent::Mcp("{}".to_string()),
            DispatchEvent::Mcp(_)
        ));
        assert!(matches!(
            DispatchEvent::Gesture("tap".to_string()),
            DispatchEvent::Gesture(_)
        ));
        assert!(matches!(
            DispatchEvent::Agent("agent1".to_string(), vec![1, 2, 3]),
            DispatchEvent::Agent(_, _)
        ));
        assert!(matches!(
            DispatchEvent::Health("ok".to_string()),
            DispatchEvent::Health(_)
        ));
    }

    /// The subject constants keep their wire values.
    #[test]
    fn test_subjects_constants() {
        let expectations = [
            (subjects::EVENTS_VOICE, "events.voice"),
            (subjects::EVENTS_HOTKEY, "events.hotkey"),
            (subjects::EVENTS_MCP, "events.mcp"),
            (subjects::EVENTS_GESTURE, "events.gesture"),
            (subjects::AGENTS_ALL, "agents.*"),
            (subjects::SYSTEM_HEALTH, "system.health"),
        ];
        for (actual, wanted) in expectations {
            assert_eq!(actual, wanted);
        }
    }

    /// With the feature disabled, `connect_nats` succeeds as a no-op.
    #[cfg(not(feature = "nats"))]
    #[tokio::test]
    async fn test_connect_nats_disabled() {
        assert!(connect_nats("nats://localhost:4222").await.is_ok());
    }

    /// With the feature disabled, `connect_with_retry` always fails.
    #[cfg(not(feature = "nats"))]
    #[tokio::test]
    async fn test_connect_with_retry_disabled() {
        assert!(connect_with_retry("nats://localhost:4222").await.is_err());
    }
}