gestura_core_nats/
nats_mq.rs

1//! NATS messaging queue utilities
2//!
3//! Provides embedded NATS server spawning, client connection, publish/subscribe,
4//! and JetStream KV bucket initialization.
5//!
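//! Functions that talk to NATS are gated behind `#[cfg(feature = "nats")]`.
//! When the feature is disabled, same-named stubs take their place; each stub
//! either succeeds without doing anything or returns an error.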
8
9use std::io;
10
11#[cfg(feature = "nats")]
12use std::process::{Child, Command, Stdio};
13
/// Handle to a NATS client; the connection type used throughout this module.
#[cfg(feature = "nats")]
pub type Connection = async_nats::Client;

/// Placeholder for [`Connection`] when the `nats` feature is disabled.
///
/// It aliases the unit type, so the disabled-feature stubs can keep returning
/// `Result<Connection, _>`.
#[cfg(not(feature = "nats"))]
pub type Connection = ();
21
22/// Connect to NATS server
23///
24/// # Arguments
25/// * `url` - NATS server URL (e.g., "nats://127.0.0.1:4223")
26///
27/// # Returns
28/// NATS client connection or error
29#[cfg(feature = "nats")]
30pub async fn connect_nats(url: &str) -> Result<Connection, io::Error> {
31    async_nats::connect(url)
32        .await
33        .map_err(|e| io::Error::new(io::ErrorKind::ConnectionRefused, e))
34}
35
36/// No-op connection stub when NATS disabled
37#[cfg(not(feature = "nats"))]
38pub async fn connect_nats(_url: &str) -> Result<Connection, io::Error> {
39    Ok(())
40}
41
42/// Attempt to connect to NATS with retries
43///
44/// Useful when starting an embedded server that needs time to initialize.
45///
46/// # Arguments
47/// * `url` - NATS server URL
48///
49/// # Returns
50/// Connection after successful connect, or last error after 10 retries
51#[cfg(feature = "nats")]
52pub async fn connect_with_retry(url: &str) -> Result<Connection, io::Error> {
53    use tokio::time::{Duration, sleep};
54    let mut last_err: Option<io::Error> = None;
55    for _ in 0..10 {
56        match async_nats::connect(url).await {
57            Ok(conn) => return Ok(conn),
58            Err(e) => {
59                last_err = Some(io::Error::other(e.to_string()));
60                sleep(Duration::from_millis(200)).await;
61            }
62        }
63    }
64    Err(last_err.unwrap_or_else(|| io::Error::other("retry failed")))
65}
66
67/// No-op retry stub when NATS disabled
68#[cfg(not(feature = "nats"))]
69pub async fn connect_with_retry(_url: &str) -> Result<Connection, io::Error> {
70    Err(io::Error::other("nats feature disabled"))
71}
72
/// Launch an embedded `nats-server` child process with JetStream enabled.
///
/// The server is started on port 4223 with a JetStream store directory, token
/// authentication (`--auth`) and TLS (`--tls`, `--tlscert`, `--tlskey`). Its
/// stdout and stderr are discarded.
///
/// Returns the child process handle. The caller is responsible for killing
/// the process when done.
///
/// # Errors
/// Fails if any of the helper lookups (binary, store directory, auth token,
/// certificate, key) fails, or if the process cannot be spawned.
// NOTE(review): `NatsHealthMonitor::reconnect` in this file connects with a
// plain `nats://` URL and no token — confirm that matches a server started
// with `--auth` and `--tls`.
#[cfg(feature = "nats")]
pub fn spawn_nats_server() -> io::Result<Child> {
    // Resolve every input first, in the same order the flags are passed.
    let binary = find_nats_binary()?;
    let store_dir = get_nats_store_dir()?;
    let auth_token = get_nats_auth_token()?;
    let tls_cert = get_nats_cert_path()?;
    let tls_key = get_nats_key_path()?;

    let mut server = Command::new(binary);
    server
        .arg("--jetstream")
        .args(["--port", "4223"])
        .args(["--store_dir", store_dir.as_str()])
        .args(["--auth", auth_token.as_str()])
        .arg("--tls")
        .args(["--tlscert", tls_cert.as_str()])
        .args(["--tlskey", tls_key.as_str()])
        .stdout(Stdio::null())
        .stderr(Stdio::null());
    server.spawn()
}

/// Stub used when the `nats` feature is disabled.
///
/// Never spawns anything; always fails with [`io::ErrorKind::Unsupported`].
#[cfg(not(feature = "nats"))]
pub fn spawn_nats_server() -> io::Result<std::process::Child> {
    let unsupported = io::Error::new(io::ErrorKind::Unsupported, "NATS feature not enabled");
    Err(unsupported)
}
106
107/// Publish a JSON payload to a subject
108#[cfg(feature = "nats")]
109pub async fn publish_json(
110    conn: &Connection,
111    subject: &str,
112    payload: &serde_json::Value,
113) -> Result<(), io::Error> {
114    let bytes = bytes::Bytes::from(payload.to_string());
115    conn.publish(subject.to_string(), bytes)
116        .await
117        .map_err(|e| io::Error::other(e.to_string()))
118}
119
120/// No-op publish stub when NATS disabled
121#[cfg(not(feature = "nats"))]
122pub async fn publish_json(
123    _conn: &(),
124    _subject: &str,
125    _payload: &serde_json::Value,
126) -> Result<(), io::Error> {
127    Ok(())
128}
129
/// Subscribe to `subject` and feed each message payload to `handler`.
///
/// The subscription is created before this function returns; messages are
/// then consumed on a spawned Tokio task, which calls `handler` with a copy
/// of every payload until the subscription stream ends. The task is detached.
///
/// # Errors
/// Fails if the subscription cannot be created; the underlying error message
/// is wrapped in an [`io::Error`].
#[cfg(feature = "nats")]
pub async fn subscribe<F>(conn: &Connection, subject: &str, mut handler: F) -> Result<(), io::Error>
where
    F: FnMut(Vec<u8>) + Send + 'static,
{
    use futures_util::StreamExt as _;

    let subscribed = conn.subscribe(subject.to_string()).await;
    let mut messages = subscribed.map_err(|cause| io::Error::other(cause.to_string()))?;

    tokio::spawn(async move {
        while let Some(message) = messages.next().await {
            let payload = message.payload.to_vec();
            handler(payload);
        }
    });
    Ok(())
}

/// Stub used when the `nats` feature is disabled.
///
/// Never invokes the handler; always reports success.
#[cfg(not(feature = "nats"))]
pub async fn subscribe<F>(_conn: &(), _subject: &str, _handler: F) -> Result<(), io::Error>
where
    F: FnMut(Vec<u8>) + Send + 'static,
{
    Ok(())
}
157
/// Subscribe to a wildcard subject and report each message's own subject.
///
/// `subject` may contain NATS wildcards (e.g. `agents.*`). For every message
/// received, `handler` is called with the concrete subject the message was
/// published on (not the wildcard pattern) and a copy of its payload, so the
/// caller can tell which subject produced the message.
///
/// Messages are consumed on a detached Tokio task until the subscription
/// stream ends.
///
/// # Errors
/// Fails if the subscription cannot be created; the underlying error message
/// is wrapped in an [`io::Error`].
#[cfg(feature = "nats")]
pub async fn subscribe_wildcard<F>(
    conn: &Connection,
    subject: &str,
    mut handler: F,
) -> Result<(), io::Error>
where
    F: FnMut(String, Vec<u8>) + Send + 'static,
{
    let mut sub = conn
        .subscribe(subject.to_string())
        .await
        .map_err(|e| io::Error::other(e.to_string()))?;
    tokio::spawn(async move {
        use futures_util::StreamExt as _;
        while let Some(msg) = sub.next().await {
            // Forward the subject the message actually arrived on; passing the
            // subscription pattern would make all messages indistinguishable.
            handler(msg.subject.to_string(), msg.payload.to_vec());
        }
    });
    Ok(())
}

/// Stub used when the `nats` feature is disabled.
///
/// Never invokes the handler; always reports success.
#[cfg(not(feature = "nats"))]
pub async fn subscribe_wildcard<F>(
    _conn: &(),
    _subject: &str,
    _handler: F,
) -> Result<(), io::Error>
where
    F: FnMut(String, Vec<u8>) + Send + 'static,
{
    Ok(())
}
190
/// Ensure the JetStream key-value bucket `bucket` exists.
///
/// A JetStream context is built from a clone of `conn`. If looking up the
/// bucket succeeds nothing else happens; if the lookup fails for any reason,
/// the bucket is created with a history depth of 10 and default settings
/// otherwise.
///
/// # Errors
/// Only a failure to create the bucket is reported, wrapped in an
/// [`io::Error`]; the lookup error itself is discarded.
#[cfg(feature = "nats")]
pub async fn init_jetstream(conn: &Connection, bucket: &str) -> Result<(), io::Error> {
    let context = async_nats::jetstream::new(conn.clone());

    if context.get_key_value(bucket).await.is_ok() {
        return Ok(());
    }

    let config = async_nats::jetstream::kv::Config {
        bucket: bucket.to_string(),
        history: 10,
        ..Default::default()
    };
    match context.create_key_value(config).await {
        Ok(_store) => Ok(()),
        Err(cause) => Err(io::Error::other(cause.to_string())),
    }
}

/// Stub used when the `nats` feature is disabled.
///
/// Ignores its arguments and reports success.
#[cfg(not(feature = "nats"))]
pub async fn init_jetstream(_conn: &(), _bucket: &str) -> Result<(), io::Error> {
    Ok(())
}
217
/// Well-known NATS subject names shared across the application.
pub mod subjects {
    /// Subject carrying voice events.
    pub const EVENTS_VOICE: &str = "events.voice";
    /// Subject carrying hotkey events.
    pub const EVENTS_HOTKEY: &str = "events.hotkey";
    /// Subject carrying MCP events.
    pub const EVENTS_MCP: &str = "events.mcp";
    /// Subject carrying gesture events.
    pub const EVENTS_GESTURE: &str = "events.gesture";
    /// Wildcard pattern matching every single-token subject under `agents.`.
    pub const AGENTS_ALL: &str = "agents.*";
    /// Subject used for system health messages.
    pub const SYSTEM_HEALTH: &str = "system.health";
}
233
/// Dispatcher hint for event routing.
///
/// Each variant names the kind of event and carries its payload. The type is
/// comparable (`PartialEq`/`Eq`) so events can be checked for equality, e.g.
/// in tests and de-duplication logic.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DispatchEvent {
    /// Voice event with transcription
    Voice(String),
    /// Hotkey event with key sequence
    Hotkey(String),
    /// MCP event with JSON payload
    Mcp(String),
    /// Gesture event with JSON-serialized InteractionEvent
    Gesture(String),
    /// Agent event with topic and raw bytes
    Agent(String, Vec<u8>),
    /// Health check event
    Health(String),
}
250
/// Periodic health checker for a NATS connection.
///
/// Health transitions are announced on a broadcast channel of `bool`
/// (`true` = recovered, `false` = unhealthy).
#[cfg(feature = "nats")]
pub struct NatsHealthMonitor {
    connection: Connection,
    health_tx: tokio::sync::broadcast::Sender<bool>,
}

#[cfg(feature = "nats")]
impl NatsHealthMonitor {
    /// Build a monitor for `connection`.
    ///
    /// Returns the monitor together with a receiver for its health
    /// notifications; the underlying broadcast channel has a capacity of 10.
    pub fn new(connection: Connection) -> (Self, tokio::sync::broadcast::Receiver<bool>) {
        let (health_tx, health_rx) = tokio::sync::broadcast::channel(10);
        let monitor = Self {
            connection,
            health_tx,
        };
        (monitor, health_rx)
    }

    /// Start health monitoring in a detached background task.
    ///
    /// On a 10-second interval the task publishes `"ping"` to
    /// [`subjects::SYSTEM_HEALTH`]:
    /// * a successful publish after one or more failures broadcasts `true`
    ///   and resets the failure counter;
    /// * every failed publish is logged, and from the third consecutive
    ///   failure onwards each failure broadcasts `false`.
    ///
    /// Broadcast send errors are ignored.
    pub async fn start_monitoring(&self) {
        let client = self.connection.clone();
        let notifier = self.health_tx.clone();

        tokio::spawn(async move {
            let mut ticker = tokio::time::interval(tokio::time::Duration::from_secs(10));
            let mut consecutive_failures = 0;

            loop {
                ticker.tick().await;

                let probe = client
                    .publish(subjects::SYSTEM_HEALTH, bytes::Bytes::from_static(b"ping"))
                    .await;

                if let Err(cause) = probe {
                    consecutive_failures += 1;
                    tracing::warn!(
                        "NATS health check failed ({}): {}",
                        consecutive_failures,
                        cause
                    );

                    if consecutive_failures >= 3 {
                        tracing::error!(
                            "NATS connection unhealthy after {} failures",
                            consecutive_failures
                        );
                        let _ = notifier.send(false);
                    }
                    continue;
                }

                // Probe succeeded: announce recovery only if we had been failing.
                if consecutive_failures > 0 {
                    tracing::info!("NATS connection recovered");
                    let _ = notifier.send(true);
                }
                consecutive_failures = 0;
            }
        });
    }

    /// Open a fresh connection to the local NATS server.
    ///
    /// Connects to the fixed address `nats://127.0.0.1:4223` and returns the
    /// new connection; the connection held by this monitor is not replaced.
    pub async fn reconnect(&self) -> Result<Connection, io::Error> {
        tracing::info!("Attempting NATS reconnection...");
        let local_server = "nats://127.0.0.1:4223";
        connect_nats(local_server).await
    }
}
322
323// ============================================================================
324// Helper functions (feature-gated)
325// ============================================================================
326
/// Find the NATS server binary (bundled or system).
///
/// Lookup order:
/// 1. a `nats-server` (`nats-server.exe` on Windows) file next to the
///    current executable;
/// 2. a platform-specific list of candidate paths, taking the first one that
///    exists;
/// 3. the bare name `nats-server`, leaving resolution to the process spawner.
///
/// The function currently never returns an error; if the current executable
/// path cannot be determined or has no parent directory, the bundled lookup
/// is simply skipped instead of panicking.
#[cfg(feature = "nats")]
fn find_nats_binary() -> io::Result<String> {
    let binary_name = if cfg!(windows) {
        "nats-server.exe"
    } else {
        "nats-server"
    };

    // Check for a bundled binary first. `parent()` can be `None`, so it is
    // threaded through `and_then` rather than unwrapped.
    let bundled = std::env::current_exe()
        .ok()
        .and_then(|exe| exe.parent().map(|dir| dir.join(binary_name)))
        .filter(|candidate| candidate.exists());
    if let Some(bundled_path) = bundled {
        return Ok(bundled_path.to_string_lossy().into_owned());
    }

    // Check common system locations
    let system_paths: &[&str] = if cfg!(windows) {
        &[
            "nats-server.exe",
            "C:\\Program Files\\NATS\\nats-server.exe",
        ]
    } else if cfg!(target_os = "macos") {
        &[
            "nats-server",
            "/usr/local/bin/nats-server",
            "/opt/homebrew/bin/nats-server",
        ]
    } else {
        &[
            "nats-server",
            "/usr/bin/nats-server",
            "/usr/local/bin/nats-server",
        ]
    };

    for candidate in system_paths.iter().copied() {
        if std::path::Path::new(candidate).exists() {
            return Ok(candidate.to_string());
        }
    }

    // Fall back to PATH
    Ok("nats-server".to_string())
}
372
/// Return the NATS data directory, creating it if necessary.
///
/// The directory is `<data_dir>/Gestura/nats`, where `<data_dir>` comes from
/// `dirs::data_dir()`; when that is unavailable an empty base path is used,
/// which makes the result relative.
#[cfg(feature = "nats")]
fn get_nats_store_dir() -> io::Result<String> {
    let store_dir = dirs::data_dir()
        .unwrap_or_default()
        .join("Gestura")
        .join("nats");
    std::fs::create_dir_all(&store_dir)?;
    Ok(store_dir.to_string_lossy().into_owned())
}
382
/// Get the NATS authentication token, generating and persisting one if needed.
///
/// The token lives in `<data_dir>/Gestura/nats/auth_token`. An existing,
/// non-empty token is returned with surrounding whitespace trimmed, so a
/// trailing newline in the file does not become part of the credential. If
/// the file is missing or blank, a new token is generated and written.
///
/// Token generation mixes the current time and process id into hashers built
/// from `RandomState`, which the standard library initializes with random
/// keys; the token therefore no longer depends only on guessable inputs.
/// NOTE(review): this is still not a dedicated cryptographic RNG — switch to
/// one if the project takes on a suitable dependency.
///
/// On Unix a newly created token file is given mode `0o600`.
///
/// # Errors
/// Propagates failures to create the directory or to read or write the file.
#[cfg(feature = "nats")]
fn get_nats_auth_token() -> io::Result<String> {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hash, Hasher};
    use std::io::Write as _;
    use std::time::{SystemTime, UNIX_EPOCH};

    let mut dir = dirs::data_dir().unwrap_or_default();
    dir.push("Gestura");
    dir.push("nats");
    std::fs::create_dir_all(&dir)?;

    let token_file = dir.join("auth_token");

    // Reuse a previously stored token when there is a usable one.
    match std::fs::read_to_string(&token_file) {
        Ok(contents) => {
            let stored = contents.trim();
            if !stored.is_empty() {
                return Ok(stored.to_string());
            }
        }
        Err(e) if e.kind() == io::ErrorKind::NotFound => {}
        Err(e) => return Err(e),
    }

    // A clock set before the epoch must not panic; fall back to zero.
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_nanos())
        .unwrap_or_default();

    // Two independently keyed hashers give 128 bits of token material.
    let mut token = String::from("gestura_");
    for round in 0..2u8 {
        let mut hasher = RandomState::new().build_hasher();
        nanos.hash(&mut hasher);
        std::process::id().hash(&mut hasher);
        round.hash(&mut hasher);
        token.push_str(&format!("{:016x}", hasher.finish()));
    }

    let mut options = std::fs::OpenOptions::new();
    options.write(true).create(true).truncate(true);
    #[cfg(unix)]
    {
        use std::os::unix::fs::OpenOptionsExt as _;
        // Applies when the file is created: owner read/write only.
        options.mode(0o600);
    }
    options.open(&token_file)?.write_all(token.as_bytes())?;

    Ok(token)
}
414
/// Return the path of the NATS TLS certificate, creating it when absent.
///
/// Ensures `<data_dir>/Gestura/nats/certs` exists and, if `server.crt` is not
/// present there, calls `generate_self_signed_cert` for that directory before
/// returning the certificate path.
#[cfg(feature = "nats")]
fn get_nats_cert_path() -> io::Result<String> {
    let cert_dir = dirs::data_dir()
        .unwrap_or_default()
        .join("Gestura")
        .join("nats")
        .join("certs");
    std::fs::create_dir_all(&cert_dir)?;

    let cert_path = cert_dir.join("server.crt");
    let cert_missing = !cert_path.exists();
    if cert_missing {
        generate_self_signed_cert(&cert_dir)?;
    }

    Ok(cert_path.to_string_lossy().into_owned())
}
432
/// Return the path of the NATS TLS private key.
///
/// Only computes `<data_dir>/Gestura/nats/certs/server.key`; it neither
/// creates the directory nor checks that the key file exists.
#[cfg(feature = "nats")]
fn get_nats_key_path() -> io::Result<String> {
    let key_path = dirs::data_dir()
        .unwrap_or_default()
        .join("Gestura")
        .join("nats")
        .join("certs")
        .join("server.key");
    Ok(key_path.to_string_lossy().into_owned())
}
444
/// Write the development TLS certificate and key into `cert_dir`.
///
/// Despite the name, nothing is generated at runtime: the fixed placeholder
/// PEM blocks embedded below are written to `server.crt` and `server.key`
/// (in that order), then an info message is logged.
///
/// # Errors
/// Returns the first file-write error; if writing the certificate fails the
/// key is not written.
#[cfg(feature = "nats")]
fn generate_self_signed_cert(cert_dir: &std::path::Path) -> io::Result<()> {
    // Placeholder certificate for development
    const PLACEHOLDER_CERT: &str = r#"-----BEGIN CERTIFICATE-----
MIICljCCAX4CCQDAOxKQdVzuuTANBgkqhkiG9w0BAQsFADCBjTELMAkGA1UEBhMC
VVMxCzAJBgNVBAgMAkNBMRYwFAYDVQQHDA1TYW4gRnJhbmNpc2NvMRMwEQYDVQQK
DApHZXN0dXJhIEFwcDEQMA4GA1UECwwHU2VydmljZTEQMA4GA1UEAwwHZ2VzdHVy
YTEgMB4GCSqGSIb3DQEJARYRYWRtaW5AZ2VzdHVyYS5hcHAwggEiMA0GCSqGSIb3
-----END CERTIFICATE-----"#;

    // Placeholder private key for development
    const PLACEHOLDER_KEY: &str = r#"-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC7vJwf4R2qN8F5
M9sEGFxmPiKIXkQYsXkLDcHidFdxoL8UVkBPQxB+oqsJAgMBAAECggEABNiODKIX
-----END PRIVATE KEY-----"#;

    let outputs = [
        ("server.crt", PLACEHOLDER_CERT),
        ("server.key", PLACEHOLDER_KEY),
    ];
    for (file_name, pem) in outputs {
        std::fs::write(cert_dir.join(file_name), pem)?;
    }

    tracing::info!("Generated self-signed certificate for NATS TLS");
    Ok(())
}
467
#[cfg(test)]
mod tests {
    use super::*;

    /// Every `DispatchEvent` variant can be constructed and pattern-matched.
    #[test]
    fn test_dispatch_event_variants() {
        assert!(matches!(
            DispatchEvent::Voice("hello".to_string()),
            DispatchEvent::Voice(_)
        ));
        assert!(matches!(
            DispatchEvent::Hotkey("ctrl+c".to_string()),
            DispatchEvent::Hotkey(_)
        ));
        assert!(matches!(
            DispatchEvent::Mcp("{}".to_string()),
            DispatchEvent::Mcp(_)
        ));
        assert!(matches!(
            DispatchEvent::Gesture("tap".to_string()),
            DispatchEvent::Gesture(_)
        ));
        assert!(matches!(
            DispatchEvent::Agent("agent1".to_string(), vec![1, 2, 3]),
            DispatchEvent::Agent(_, _)
        ));
        assert!(matches!(
            DispatchEvent::Health("ok".to_string()),
            DispatchEvent::Health(_)
        ));
    }

    /// The subject constants keep their expected wire names.
    #[test]
    fn test_subjects_constants() {
        let expectations = [
            (subjects::EVENTS_VOICE, "events.voice"),
            (subjects::EVENTS_HOTKEY, "events.hotkey"),
            (subjects::EVENTS_MCP, "events.mcp"),
            (subjects::EVENTS_GESTURE, "events.gesture"),
            (subjects::AGENTS_ALL, "agents.*"),
            (subjects::SYSTEM_HEALTH, "system.health"),
        ];
        for (actual, expected) in expectations {
            assert_eq!(actual, expected);
        }
    }

    /// With the feature disabled, connecting is a successful no-op.
    #[cfg(not(feature = "nats"))]
    #[tokio::test]
    async fn test_connect_nats_disabled() {
        let outcome = connect_nats("nats://localhost:4222").await;
        assert!(outcome.is_ok()); // Returns Ok(()) when disabled
    }

    /// With the feature disabled, the retrying connect reports an error.
    #[cfg(not(feature = "nats"))]
    #[tokio::test]
    async fn test_connect_with_retry_disabled() {
        let outcome = connect_with_retry("nats://localhost:4222").await;
        assert!(outcome.is_err()); // Returns error when disabled
    }
}