gestura_core_foundation/
stream_health.rs

//! Stream Health Monitoring
//!
//! Provides health monitoring, heartbeat detection, and timeout handling
//! for streaming LLM responses.
5
6use serde::{Deserialize, Serialize};
7use std::sync::Arc;
8use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
9use std::time::Instant;
10use tokio::sync::mpsc;
11
/// Heartbeat interval, in seconds, used when no explicit value is configured.
///
/// This is the value `StreamHealthConfig::default()` assigns to
/// `heartbeat_interval_secs`.
pub const DEFAULT_HEARTBEAT_INTERVAL_SECS: u64 = 30;

/// Inactivity timeout, in seconds, used when no explicit value is configured.
///
/// This is the value `StreamHealthConfig::default()` assigns to `timeout_secs`
/// (two minutes without any recorded activity).
pub const DEFAULT_STREAM_TIMEOUT_SECS: u64 = 2 * 60;
17
18/// Stream health status
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
20pub enum StreamHealthStatus {
21    /// Stream is healthy and receiving data
22    Healthy,
23    /// Stream is idle but within timeout
24    Idle,
25    /// Stream is stalled (no activity for extended period)
26    Stalled,
27    /// Stream has timed out
28    TimedOut,
29    /// Stream has been cancelled
30    Cancelled,
31    /// Stream completed successfully
32    Completed,
33    /// Stream failed with error
34    Failed,
35}
36
37/// Stream health event for frontend notification
38#[derive(Debug, Clone, Serialize, Deserialize)]
39pub enum StreamHealthEvent {
40    /// Heartbeat received
41    Heartbeat {
42        /// Time since last activity in milliseconds
43        idle_ms: u64,
44    },
45    /// Stream status changed
46    StatusChanged {
47        /// Previous status
48        from: StreamHealthStatus,
49        /// New status
50        to: StreamHealthStatus,
51    },
52    /// Stream timeout warning
53    TimeoutWarning {
54        /// Seconds until timeout
55        seconds_remaining: u64,
56    },
57    /// Stream recovered from stall
58    Recovered {
59        /// Duration of stall in milliseconds
60        stall_duration_ms: u64,
61    },
62}
63
64/// Configuration for stream health monitoring
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct StreamHealthConfig {
67    /// Heartbeat interval in seconds
68    pub heartbeat_interval_secs: u64,
69    /// Stream timeout in seconds (no activity)
70    pub timeout_secs: u64,
71    /// Warning threshold before timeout (percentage)
72    pub warning_threshold_percent: u8,
73    /// Enable automatic recovery attempts
74    pub auto_recovery: bool,
75    /// Maximum recovery attempts
76    pub max_recovery_attempts: u32,
77}
78
79impl Default for StreamHealthConfig {
80    fn default() -> Self {
81        Self {
82            heartbeat_interval_secs: DEFAULT_HEARTBEAT_INTERVAL_SECS,
83            timeout_secs: DEFAULT_STREAM_TIMEOUT_SECS,
84            warning_threshold_percent: 80,
85            auto_recovery: true,
86            max_recovery_attempts: 3,
87        }
88    }
89}
90
91/// Stream health monitor
92///
93/// Tracks stream activity and provides health status updates.
94pub struct StreamHealthMonitor {
95    config: StreamHealthConfig,
96    status: Arc<AtomicU64>,
97    last_activity: Arc<AtomicU64>,
98    cancelled: Arc<AtomicBool>,
99    start_time: Instant,
100    event_tx: Option<mpsc::Sender<StreamHealthEvent>>,
101}
102
103impl StreamHealthMonitor {
104    /// Create a new stream health monitor
105    pub fn new(config: StreamHealthConfig) -> Self {
106        Self {
107            config,
108            status: Arc::new(AtomicU64::new(StreamHealthStatus::Healthy as u64)),
109            last_activity: Arc::new(AtomicU64::new(0)),
110            cancelled: Arc::new(AtomicBool::new(false)),
111            start_time: Instant::now(),
112            event_tx: None,
113        }
114    }
115
116    /// Create with event channel for notifications
117    pub fn with_events(config: StreamHealthConfig, tx: mpsc::Sender<StreamHealthEvent>) -> Self {
118        Self {
119            config,
120            status: Arc::new(AtomicU64::new(StreamHealthStatus::Healthy as u64)),
121            last_activity: Arc::new(AtomicU64::new(0)),
122            cancelled: Arc::new(AtomicBool::new(false)),
123            start_time: Instant::now(),
124            event_tx: Some(tx),
125        }
126    }
127
128    /// Record activity on the stream
129    pub fn record_activity(&self) {
130        let now = self.start_time.elapsed().as_millis() as u64;
131        self.last_activity.store(now, Ordering::SeqCst);
132
133        // Check if we're recovering from a stall
134        let current_status = self.status();
135        if current_status == StreamHealthStatus::Stalled {
136            self.set_status(StreamHealthStatus::Healthy);
137        }
138    }
139
140    /// Get current stream status
141    pub fn status(&self) -> StreamHealthStatus {
142        let val = self.status.load(Ordering::SeqCst);
143        match val {
144            0 => StreamHealthStatus::Healthy,
145            1 => StreamHealthStatus::Idle,
146            2 => StreamHealthStatus::Stalled,
147            3 => StreamHealthStatus::TimedOut,
148            4 => StreamHealthStatus::Cancelled,
149            5 => StreamHealthStatus::Completed,
150            _ => StreamHealthStatus::Failed,
151        }
152    }
153
154    /// Set stream status
155    fn set_status(&self, status: StreamHealthStatus) {
156        let old = self.status();
157        self.status.store(status as u64, Ordering::SeqCst);
158
159        if old != status
160            && let Some(ref tx) = self.event_tx
161        {
162            let _ = tx.try_send(StreamHealthEvent::StatusChanged {
163                from: old,
164                to: status,
165            });
166        }
167    }
168
169    /// Mark stream as cancelled
170    pub fn cancel(&self) {
171        self.cancelled.store(true, Ordering::SeqCst);
172        self.set_status(StreamHealthStatus::Cancelled);
173    }
174
175    /// Check if stream is cancelled
176    pub fn is_cancelled(&self) -> bool {
177        self.cancelled.load(Ordering::SeqCst)
178    }
179
180    /// Mark stream as completed
181    pub fn complete(&self) {
182        self.set_status(StreamHealthStatus::Completed);
183    }
184
185    /// Mark stream as failed
186    pub fn fail(&self) {
187        self.set_status(StreamHealthStatus::Failed);
188    }
189
190    /// Get time since last activity in milliseconds
191    pub fn idle_time_ms(&self) -> u64 {
192        let now = self.start_time.elapsed().as_millis() as u64;
193        let last = self.last_activity.load(Ordering::SeqCst);
194        now.saturating_sub(last)
195    }
196
197    /// Check stream health and update status
198    ///
199    /// Returns true if stream is still healthy/recoverable
200    pub fn check_health(&self) -> bool {
201        if self.is_cancelled() {
202            return false;
203        }
204
205        let idle_ms = self.idle_time_ms();
206        let timeout_ms = self.config.timeout_secs * 1000;
207        let warning_ms = timeout_ms * self.config.warning_threshold_percent as u64 / 100;
208        let stall_threshold_ms = self.config.heartbeat_interval_secs * 2 * 1000;
209
210        if idle_ms >= timeout_ms {
211            self.set_status(StreamHealthStatus::TimedOut);
212            return false;
213        }
214
215        if idle_ms >= warning_ms
216            && let Some(ref tx) = self.event_tx
217        {
218            let remaining = (timeout_ms - idle_ms) / 1000;
219            let _ = tx.try_send(StreamHealthEvent::TimeoutWarning {
220                seconds_remaining: remaining,
221            });
222        }
223
224        if idle_ms >= stall_threshold_ms {
225            self.set_status(StreamHealthStatus::Stalled);
226        } else if idle_ms >= self.config.heartbeat_interval_secs * 1000 {
227            self.set_status(StreamHealthStatus::Idle);
228        } else {
229            self.set_status(StreamHealthStatus::Healthy);
230        }
231
232        true
233    }
234
235    /// Get configuration
236    pub fn config(&self) -> &StreamHealthConfig {
237        &self.config
238    }
239
240    /// Create a handle for sharing across tasks
241    pub fn handle(&self) -> StreamHealthHandle {
242        StreamHealthHandle {
243            status: Arc::clone(&self.status),
244            last_activity: Arc::clone(&self.last_activity),
245            cancelled: Arc::clone(&self.cancelled),
246            start_time: self.start_time,
247        }
248    }
249}
250
/// Cheap, cloneable view onto a monitor's shared state.
///
/// Every clone points at the same underlying atomics, so a handle can be
/// given to each task that needs to record activity or observe the
/// stream's status and cancellation flag.
#[derive(Clone)]
pub struct StreamHealthHandle {
    /// Shared status, stored as the `u64` discriminant of `StreamHealthStatus`.
    status: Arc<AtomicU64>,
    /// Shared timestamp of the last activity, in milliseconds since `start_time`.
    last_activity: Arc<AtomicU64>,
    /// Shared cancellation flag.
    cancelled: Arc<AtomicBool>,
    /// Reference instant that millisecond timestamps are measured from.
    start_time: Instant,
}
261
262impl StreamHealthHandle {
263    /// Record activity on the stream
264    pub fn record_activity(&self) {
265        let now = self.start_time.elapsed().as_millis() as u64;
266        self.last_activity.store(now, Ordering::SeqCst);
267    }
268
269    /// Check if stream is cancelled
270    pub fn is_cancelled(&self) -> bool {
271        self.cancelled.load(Ordering::SeqCst)
272    }
273
274    /// Get current status
275    pub fn status(&self) -> StreamHealthStatus {
276        let val = self.status.load(Ordering::SeqCst);
277        match val {
278            0 => StreamHealthStatus::Healthy,
279            1 => StreamHealthStatus::Idle,
280            2 => StreamHealthStatus::Stalled,
281            3 => StreamHealthStatus::TimedOut,
282            4 => StreamHealthStatus::Cancelled,
283            5 => StreamHealthStatus::Completed,
284            _ => StreamHealthStatus::Failed,
285        }
286    }
287}
288
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration picks up the module-level default constants.
    #[test]
    fn test_stream_health_config_default() {
        let config = StreamHealthConfig::default();
        assert_eq!(
            config.heartbeat_interval_secs,
            DEFAULT_HEARTBEAT_INTERVAL_SECS
        );
        assert_eq!(config.timeout_secs, DEFAULT_STREAM_TIMEOUT_SECS);
        assert!(config.auto_recovery);
    }

    /// A fresh monitor is healthy and not cancelled.
    #[test]
    fn test_stream_health_monitor_initial_status() {
        let monitor = StreamHealthMonitor::new(StreamHealthConfig::default());
        assert_eq!(monitor.status(), StreamHealthStatus::Healthy);
        assert!(!monitor.is_cancelled());
    }

    /// Cancelling sets both the flag and the status.
    #[test]
    fn test_stream_health_monitor_cancel() {
        let monitor = StreamHealthMonitor::new(StreamHealthConfig::default());
        monitor.cancel();
        assert!(monitor.is_cancelled());
        assert_eq!(monitor.status(), StreamHealthStatus::Cancelled);
    }

    #[test]
    fn test_stream_health_monitor_complete() {
        let monitor = StreamHealthMonitor::new(StreamHealthConfig::default());
        monitor.complete();
        assert_eq!(monitor.status(), StreamHealthStatus::Completed);
    }

    #[test]
    fn test_stream_health_monitor_fail() {
        let monitor = StreamHealthMonitor::new(StreamHealthConfig::default());
        monitor.fail();
        assert_eq!(monitor.status(), StreamHealthStatus::Failed);
    }

    #[test]
    fn test_stream_health_monitor_record_activity() {
        let monitor = StreamHealthMonitor::new(StreamHealthConfig::default());
        monitor.record_activity();
        // Activity was just recorded, idle time should be very small
        assert!(monitor.idle_time_ms() < 100);
    }

    /// A freshly created stream passes its health check and stays healthy.
    #[test]
    fn test_check_health_fresh_stream_is_healthy() {
        let monitor = StreamHealthMonitor::new(StreamHealthConfig::default());
        assert!(monitor.check_health());
        assert_eq!(monitor.status(), StreamHealthStatus::Healthy);
    }

    /// A cancelled stream never passes its health check.
    #[test]
    fn test_check_health_after_cancel() {
        let monitor = StreamHealthMonitor::new(StreamHealthConfig::default());
        monitor.cancel();
        assert!(!monitor.check_health());
        assert_eq!(monitor.status(), StreamHealthStatus::Cancelled);
    }

    /// With a zero timeout any idle time already counts as timed out, which
    /// exercises the timeout branch without sleeping.
    #[test]
    fn test_check_health_times_out_with_zero_timeout() {
        let config = StreamHealthConfig {
            timeout_secs: 0,
            ..StreamHealthConfig::default()
        };
        let monitor = StreamHealthMonitor::new(config);
        assert!(!monitor.check_health());
        assert_eq!(monitor.status(), StreamHealthStatus::TimedOut);
    }

    #[test]
    fn test_stream_health_handle_clone() {
        let monitor = StreamHealthMonitor::new(StreamHealthConfig::default());
        let handle1 = monitor.handle();
        let handle2 = handle1.clone();

        handle1.record_activity();
        assert!(!handle2.is_cancelled());
        assert_eq!(handle2.status(), StreamHealthStatus::Healthy);
    }

    /// Handles observe state changes made through the monitor.
    #[test]
    fn test_stream_health_handle_observes_cancel() {
        let monitor = StreamHealthMonitor::new(StreamHealthConfig::default());
        let handle = monitor.handle();

        monitor.cancel();
        assert!(handle.is_cancelled());
        assert_eq!(handle.status(), StreamHealthStatus::Cancelled);
    }

    #[tokio::test]
    async fn test_stream_health_events() {
        let (tx, mut rx) = mpsc::channel(10);
        let monitor = StreamHealthMonitor::with_events(StreamHealthConfig::default(), tx);

        monitor.complete();

        // The monitor publishes with `try_send`, so the event is already queued
        // by the time `complete()` returns. `try_recv` makes a missing event a
        // test failure; awaiting `recv()` would hang forever instead, because
        // the monitor keeps the sender alive.
        let event = rx
            .try_recv()
            .expect("complete() should publish a StatusChanged event");
        match event {
            StreamHealthEvent::StatusChanged { from, to } => {
                assert_eq!(from, StreamHealthStatus::Healthy);
                assert_eq!(to, StreamHealthStatus::Completed);
            }
            _ => panic!("Expected StatusChanged event"),
        }

        // Exactly one transition happened, so nothing else may be queued.
        assert!(rx.try_recv().is_err());
    }
}