1use serde::{Deserialize, Serialize};
7use std::time::{Duration, Instant};
8use tokio::sync::mpsc;
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
12#[serde(tag = "type", content = "data")]
13pub enum AgentEvent {
14 PipelineStarted {
16 request_id: String,
17 timestamp_ms: u64,
18 },
19 Progress {
21 request_id: String,
22 stage: ProgressStage,
23 percent: Option<u8>,
24 message: String,
25 },
26 TokenStream {
28 request_id: String,
29 content: String,
30 is_thinking: bool,
31 },
32 ToolStarted {
34 request_id: String,
35 tool_id: String,
36 tool_name: String,
37 },
38 ToolProgress {
40 request_id: String,
41 tool_id: String,
42 percent: Option<u8>,
43 message: String,
44 },
45 ToolCompleted {
47 request_id: String,
48 tool_id: String,
49 success: bool,
50 duration_ms: u64,
51 output_preview: String,
52 },
53 ContextCompacted {
55 request_id: String,
56 messages_before: usize,
57 messages_after: usize,
58 tokens_saved: usize,
59 },
60 RetryAttempt {
62 request_id: String,
63 attempt: u32,
64 max_attempts: u32,
65 delay_ms: u64,
66 reason: String,
67 },
68 PipelineCompleted {
70 request_id: String,
71 duration_ms: u64,
72 tokens_used: Option<u64>,
73 },
74 PipelineFailed {
76 request_id: String,
77 error: String,
78 recoverable: bool,
79 },
80 PipelineCancelled { request_id: String },
82 PipelinePaused { request_id: String },
84}
85
86#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
88pub enum ProgressStage {
89 Analyzing,
91 ResolvingContext,
93 WaitingForLlm,
95 ExecutingTools,
97 GeneratingResponse,
99 Finalizing,
101}
102
103impl ProgressStage {
104 pub fn description(&self) -> &'static str {
106 match self {
107 Self::Analyzing => "Analyzing request...",
108 Self::ResolvingContext => "Resolving context...",
109 Self::WaitingForLlm => "Waiting for AI response...",
110 Self::ExecutingTools => "Executing tools...",
111 Self::GeneratingResponse => "Generating response...",
112 Self::Finalizing => "Finalizing...",
113 }
114 }
115}
116
117#[derive(Debug, Clone)]
119pub struct EventBufferConfig {
120 pub min_interval: Duration,
122 pub max_buffer_size: usize,
124 pub coalesce_similar: bool,
126}
127
128impl Default for EventBufferConfig {
129 fn default() -> Self {
130 Self {
131 min_interval: Duration::from_millis(50),
132 max_buffer_size: 100,
133 coalesce_similar: true,
134 }
135 }
136}
137
138pub struct EventEmitter {
140 tx: mpsc::Sender<AgentEvent>,
141 config: EventBufferConfig,
142 last_progress_emit: Option<Instant>,
143 last_token_emit: Option<Instant>,
144 token_buffer: String,
145}
146
147impl EventEmitter {
148 pub fn new(tx: mpsc::Sender<AgentEvent>) -> Self {
150 Self::with_config(tx, EventBufferConfig::default())
151 }
152
153 pub fn with_config(tx: mpsc::Sender<AgentEvent>, config: EventBufferConfig) -> Self {
155 Self {
156 tx,
157 config,
158 last_progress_emit: None,
159 last_token_emit: None,
160 token_buffer: String::new(),
161 }
162 }
163
164 pub async fn emit(&self, event: AgentEvent) -> Result<(), mpsc::error::SendError<AgentEvent>> {
166 self.tx.send(event).await
167 }
168
169 pub async fn emit_progress(
171 &mut self,
172 request_id: String,
173 stage: ProgressStage,
174 percent: Option<u8>,
175 message: String,
176 ) {
177 let now = Instant::now();
178 let should_emit = self
179 .last_progress_emit
180 .map(|last| now.duration_since(last) >= self.config.min_interval)
181 .unwrap_or(true);
182
183 if should_emit {
184 self.last_progress_emit = Some(now);
185 let _ = self
186 .tx
187 .send(AgentEvent::Progress {
188 request_id,
189 stage,
190 percent,
191 message,
192 })
193 .await;
194 }
195 }
196
197 pub async fn buffer_token(&mut self, request_id: &str, content: &str, is_thinking: bool) {
199 self.token_buffer.push_str(content);
200
201 let now = Instant::now();
202 let should_flush = self
203 .last_token_emit
204 .map(|last| now.duration_since(last) >= self.config.min_interval)
205 .unwrap_or(true)
206 || self.token_buffer.len() >= self.config.max_buffer_size;
207
208 if should_flush && !self.token_buffer.is_empty() {
209 self.last_token_emit = Some(now);
210 let content = std::mem::take(&mut self.token_buffer);
211 let _ = self
212 .tx
213 .send(AgentEvent::TokenStream {
214 request_id: request_id.to_string(),
215 content,
216 is_thinking,
217 })
218 .await;
219 }
220 }
221
222 pub async fn flush_tokens(&mut self, request_id: &str, is_thinking: bool) {
224 if !self.token_buffer.is_empty() {
225 let content = std::mem::take(&mut self.token_buffer);
226 let _ = self
227 .tx
228 .send(AgentEvent::TokenStream {
229 request_id: request_id.to_string(),
230 content,
231 is_thinking,
232 })
233 .await;
234 }
235 }
236
237 pub async fn pipeline_started(&self, request_id: &str) {
239 let timestamp_ms = std::time::SystemTime::now()
240 .duration_since(std::time::UNIX_EPOCH)
241 .map(|d| d.as_millis() as u64)
242 .unwrap_or(0);
243
244 let _ = self
245 .tx
246 .send(AgentEvent::PipelineStarted {
247 request_id: request_id.to_string(),
248 timestamp_ms,
249 })
250 .await;
251 }
252
253 pub async fn pipeline_completed(
255 &self,
256 request_id: &str,
257 duration_ms: u64,
258 tokens_used: Option<u64>,
259 ) {
260 let _ = self
261 .tx
262 .send(AgentEvent::PipelineCompleted {
263 request_id: request_id.to_string(),
264 duration_ms,
265 tokens_used,
266 })
267 .await;
268 }
269
270 pub async fn pipeline_failed(&self, request_id: &str, error: &str, recoverable: bool) {
272 let _ = self
273 .tx
274 .send(AgentEvent::PipelineFailed {
275 request_id: request_id.to_string(),
276 error: error.to_string(),
277 recoverable,
278 })
279 .await;
280 }
281
282 pub async fn tool_started(&self, request_id: &str, tool_id: &str, tool_name: &str) {
284 let _ = self
285 .tx
286 .send(AgentEvent::ToolStarted {
287 request_id: request_id.to_string(),
288 tool_id: tool_id.to_string(),
289 tool_name: tool_name.to_string(),
290 })
291 .await;
292 }
293
294 pub async fn tool_completed(
296 &self,
297 request_id: &str,
298 tool_id: &str,
299 success: bool,
300 duration_ms: u64,
301 output_preview: &str,
302 ) {
303 let _ = self
304 .tx
305 .send(AgentEvent::ToolCompleted {
306 request_id: request_id.to_string(),
307 tool_id: tool_id.to_string(),
308 success,
309 duration_ms,
310 output_preview: output_preview.to_string(),
311 })
312 .await;
313 }
314}
315
316#[derive(Debug, Clone)]
318pub struct ProgressTracker {
319 request_id: String,
320 current_stage: ProgressStage,
321 stage_start: Instant,
322 total_start: Instant,
323}
324
325impl ProgressTracker {
326 pub fn new(request_id: String) -> Self {
328 let now = Instant::now();
329 Self {
330 request_id,
331 current_stage: ProgressStage::Analyzing,
332 stage_start: now,
333 total_start: now,
334 }
335 }
336
337 pub fn request_id(&self) -> &str {
339 &self.request_id
340 }
341
342 pub fn current_stage(&self) -> ProgressStage {
344 self.current_stage
345 }
346
347 pub fn advance_to(&mut self, stage: ProgressStage) {
349 self.current_stage = stage;
350 self.stage_start = Instant::now();
351 }
352
353 pub fn stage_duration(&self) -> Duration {
355 self.stage_start.elapsed()
356 }
357
358 pub fn total_duration(&self) -> Duration {
360 self.total_start.elapsed()
361 }
362
363 pub fn total_duration_ms(&self) -> u64 {
365 self.total_start.elapsed().as_millis() as u64
366 }
367}
368
369pub type EventReceiver = mpsc::Receiver<AgentEvent>;
371pub type EventSender = mpsc::Sender<AgentEvent>;
372
373pub fn create_event_channel(buffer_size: usize) -> (EventSender, EventReceiver) {
375 mpsc::channel(buffer_size)
376}
377
378#[cfg(test)]
379mod tests {
380 use super::*;
381
382 #[test]
383 fn test_progress_stage_description() {
384 assert_eq!(
385 ProgressStage::Analyzing.description(),
386 "Analyzing request..."
387 );
388 assert_eq!(
389 ProgressStage::ExecutingTools.description(),
390 "Executing tools..."
391 );
392 }
393
394 #[test]
395 fn test_progress_tracker() {
396 let mut tracker = ProgressTracker::new("test-123".to_string());
397 assert_eq!(tracker.current_stage(), ProgressStage::Analyzing);
398 assert_eq!(tracker.request_id(), "test-123");
399
400 tracker.advance_to(ProgressStage::WaitingForLlm);
401 assert_eq!(tracker.current_stage(), ProgressStage::WaitingForLlm);
402 }
403
404 #[test]
405 fn test_event_buffer_config_default() {
406 let config = EventBufferConfig::default();
407 assert_eq!(config.min_interval, Duration::from_millis(50));
408 assert_eq!(config.max_buffer_size, 100);
409 assert!(config.coalesce_similar);
410 }
411
412 #[tokio::test]
413 async fn test_event_emitter_emit() {
414 let (tx, mut rx) = create_event_channel(10);
415 let emitter = EventEmitter::new(tx);
416
417 emitter.pipeline_started("req-1").await;
418
419 let event = rx.recv().await.unwrap();
420 match event {
421 AgentEvent::PipelineStarted { request_id, .. } => {
422 assert_eq!(request_id, "req-1");
423 }
424 _ => panic!("Expected PipelineStarted event"),
425 }
426 }
427}