gestura_core_streaming/
cancellation.rs

1//! Streaming cancellation registry.
2//!
3//! This module centralizes cancellation-token storage semantics for streaming requests.
4//! Frontends (GUI/CLI) are responsible for choosing an appropriate cancel key that
5//! prevents cross-window or cross-session leakage. The core owns:
6//! - token storage
7//! - collision semantics (replacing an existing token cancels the old one)
//! - cancellation (remove + cancel)
//! - pausing (remove + pause)
9
10use std::collections::HashMap;
11use std::sync::Mutex;
12
13use crate::streaming::CancellationToken;
14
15/// In-memory registry of streaming cancellation tokens.
16#[derive(Debug, Default)]
17pub struct StreamCancellationRegistry {
18    tokens: Mutex<HashMap<String, CancellationToken>>,
19}
20
21impl StreamCancellationRegistry {
22    /// Create a new empty registry.
23    pub fn new() -> Self {
24        Self {
25            tokens: Mutex::new(HashMap::new()),
26        }
27    }
28
29    /// Register a token for `key`.
30    ///
31    /// If an existing token is registered under the same key, it is cancelled and replaced.
32    pub fn register(&self, key: String, token: CancellationToken) {
33        let mut map = self
34            .tokens
35            .lock()
36            .expect("stream cancellation registry poisoned");
37        if let Some(prev) = map.insert(key, token) {
38            prev.cancel();
39        }
40    }
41
42    /// Cancel the token for `key`.
43    ///
44    /// Returns `true` if a token existed and was cancelled.
45    pub fn cancel(&self, key: &str) -> bool {
46        let mut map = self
47            .tokens
48            .lock()
49            .expect("stream cancellation registry poisoned");
50        if let Some(token) = map.remove(key) {
51            token.cancel();
52            true
53        } else {
54            false
55        }
56    }
57
58    /// Pause the token for `key`.
59    ///
60    /// Returns `true` if a token existed and was marked as resumably paused.
61    pub fn pause(&self, key: &str) -> bool {
62        let mut map = self
63            .tokens
64            .lock()
65            .expect("stream cancellation registry poisoned");
66        if let Some(token) = map.remove(key) {
67            token.pause();
68            true
69        } else {
70            false
71        }
72    }
73
74    /// Remove the token for `key` without cancelling it.
75    ///
76    /// This is used for cleanup after a stream completes normally.
77    pub fn remove(&self, key: &str) {
78        let mut map = self
79            .tokens
80            .lock()
81            .expect("stream cancellation registry poisoned");
82        map.remove(key);
83    }
84
85    /// Returns `true` if the registry currently contains `key`.
86    pub fn contains_key(&self, key: &str) -> bool {
87        let map = self
88            .tokens
89            .lock()
90            .expect("stream cancellation registry poisoned");
91        map.contains_key(key)
92    }
93}
94
95/// Global cancellation registry used by frontends that need process-wide cancellation.
96pub static STREAM_CANCELLATIONS: std::sync::LazyLock<StreamCancellationRegistry> =
97    std::sync::LazyLock::new(StreamCancellationRegistry::new);
98
#[cfg(test)]
mod tests {
    use super::*;

    /// Registering a second token under an occupied key cancels the first token and
    /// leaves the key registered.
    #[test]
    fn register_replaces_and_cancels_previous() {
        const KEY: &str = "k";
        let registry = StreamCancellationRegistry::new();
        let first = CancellationToken::new();
        let second = CancellationToken::new();

        registry.register(KEY.to_owned(), first.clone());
        registry.register(KEY.to_owned(), second);

        assert!(first.is_cancelled());
        assert!(registry.contains_key(KEY));
    }

    /// `cancel` cancels the token, drops the entry, and reports `false` once the key
    /// is gone.
    #[test]
    fn cancel_removes_and_cancels() {
        const KEY: &str = "k2";
        let registry = StreamCancellationRegistry::new();
        let token = CancellationToken::new();
        registry.register(KEY.to_owned(), token.clone());

        let cancelled = registry.cancel(KEY);
        assert!(cancelled);
        assert!(token.is_cancelled());
        assert!(!registry.contains_key(KEY));

        // Nothing is left under the key, so a second attempt reports `false`.
        let cancelled_again = registry.cancel(KEY);
        assert!(!cancelled_again);
    }

    /// `pause` flags the token as cancelled with a pause request, drops the entry, and
    /// reports `false` once the key is gone.
    #[test]
    fn pause_removes_and_marks_token_resumable() {
        const KEY: &str = "k3";
        let registry = StreamCancellationRegistry::new();
        let token = CancellationToken::new();
        registry.register(KEY.to_owned(), token.clone());

        let paused = registry.pause(KEY);
        assert!(paused);
        assert!(token.is_cancelled());
        assert!(token.is_pause_requested());
        assert!(!registry.contains_key(KEY));

        // Nothing is left under the key, so a second attempt reports `false`.
        let paused_again = registry.pause(KEY);
        assert!(!paused_again);
    }
}