// gestura_core_foundation/events.rs

1//! Typed event system for real-time UI updates
2//!
3//! This module provides a structured event system for communicating state changes
4//! from the agent pipeline to the frontend. Based on Block Goose architecture patterns.
5
6use serde::{Deserialize, Serialize};
7use std::time::{Duration, Instant};
8use tokio::sync::mpsc;
9
/// Event types for the agent pipeline.
///
/// Serialized with serde's adjacently tagged representation
/// (`{"type": "...", "data": {...}}`) so the frontend can dispatch on `type`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
pub enum AgentEvent {
    /// Pipeline has started processing a request.
    PipelineStarted {
        request_id: String,
        /// Wall-clock start time in milliseconds since the Unix epoch.
        timestamp_ms: u64,
    },
    /// Progress update for long-running operations.
    Progress {
        request_id: String,
        stage: ProgressStage,
        /// Completion percentage (0-100) when known.
        percent: Option<u8>,
        message: String,
    },
    /// A chunk of streamed token content.
    TokenStream {
        request_id: String,
        content: String,
        /// Whether this chunk is "thinking" content rather than response text.
        is_thinking: bool,
    },
    /// Tool execution started.
    ToolStarted {
        request_id: String,
        tool_id: String,
        tool_name: String,
    },
    /// Tool execution progress.
    ToolProgress {
        request_id: String,
        tool_id: String,
        /// Completion percentage (0-100) when known.
        percent: Option<u8>,
        message: String,
    },
    /// Tool execution completed.
    ToolCompleted {
        request_id: String,
        tool_id: String,
        success: bool,
        duration_ms: u64,
        /// Preview of the tool's output for display.
        output_preview: String,
    },
    /// Context was compacted.
    ContextCompacted {
        request_id: String,
        messages_before: usize,
        messages_after: usize,
        tokens_saved: usize,
    },
    /// A retry attempt is being made.
    RetryAttempt {
        request_id: String,
        attempt: u32,
        max_attempts: u32,
        delay_ms: u64,
        reason: String,
    },
    /// Pipeline completed successfully.
    PipelineCompleted {
        request_id: String,
        duration_ms: u64,
        tokens_used: Option<u64>,
    },
    /// Pipeline failed with an error.
    PipelineFailed {
        request_id: String,
        error: String,
        /// Whether the failure is considered recoverable by the emitter.
        recoverable: bool,
    },
    /// Pipeline was cancelled.
    PipelineCancelled { request_id: String },
    /// Pipeline was paused (cancelled with resume intent).
    PipelinePaused { request_id: String },
}
85
/// Stages of pipeline progress.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ProgressStage {
    /// Analyzing the incoming request.
    Analyzing,
    /// Resolving context for the request.
    ResolvingContext,
    /// Waiting for the LLM response.
    WaitingForLlm,
    /// Executing tool calls.
    ExecutingTools,
    /// Generating the response.
    GeneratingResponse,
    /// Finalizing the pipeline run.
    Finalizing,
}
102
103impl ProgressStage {
104    /// Get a human-readable description of the stage
105    pub fn description(&self) -> &'static str {
106        match self {
107            Self::Analyzing => "Analyzing request...",
108            Self::ResolvingContext => "Resolving context...",
109            Self::WaitingForLlm => "Waiting for AI response...",
110            Self::ExecutingTools => "Executing tools...",
111            Self::GeneratingResponse => "Generating response...",
112            Self::Finalizing => "Finalizing...",
113        }
114    }
115}
116
/// Configuration for event buffering and rate limiting.
#[derive(Debug, Clone)]
pub struct EventBufferConfig {
    /// Minimum interval between events of the same type.
    pub min_interval: Duration,
    /// Maximum buffered size before a flush is forced
    /// (the token buffer compares its byte length against this).
    pub max_buffer_size: usize,
    /// Whether to coalesce similar events.
    pub coalesce_similar: bool,
}

