gestura_core_streaming/
cancellation.rs1use std::collections::HashMap;
11use std::sync::Mutex;
12
13use crate::streaming::CancellationToken;
14
15#[derive(Debug, Default)]
17pub struct StreamCancellationRegistry {
18 tokens: Mutex<HashMap<String, CancellationToken>>,
19}
20
21impl StreamCancellationRegistry {
22 pub fn new() -> Self {
24 Self {
25 tokens: Mutex::new(HashMap::new()),
26 }
27 }
28
29 pub fn register(&self, key: String, token: CancellationToken) {
33 let mut map = self
34 .tokens
35 .lock()
36 .expect("stream cancellation registry poisoned");
37 if let Some(prev) = map.insert(key, token) {
38 prev.cancel();
39 }
40 }
41
42 pub fn cancel(&self, key: &str) -> bool {
46 let mut map = self
47 .tokens
48 .lock()
49 .expect("stream cancellation registry poisoned");
50 if let Some(token) = map.remove(key) {
51 token.cancel();
52 true
53 } else {
54 false
55 }
56 }
57
58 pub fn pause(&self, key: &str) -> bool {
62 let mut map = self
63 .tokens
64 .lock()
65 .expect("stream cancellation registry poisoned");
66 if let Some(token) = map.remove(key) {
67 token.pause();
68 true
69 } else {
70 false
71 }
72 }
73
74 pub fn remove(&self, key: &str) {
78 let mut map = self
79 .tokens
80 .lock()
81 .expect("stream cancellation registry poisoned");
82 map.remove(key);
83 }
84
85 pub fn contains_key(&self, key: &str) -> bool {
87 let map = self
88 .tokens
89 .lock()
90 .expect("stream cancellation registry poisoned");
91 map.contains_key(key)
92 }
93}
94
95pub static STREAM_CANCELLATIONS: std::sync::LazyLock<StreamCancellationRegistry> =
97 std::sync::LazyLock::new(StreamCancellationRegistry::new);
98
99#[cfg(test)]
100mod tests {
101 use super::*;
102
103 #[test]
104 fn register_replaces_and_cancels_previous() {
105 let reg = StreamCancellationRegistry::new();
106 let key = "k".to_string();
107 let t1 = CancellationToken::new();
108 let t2 = CancellationToken::new();
109
110 reg.register(key.clone(), t1.clone());
111 reg.register(key.clone(), t2);
112
113 assert!(t1.is_cancelled());
114 assert!(reg.contains_key(&key));
115 }
116
117 #[test]
118 fn cancel_removes_and_cancels() {
119 let reg = StreamCancellationRegistry::new();
120 let key = "k2".to_string();
121 let t = CancellationToken::new();
122 reg.register(key.clone(), t.clone());
123
124 assert!(reg.cancel(&key));
125 assert!(t.is_cancelled());
126 assert!(!reg.contains_key(&key));
127 assert!(!reg.cancel(&key));
128 }
129
130 #[test]
131 fn pause_removes_and_marks_token_resumable() {
132 let reg = StreamCancellationRegistry::new();
133 let key = "k3".to_string();
134 let t = CancellationToken::new();
135 reg.register(key.clone(), t.clone());
136
137 assert!(reg.pause(&key));
138 assert!(t.is_cancelled());
139 assert!(t.is_pause_requested());
140 assert!(!reg.contains_key(&key));
141 assert!(!reg.pause(&key));
142 }
143}