1use serde::{Deserialize, Serialize};
7use std::sync::Arc;
8use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
9use std::time::Instant;
10use tokio::sync::mpsc;
11
/// Default heartbeat interval, in seconds, used by `StreamHealthConfig::default()`.
pub const DEFAULT_HEARTBEAT_INTERVAL_SECS: u64 = 30;

/// Default stream timeout, in seconds, used by `StreamHealthConfig::default()`.
pub const DEFAULT_STREAM_TIMEOUT_SECS: u64 = 120;
17
18#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
20pub enum StreamHealthStatus {
21 Healthy,
23 Idle,
25 Stalled,
27 TimedOut,
29 Cancelled,
31 Completed,
33 Failed,
35}
36
/// Events that can be published on a `StreamHealthMonitor`'s optional event channel.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum StreamHealthEvent {
    /// Liveness signal carrying the current idle time.
    ///
    /// NOTE(review): nothing in the visible part of this file sends this variant;
    /// presumably it is produced by the code driving the monitor — confirm.
    Heartbeat {
        /// Idle time in milliseconds (presumably time since the last recorded
        /// activity — confirm against the emitter).
        idle_ms: u64,
    },
    /// Sent by the monitor when its status changes to a different value.
    StatusChanged {
        /// Status before the change.
        from: StreamHealthStatus,
        /// Status after the change.
        to: StreamHealthStatus,
    },
    /// Sent by `check_health` when the idle time has reached the warning threshold
    /// but not yet the timeout.
    TimeoutWarning {
        /// Whole seconds left until the timeout is reached (rounded down).
        seconds_remaining: u64,
    },
    /// A stalled stream became active again.
    ///
    /// NOTE(review): nothing in the visible part of this file sends this variant —
    /// confirm where (or whether) it is emitted.
    Recovered {
        /// Length of the stall in milliseconds (presumed from the name — confirm).
        stall_duration_ms: u64,
    },
}
63
/// Thresholds and options used by `StreamHealthMonitor`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamHealthConfig {
    /// Idle time, in seconds, after which `check_health` reports the stream as
    /// `Idle`; after twice this long it is reported as `Stalled`.
    pub heartbeat_interval_secs: u64,
    /// Idle time, in seconds, at which `check_health` marks the stream `TimedOut`
    /// and returns `false`.
    pub timeout_secs: u64,
    /// Percentage of `timeout_secs` after which `check_health` starts sending
    /// `TimeoutWarning` events. The value is not validated; above 100 the warning
    /// threshold lies beyond the timeout, so no warning is ever sent.
    pub warning_threshold_percent: u8,
    /// NOTE(review): not read anywhere in the visible part of this file; presumably
    /// enables automatic recovery in the code that owns the stream — confirm.
    pub auto_recovery: bool,
    /// NOTE(review): not read anywhere in the visible part of this file; presumably
    /// caps the number of automatic recovery attempts — confirm.
    pub max_recovery_attempts: u32,
}
78
79impl Default for StreamHealthConfig {
80 fn default() -> Self {
81 Self {
82 heartbeat_interval_secs: DEFAULT_HEARTBEAT_INTERVAL_SECS,
83 timeout_secs: DEFAULT_STREAM_TIMEOUT_SECS,
84 warning_threshold_percent: 80,
85 auto_recovery: true,
86 max_recovery_attempts: 3,
87 }
88 }
89}
90
/// Tracks the health of a single stream from recorded activity timestamps.
///
/// The status, last-activity timestamp and cancellation flag live behind `Arc`s so
/// they can be shared with the `StreamHealthHandle`s returned by `handle()`.
pub struct StreamHealthMonitor {
    /// Thresholds used by `check_health`.
    config: StreamHealthConfig,
    /// Current `StreamHealthStatus`, stored as its numeric discriminant.
    status: Arc<AtomicU64>,
    /// Milliseconds between `start_time` and the most recent recorded activity;
    /// `0` until activity is first recorded.
    last_activity: Arc<AtomicU64>,
    /// Set to `true` by `cancel()`.
    cancelled: Arc<AtomicBool>,
    /// Instant the monitor was created; reference point for `last_activity`.
    start_time: Instant,
    /// Channel on which events are published; `None` disables event publishing.
    event_tx: Option<mpsc::Sender<StreamHealthEvent>>,
}
102
103impl StreamHealthMonitor {
104 pub fn new(config: StreamHealthConfig) -> Self {
106 Self {
107 config,
108 status: Arc::new(AtomicU64::new(StreamHealthStatus::Healthy as u64)),
109 last_activity: Arc::new(AtomicU64::new(0)),
110 cancelled: Arc::new(AtomicBool::new(false)),
111 start_time: Instant::now(),
112 event_tx: None,
113 }
114 }
115
116 pub fn with_events(config: StreamHealthConfig, tx: mpsc::Sender<StreamHealthEvent>) -> Self {
118 Self {
119 config,
120 status: Arc::new(AtomicU64::new(StreamHealthStatus::Healthy as u64)),
121 last_activity: Arc::new(AtomicU64::new(0)),
122 cancelled: Arc::new(AtomicBool::new(false)),
123 start_time: Instant::now(),
124 event_tx: Some(tx),
125 }
126 }
127
128 pub fn record_activity(&self) {
130 let now = self.start_time.elapsed().as_millis() as u64;
131 self.last_activity.store(now, Ordering::SeqCst);
132
133 let current_status = self.status();
135 if current_status == StreamHealthStatus::Stalled {
136 self.set_status(StreamHealthStatus::Healthy);
137 }
138 }
139
140 pub fn status(&self) -> StreamHealthStatus {
142 let val = self.status.load(Ordering::SeqCst);
143 match val {
144 0 => StreamHealthStatus::Healthy,
145 1 => StreamHealthStatus::Idle,
146 2 => StreamHealthStatus::Stalled,
147 3 => StreamHealthStatus::TimedOut,
148 4 => StreamHealthStatus::Cancelled,
149 5 => StreamHealthStatus::Completed,
150 _ => StreamHealthStatus::Failed,
151 }
152 }
153
154 fn set_status(&self, status: StreamHealthStatus) {
156 let old = self.status();
157 self.status.store(status as u64, Ordering::SeqCst);
158
159 if old != status
160 && let Some(ref tx) = self.event_tx
161 {
162 let _ = tx.try_send(StreamHealthEvent::StatusChanged {
163 from: old,
164 to: status,
165 });
166 }
167 }
168
169 pub fn cancel(&self) {
171 self.cancelled.store(true, Ordering::SeqCst);
172 self.set_status(StreamHealthStatus::Cancelled);
173 }
174
175 pub fn is_cancelled(&self) -> bool {
177 self.cancelled.load(Ordering::SeqCst)
178 }
179
180 pub fn complete(&self) {
182 self.set_status(StreamHealthStatus::Completed);
183 }
184
185 pub fn fail(&self) {
187 self.set_status(StreamHealthStatus::Failed);
188 }
189
190 pub fn idle_time_ms(&self) -> u64 {
192 let now = self.start_time.elapsed().as_millis() as u64;
193 let last = self.last_activity.load(Ordering::SeqCst);
194 now.saturating_sub(last)
195 }
196
197 pub fn check_health(&self) -> bool {
201 if self.is_cancelled() {
202 return false;
203 }
204
205 let idle_ms = self.idle_time_ms();
206 let timeout_ms = self.config.timeout_secs * 1000;
207 let warning_ms = timeout_ms * self.config.warning_threshold_percent as u64 / 100;
208 let stall_threshold_ms = self.config.heartbeat_interval_secs * 2 * 1000;
209
210 if idle_ms >= timeout_ms {
211 self.set_status(StreamHealthStatus::TimedOut);
212 return false;
213 }
214
215 if idle_ms >= warning_ms
216 && let Some(ref tx) = self.event_tx
217 {
218 let remaining = (timeout_ms - idle_ms) / 1000;
219 let _ = tx.try_send(StreamHealthEvent::TimeoutWarning {
220 seconds_remaining: remaining,
221 });
222 }
223
224 if idle_ms >= stall_threshold_ms {
225 self.set_status(StreamHealthStatus::Stalled);
226 } else if idle_ms >= self.config.heartbeat_interval_secs * 1000 {
227 self.set_status(StreamHealthStatus::Idle);
228 } else {
229 self.set_status(StreamHealthStatus::Healthy);
230 }
231
232 true
233 }
234
235 pub fn config(&self) -> &StreamHealthConfig {
237 &self.config
238 }
239
240 pub fn handle(&self) -> StreamHealthHandle {
242 StreamHealthHandle {
243 status: Arc::clone(&self.status),
244 last_activity: Arc::clone(&self.last_activity),
245 cancelled: Arc::clone(&self.cancelled),
246 start_time: self.start_time,
247 }
248 }
249}
250
/// Cloneable view onto a `StreamHealthMonitor`'s shared state.
///
/// A handle can record activity and read the status and cancellation flag. It
/// carries no event channel, so it never publishes events itself.
#[derive(Clone)]
pub struct StreamHealthHandle {
    /// Shared status cell, holding the numeric value of a `StreamHealthStatus`.
    status: Arc<AtomicU64>,
    /// Shared timestamp of the latest activity, in milliseconds since `start_time`.
    last_activity: Arc<AtomicU64>,
    /// Shared cancellation flag.
    cancelled: Arc<AtomicBool>,
    /// Copy of the monitor's creation instant, so timestamps written through the
    /// handle use the same reference point as the monitor's.
    start_time: Instant,
}
261
262impl StreamHealthHandle {
263 pub fn record_activity(&self) {
265 let now = self.start_time.elapsed().as_millis() as u64;
266 self.last_activity.store(now, Ordering::SeqCst);
267 }
268
269 pub fn is_cancelled(&self) -> bool {
271 self.cancelled.load(Ordering::SeqCst)
272 }
273
274 pub fn status(&self) -> StreamHealthStatus {
276 let val = self.status.load(Ordering::SeqCst);
277 match val {
278 0 => StreamHealthStatus::Healthy,
279 1 => StreamHealthStatus::Idle,
280 2 => StreamHealthStatus::Stalled,
281 3 => StreamHealthStatus::TimedOut,
282 4 => StreamHealthStatus::Cancelled,
283 5 => StreamHealthStatus::Completed,
284 _ => StreamHealthStatus::Failed,
285 }
286 }
287}
288
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration uses the module-level default constants.
    #[test]
    fn test_stream_health_config_default() {
        let config = StreamHealthConfig::default();
        assert_eq!(
            config.heartbeat_interval_secs,
            DEFAULT_HEARTBEAT_INTERVAL_SECS
        );
        assert_eq!(config.timeout_secs, DEFAULT_STREAM_TIMEOUT_SECS);
        assert_eq!(config.warning_threshold_percent, 80);
        assert!(config.auto_recovery);
        assert_eq!(config.max_recovery_attempts, 3);
    }

    /// A fresh monitor is healthy and not cancelled.
    #[test]
    fn test_stream_health_monitor_initial_status() {
        let monitor = StreamHealthMonitor::new(StreamHealthConfig::default());
        assert_eq!(monitor.status(), StreamHealthStatus::Healthy);
        assert!(!monitor.is_cancelled());
    }

    /// `cancel` raises the flag and sets the status.
    #[test]
    fn test_stream_health_monitor_cancel() {
        let monitor = StreamHealthMonitor::new(StreamHealthConfig::default());
        monitor.cancel();
        assert!(monitor.is_cancelled());
        assert_eq!(monitor.status(), StreamHealthStatus::Cancelled);
    }

    /// `complete` sets the status to `Completed`.
    #[test]
    fn test_stream_health_monitor_complete() {
        let monitor = StreamHealthMonitor::new(StreamHealthConfig::default());
        monitor.complete();
        assert_eq!(monitor.status(), StreamHealthStatus::Completed);
    }

    /// `fail` sets the status to `Failed`.
    #[test]
    fn test_stream_health_monitor_fail() {
        let monitor = StreamHealthMonitor::new(StreamHealthConfig::default());
        monitor.fail();
        assert_eq!(monitor.status(), StreamHealthStatus::Failed);
    }

    /// Recording activity resets the idle clock.
    #[test]
    fn test_stream_health_monitor_record_activity() {
        let monitor = StreamHealthMonitor::new(StreamHealthConfig::default());
        monitor.record_activity();
        assert!(monitor.idle_time_ms() < 100);
    }

    /// A freshly created monitor passes its health check and stays healthy.
    #[test]
    fn test_stream_health_check_health_fresh_monitor() {
        let monitor = StreamHealthMonitor::new(StreamHealthConfig::default());
        assert!(monitor.check_health());
        assert_eq!(monitor.status(), StreamHealthStatus::Healthy);
    }

    /// A cancelled monitor fails its health check and stays cancelled.
    #[test]
    fn test_stream_health_check_health_cancelled() {
        let monitor = StreamHealthMonitor::new(StreamHealthConfig::default());
        monitor.cancel();
        assert!(!monitor.check_health());
        assert_eq!(monitor.status(), StreamHealthStatus::Cancelled);
    }

    /// With a zero timeout any idle time counts as timed out.
    #[test]
    fn test_stream_health_check_health_zero_timeout() {
        let config = StreamHealthConfig {
            timeout_secs: 0,
            ..StreamHealthConfig::default()
        };
        let monitor = StreamHealthMonitor::new(config);
        assert!(!monitor.check_health());
        assert_eq!(monitor.status(), StreamHealthStatus::TimedOut);
    }

    /// Cloned handles share the monitor's state.
    #[test]
    fn test_stream_health_handle_clone() {
        let monitor = StreamHealthMonitor::new(StreamHealthConfig::default());
        let handle1 = monitor.handle();
        let handle2 = handle1.clone();

        handle1.record_activity();
        assert!(!handle2.is_cancelled());
        assert_eq!(handle2.status(), StreamHealthStatus::Healthy);
    }

    /// State changes made through the monitor are visible through its handles.
    #[test]
    fn test_stream_health_handle_observes_cancel() {
        let monitor = StreamHealthMonitor::new(StreamHealthConfig::default());
        let handle = monitor.handle();

        monitor.cancel();
        assert!(handle.is_cancelled());
        assert_eq!(handle.status(), StreamHealthStatus::Cancelled);
    }

    /// A status change is published on the event channel.
    #[tokio::test]
    async fn test_stream_health_events() {
        let (tx, mut rx) = mpsc::channel(10);
        let monitor = StreamHealthMonitor::with_events(StreamHealthConfig::default(), tx);

        monitor.complete();
        // The monitor owns the only sender. Dropping it closes the channel, so
        // `recv` yields the buffered event or `None`; it can neither hang nor let
        // the test pass without an event having been sent.
        drop(monitor);

        let event = rx
            .recv()
            .await
            .expect("complete() should publish a StatusChanged event");
        match event {
            StreamHealthEvent::StatusChanged { from, to } => {
                assert_eq!(from, StreamHealthStatus::Healthy);
                assert_eq!(to, StreamHealthStatus::Completed);
            }
            _ => panic!("Expected StatusChanged event"),
        }
    }
}