impl Default for EventBufferConfig {
    /// Defaults: 50 ms minimum interval, buffer size of 100, coalescing on.
    fn default() -> Self {
        EventBufferConfig {
            min_interval: Duration::from_millis(50),
            max_buffer_size: 100,
            coalesce_similar: true,
        }
    }
}
137
/// Event emitter with optional buffering for rate limiting.
pub struct EventEmitter {
    /// Channel on which events are delivered to the consumer.
    tx: mpsc::Sender<AgentEvent>,
    /// Buffering / rate-limit configuration.
    config: EventBufferConfig,
    /// When the last `Progress` event was emitted, if any.
    last_progress_emit: Option<Instant>,
    /// When the last `TokenStream` event was emitted, if any.
    last_token_emit: Option<Instant>,
    /// Accumulated token content awaiting flush.
    token_buffer: String,
}
146
147impl EventEmitter {
148    /// Create a new event emitter with default configuration
149    pub fn new(tx: mpsc::Sender<AgentEvent>) -> Self {
150        Self::with_config(tx, EventBufferConfig::default())
151    }
152
153    /// Create a new event emitter with custom configuration
154    pub fn with_config(tx: mpsc::Sender<AgentEvent>, config: EventBufferConfig) -> Self {
155        Self {
156            tx,
157            config,
158            last_progress_emit: None,
159            last_token_emit: None,
160            token_buffer: String::new(),
161        }
162    }
163
164    /// Emit an event immediately (bypasses buffering)
165    pub async fn emit(&self, event: AgentEvent) -> Result<(), mpsc::error::SendError<AgentEvent>> {
166        self.tx.send(event).await
167    }
168
169    /// Emit a progress event with rate limiting
170    pub async fn emit_progress(
171        &mut self,
172        request_id: String,
173        stage: ProgressStage,
174        percent: Option<u8>,
175        message: String,
176    ) {
177        let now = Instant::now();
178        let should_emit = self
179            .last_progress_emit
180            .map(|last| now.duration_since(last) >= self.config.min_interval)
181            .unwrap_or(true);
182
183        if should_emit {
184            self.last_progress_emit = Some(now);
185            let _ = self
186                .tx
187                .send(AgentEvent::Progress {
188                    request_id,
189                    stage,
190                    percent,
191                    message,
192                })
193                .await;
194        }
195    }
196
197    /// Buffer token content and emit when threshold is reached
198    pub async fn buffer_token(&mut self, request_id: &str, content: &str, is_thinking: bool) {
199        self.token_buffer.push_str(content);
200
201        let now = Instant::now();
202        let should_flush = self
203            .last_token_emit
204            .map(|last| now.duration_since(last) >= self.config.min_interval)
205            .unwrap_or(true)
206            || self.token_buffer.len() >= self.config.max_buffer_size;
207
208        if should_flush && !self.token_buffer.is_empty() {
209            self.last_token_emit = Some(now);
210            let content = std::mem::take(&mut self.token_buffer);
211            let _ = self
212                .tx
213                .send(AgentEvent::TokenStream {
214                    request_id: request_id.to_string(),
215                    content,
216                    is_thinking,
217                })
218                .await;
219        }
220    }
221
222    /// Flush any buffered tokens
223    pub async fn flush_tokens(&mut self, request_id: &str, is_thinking: bool) {
224        if !self.token_buffer.is_empty() {
225            let content = std::mem::take(&mut self.token_buffer);
226            let _ = self
227                .tx
228                .send(AgentEvent::TokenStream {
229                    request_id: request_id.to_string(),
230                    content,
231                    is_thinking,
232                })
233                .await;
234        }
235    }
236
237    /// Emit pipeline started event
238    pub async fn pipeline_started(&self, request_id: &str) {
239        let timestamp_ms = std::time::SystemTime::now()
240            .duration_since(std::time::UNIX_EPOCH)
241            .map(|d| d.as_millis() as u64)
242            .unwrap_or(0);
243
244        let _ = self
245            .tx
246            .send(AgentEvent::PipelineStarted {
247                request_id: request_id.to_string(),
248                timestamp_ms,
249            })
250            .await;
251    }
252
253    /// Emit pipeline completed event
254    pub async fn pipeline_completed(
255        &self,
256        request_id: &str,
257        duration_ms: u64,
258        tokens_used: Option<u64>,
259    ) {
260        let _ = self
261            .tx
262            .send(AgentEvent::PipelineCompleted {
263                request_id: request_id.to_string(),
264                duration_ms,
265                tokens_used,
266            })
267            .await;
268    }
269
270    /// Emit pipeline failed event
271    pub async fn pipeline_failed(&self, request_id: &str, error: &str, recoverable: bool) {
272        let _ = self
273            .tx
274            .send(AgentEvent::PipelineFailed {
275                request_id: request_id.to_string(),
276                error: error.to_string(),
277                recoverable,
278            })
279            .await;
280    }
281
282    /// Emit tool started event
283    pub async fn tool_started(&self, request_id: &str, tool_id: &str, tool_name: &str) {
284        let _ = self
285            .tx
286            .send(AgentEvent::ToolStarted {
287                request_id: request_id.to_string(),
288                tool_id: tool_id.to_string(),
289                tool_name: tool_name.to_string(),
290            })
291            .await;
292    }
293
294    /// Emit tool completed event
295    pub async fn tool_completed(
296        &self,
297        request_id: &str,
298        tool_id: &str,
299        success: bool,
300        duration_ms: u64,
301        output_preview: &str,
302    ) {
303        let _ = self
304            .tx
305            .send(AgentEvent::ToolCompleted {
306                request_id: request_id.to_string(),
307                tool_id: tool_id.to_string(),
308                success,
309                duration_ms,
310                output_preview: output_preview.to_string(),
311            })
312            .await;
313    }
314}
315
316/// Progress tracker for long-running operations
317#[derive(Debug, Clone)]
318pub struct ProgressTracker {
319    request_id: String,
320    current_stage: ProgressStage,
321    stage_start: Instant,
322    total_start: Instant,
323}
324
325impl ProgressTracker {
326    /// Create a new progress tracker
327    pub fn new(request_id: String) -> Self {
328        let now = Instant::now();
329        Self {
330            request_id,
331            current_stage: ProgressStage::Analyzing,
332            stage_start: now,
333            total_start: now,
334        }
335    }
336
337    /// Get the current request ID
338    pub fn request_id(&self) -> &str {
339        &self.request_id
340    }
341
342    /// Get the current stage
343    pub fn current_stage(&self) -> ProgressStage {
344        self.current_stage
345    }
346
347    /// Advance to the next stage
348    pub fn advance_to(&mut self, stage: ProgressStage) {
349        self.current_stage = stage;
350        self.stage_start = Instant::now();
351    }
352
353    /// Get duration of current stage
354    pub fn stage_duration(&self) -> Duration {
355        self.stage_start.elapsed()
356    }
357
358    /// Get total duration since start
359    pub fn total_duration(&self) -> Duration {
360        self.total_start.elapsed()
361    }
362
363    /// Get total duration in milliseconds
364    pub fn total_duration_ms(&self) -> u64 {
365        self.total_start.elapsed().as_millis() as u64
366    }
367}
368
/// Channel type aliases for delivering agent events.
///
/// NOTE(review): this is an mpsc channel — multiple producers, a single
/// consumer. It does not broadcast to multiple listeners.
pub type EventReceiver = mpsc::Receiver<AgentEvent>;
pub type EventSender = mpsc::Sender<AgentEvent>;

/// Create a new bounded event channel pair with the given buffer capacity.
pub fn create_event_channel(buffer_size: usize) -> (EventSender, EventReceiver) {
    mpsc::channel(buffer_size)
}
377
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_progress_stage_description() {
        // Table-driven check of a couple of representative stages.
        let expectations = [
            (ProgressStage::Analyzing, "Analyzing request..."),
            (ProgressStage::ExecutingTools, "Executing tools..."),
        ];
        for &(stage, text) in expectations.iter() {
            assert_eq!(stage.description(), text);
        }
    }

    #[test]
    fn test_progress_tracker() {
        let mut tracker = ProgressTracker::new("test-123".to_string());
        assert_eq!(tracker.request_id(), "test-123");
        assert_eq!(tracker.current_stage(), ProgressStage::Analyzing);

        tracker.advance_to(ProgressStage::WaitingForLlm);
        assert_eq!(tracker.current_stage(), ProgressStage::WaitingForLlm);
    }

    #[test]
    fn test_event_buffer_config_default() {
        let EventBufferConfig {
            min_interval,
            max_buffer_size,
            coalesce_similar,
        } = EventBufferConfig::default();
        assert_eq!(min_interval, Duration::from_millis(50));
        assert_eq!(max_buffer_size, 100);
        assert!(coalesce_similar);
    }

    #[tokio::test]
    async fn test_event_emitter_emit() {
        let (tx, mut rx) = create_event_channel(10);
        EventEmitter::new(tx).pipeline_started("req-1").await;

        let received = rx.recv().await.unwrap();
        if let AgentEvent::PipelineStarted { request_id, .. } = received {
            assert_eq!(request_id, "req-1");
        } else {
            panic!("Expected PipelineStarted event");
        }
    }
}