1use chrono::{DateTime, Duration, Utc};
4use serde::{Deserialize, Serialize};
5use uuid::Uuid;
6
7use super::{AgentRole, ApprovalActorKind, ApprovalScope, TaskArtifactRecord, TaskResult};
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
11#[serde(rename_all = "snake_case")]
12pub enum TeamMessageKind {
13 StatusUpdate,
15 Clarification,
17 Blocker,
19 Handoff,
21 ReviewFeedback,
23 ApprovalDecision,
25 ReviewRequest,
27 ApprovalRequest,
29 TestValidationRequest,
31}
32
33#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
35#[serde(rename_all = "snake_case")]
36pub enum CollaborationRequestKind {
37 BlockerEscalation,
39 Handoff,
41 Clarification,
43 ReviewRequest,
45 ApprovalRequest,
47 TestValidationRequest,
49}
50
51#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
53#[serde(rename_all = "snake_case")]
54pub enum CollaborationActionStatus {
55 #[default]
57 Open,
58 Acknowledged,
60 Resolved,
62 NeedsRevision,
64 Cancelled,
66}
67
68#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
70#[serde(rename_all = "snake_case")]
71pub enum CollaborationThreadStatus {
72 #[default]
74 Active,
75 ActionRequired,
77 NeedsRevision,
79 Resolved,
81}
82
83#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
85#[serde(rename_all = "snake_case")]
86pub enum CollaborationEscalationLevel {
87 Info,
89 Warning,
91 Critical,
93}
94
95pub const DEFAULT_RESOLVED_THREAD_RETENTION_DAYS: i64 = 7;
97
98#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct TeamActionRequestDraft {
101 pub kind: CollaborationRequestKind,
103 #[serde(default, skip_serializing_if = "Vec::is_empty")]
105 pub requested_for_agent_ids: Vec<String>,
106 #[serde(default, skip_serializing_if = "Vec::is_empty")]
108 pub requested_for_roles: Vec<AgentRole>,
109 #[serde(default, skip_serializing_if = "Vec::is_empty")]
111 pub requested_for_actor_kinds: Vec<ApprovalActorKind>,
112 #[serde(default, skip_serializing_if = "Option::is_none")]
114 pub approval_scope: Option<ApprovalScope>,
115 #[serde(default, skip_serializing_if = "Option::is_none")]
117 pub note: Option<String>,
118}
119
120#[derive(Debug, Clone, Serialize, Deserialize)]
122pub struct TeamEscalationDraft {
123 pub level: CollaborationEscalationLevel,
125 #[serde(default, skip_serializing_if = "Option::is_none")]
127 pub escalated_by_agent_id: Option<String>,
128 #[serde(default, skip_serializing_if = "Option::is_none")]
130 pub target_role: Option<AgentRole>,
131 #[serde(default, skip_serializing_if = "Option::is_none")]
133 pub note: Option<String>,
134}
135
136#[derive(Debug, Clone, Serialize, Deserialize)]
138pub struct TeamMessageDraft {
139 #[serde(default, skip_serializing_if = "Option::is_none")]
141 pub task_id: Option<String>,
142 pub kind: TeamMessageKind,
144 #[serde(default, skip_serializing_if = "Option::is_none")]
146 pub sender_agent_id: Option<String>,
147 #[serde(default, skip_serializing_if = "Option::is_none")]
149 pub recipient_agent_id: Option<String>,
150 pub content: String,
152 #[serde(default, skip_serializing_if = "Option::is_none")]
154 pub thread_id: Option<String>,
155 #[serde(default, skip_serializing_if = "Option::is_none")]
157 pub reply_to_message_id: Option<String>,
158 #[serde(default, skip_serializing_if = "Option::is_none")]
160 pub action_request: Option<TeamActionRequestDraft>,
161 #[serde(default, skip_serializing_if = "Option::is_none")]
163 pub escalation: Option<TeamEscalationDraft>,
164 #[serde(default, skip_serializing_if = "Vec::is_empty")]
166 pub unread_by_agent_ids: Vec<String>,
167}
168
169#[derive(Debug, Clone, Serialize, Deserialize)]
171pub struct TeamActionRequest {
172 pub id: String,
174 pub kind: CollaborationRequestKind,
176 #[serde(default)]
178 pub status: CollaborationActionStatus,
179 pub requested_at: DateTime<Utc>,
181 #[serde(default, skip_serializing_if = "Option::is_none")]
183 pub requested_by_agent_id: Option<String>,
184 #[serde(default, skip_serializing_if = "Vec::is_empty")]
186 pub requested_for_agent_ids: Vec<String>,
187 #[serde(default, skip_serializing_if = "Vec::is_empty")]
189 pub requested_for_roles: Vec<AgentRole>,
190 #[serde(default, skip_serializing_if = "Vec::is_empty")]
192 pub requested_for_actor_kinds: Vec<ApprovalActorKind>,
193 #[serde(default, skip_serializing_if = "Option::is_none")]
195 pub approval_scope: Option<ApprovalScope>,
196 #[serde(default, skip_serializing_if = "Option::is_none")]
198 pub note: Option<String>,
199 #[serde(default, skip_serializing_if = "Option::is_none")]
201 pub resolved_at: Option<DateTime<Utc>>,
202 #[serde(default, skip_serializing_if = "Option::is_none")]
204 pub resolved_by_agent_id: Option<String>,
205 #[serde(default, skip_serializing_if = "Option::is_none")]
207 pub resolution_note: Option<String>,
208}
209
210impl TeamActionRequest {
211 pub fn new(
213 kind: CollaborationRequestKind,
214 requested_by_agent_id: Option<String>,
215 note: Option<String>,
216 ) -> Self {
217 Self {
218 id: Uuid::new_v4().to_string(),
219 kind,
220 status: CollaborationActionStatus::Open,
221 requested_at: Utc::now(),
222 requested_by_agent_id,
223 requested_for_agent_ids: Vec::new(),
224 requested_for_roles: Vec::new(),
225 requested_for_actor_kinds: Vec::new(),
226 approval_scope: None,
227 note,
228 resolved_at: None,
229 resolved_by_agent_id: None,
230 resolution_note: None,
231 }
232 }
233
234 pub fn resolve(
236 &mut self,
237 status: CollaborationActionStatus,
238 resolved_by_agent_id: Option<String>,
239 resolution_note: Option<String>,
240 ) {
241 self.status = status;
242 if matches!(
243 status,
244 CollaborationActionStatus::Resolved
245 | CollaborationActionStatus::NeedsRevision
246 | CollaborationActionStatus::Cancelled
247 ) {
248 self.resolved_at = Some(Utc::now());
249 self.resolved_by_agent_id = resolved_by_agent_id;
250 self.resolution_note = resolution_note;
251 } else {
252 self.resolved_at = None;
253 self.resolved_by_agent_id = resolved_by_agent_id;
254 self.resolution_note = resolution_note;
255 }
256 }
257
258 pub fn requires_attention(&self) -> bool {
260 matches!(
261 self.status,
262 CollaborationActionStatus::Open | CollaborationActionStatus::Acknowledged
263 )
264 }
265}
266
267impl TeamActionRequestDraft {
268 pub fn into_request(self, requested_by_agent_id: Option<String>) -> TeamActionRequest {
270 let mut request = TeamActionRequest::new(self.kind, requested_by_agent_id, self.note);
271 request.requested_for_agent_ids = self.requested_for_agent_ids;
272 request.requested_for_roles = self.requested_for_roles;
273 request.requested_for_actor_kinds = self.requested_for_actor_kinds;
274 request.approval_scope = self.approval_scope;
275 request
276 }
277}
278
279#[derive(Debug, Clone, Serialize, Deserialize)]
281pub struct TeamEscalation {
282 pub level: CollaborationEscalationLevel,
284 pub escalated_at: DateTime<Utc>,
286 #[serde(default, skip_serializing_if = "Option::is_none")]
288 pub escalated_by_agent_id: Option<String>,
289 #[serde(default, skip_serializing_if = "Option::is_none")]
291 pub target_role: Option<AgentRole>,
292 #[serde(default, skip_serializing_if = "Option::is_none")]
294 pub note: Option<String>,
295}
296
297impl TeamEscalationDraft {
298 pub fn into_escalation(self) -> TeamEscalation {
300 TeamEscalation {
301 level: self.level,
302 escalated_at: Utc::now(),
303 escalated_by_agent_id: self.escalated_by_agent_id,
304 target_role: self.target_role,
305 note: self.note,
306 }
307 }
308}
309
310#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
312pub struct TeamArtifactReference {
313 #[serde(default, skip_serializing_if = "Option::is_none")]
315 pub task_id: Option<String>,
316 pub name: String,
318 pub kind: String,
320 #[serde(default, skip_serializing_if = "Option::is_none")]
322 pub uri: Option<String>,
323 #[serde(default, skip_serializing_if = "Option::is_none")]
325 pub summary: Option<String>,
326}
327
328#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
330pub struct TeamResultReference {
331 pub task_id: String,
333 pub success: bool,
335 #[serde(default, skip_serializing_if = "Option::is_none")]
337 pub summary: Option<String>,
338 #[serde(default, skip_serializing_if = "Vec::is_empty")]
340 pub artifact_names: Vec<String>,
341 pub duration_ms: u64,
343}
344
345#[derive(Debug, Clone, Serialize, Deserialize)]
347pub struct TeamMessage {
348 pub id: String,
350 pub run_id: String,
352 #[serde(default, skip_serializing_if = "Option::is_none")]
354 pub task_id: Option<String>,
355 pub kind: TeamMessageKind,
357 #[serde(default, skip_serializing_if = "Option::is_none")]
359 pub sender_agent_id: Option<String>,
360 #[serde(default, skip_serializing_if = "Option::is_none")]
362 pub recipient_agent_id: Option<String>,
363 pub content: String,
365 #[serde(default, skip_serializing_if = "Option::is_none")]
367 pub thread_id: Option<String>,
368 #[serde(default, skip_serializing_if = "Option::is_none")]
370 pub reply_to_message_id: Option<String>,
371 #[serde(default, skip_serializing_if = "Option::is_none")]
373 pub action_request: Option<TeamActionRequest>,
374 #[serde(default, skip_serializing_if = "Option::is_none")]
376 pub escalation: Option<TeamEscalation>,
377 #[serde(default, skip_serializing_if = "Option::is_none")]
379 pub result_reference: Option<TeamResultReference>,
380 #[serde(default, skip_serializing_if = "Vec::is_empty")]
382 pub artifact_references: Vec<TeamArtifactReference>,
383 #[serde(default, skip_serializing_if = "Vec::is_empty")]
385 pub unread_by_agent_ids: Vec<String>,
386 #[serde(default, skip_serializing_if = "Option::is_none")]
388 pub archived_at: Option<DateTime<Utc>>,
389 #[serde(default, skip_serializing_if = "Option::is_none")]
391 pub archived_by_agent_id: Option<String>,
392 #[serde(default, skip_serializing_if = "Option::is_none")]
394 pub archive_note: Option<String>,
395 pub created_at: DateTime<Utc>,
397}
398
399impl TeamMessage {
400 pub fn new(
402 run_id: impl Into<String>,
403 task_id: Option<String>,
404 kind: TeamMessageKind,
405 sender_agent_id: Option<String>,
406 recipient_agent_id: Option<String>,
407 content: impl Into<String>,
408 ) -> Self {
409 let id = Uuid::new_v4().to_string();
410 Self {
411 id: id.clone(),
412 run_id: run_id.into(),
413 task_id,
414 kind,
415 sender_agent_id,
416 recipient_agent_id,
417 content: content.into(),
418 thread_id: Some(id),
419 reply_to_message_id: None,
420 action_request: None,
421 escalation: None,
422 result_reference: None,
423 artifact_references: Vec::new(),
424 unread_by_agent_ids: Vec::new(),
425 archived_at: None,
426 archived_by_agent_id: None,
427 archive_note: None,
428 created_at: Utc::now(),
429 }
430 }
431
432 pub fn with_thread(mut self, thread_id: String, reply_to_message_id: Option<String>) -> Self {
434 self.thread_id = Some(thread_id);
435 self.reply_to_message_id = reply_to_message_id;
436 self
437 }
438
439 pub fn with_action_request(mut self, action_request: TeamActionRequest) -> Self {
441 self.action_request = Some(action_request);
442 self
443 }
444
445 pub fn with_result_reference(mut self, result_reference: TeamResultReference) -> Self {
447 self.result_reference = Some(result_reference);
448 self
449 }
450
451 pub fn with_artifact_references(
453 mut self,
454 artifact_references: Vec<TeamArtifactReference>,
455 ) -> Self {
456 self.artifact_references = artifact_references;
457 self
458 }
459
460 pub fn with_escalation(mut self, escalation: TeamEscalation) -> Self {
462 self.escalation = Some(escalation);
463 self
464 }
465
466 pub fn with_unread_by_agent_ids(mut self, unread_by_agent_ids: Vec<String>) -> Self {
468 self.unread_by_agent_ids = unread_by_agent_ids;
469 self
470 }
471
472 pub fn archive(&mut self, archived_by_agent_id: Option<String>, archive_note: Option<String>) {
474 self.archived_at = Some(Utc::now());
475 self.archived_by_agent_id = archived_by_agent_id;
476 self.archive_note = archive_note;
477 }
478
479 pub fn is_archived(&self) -> bool {
481 self.archived_at.is_some()
482 }
483
484 pub fn effective_thread_id(&self) -> &str {
486 self.thread_id.as_deref().unwrap_or(self.id.as_str())
487 }
488}
489
490#[derive(Debug, Clone, Serialize, Deserialize)]
492pub struct TeamThread {
493 pub id: String,
495 pub run_id: String,
497 #[serde(default, skip_serializing_if = "Option::is_none")]
499 pub task_id: Option<String>,
500 pub kind: TeamMessageKind,
502 pub status: CollaborationThreadStatus,
504 pub created_at: DateTime<Utc>,
506 pub updated_at: DateTime<Utc>,
508 pub archived: bool,
510 #[serde(default, skip_serializing_if = "Option::is_none")]
512 pub archived_at: Option<DateTime<Utc>>,
513 pub unread_count: usize,
515 pub message_count: usize,
517 pub actionable_message_count: usize,
519 pub requires_attention: bool,
521 #[serde(default, skip_serializing_if = "Vec::is_empty")]
523 pub participant_agent_ids: Vec<String>,
524 #[serde(default, skip_serializing_if = "Option::is_none")]
526 pub latest_action_request: Option<TeamActionRequest>,
527 #[serde(default, skip_serializing_if = "Option::is_none")]
529 pub latest_result_reference: Option<TeamResultReference>,
530 #[serde(default, skip_serializing_if = "Vec::is_empty")]
532 pub artifact_references: Vec<TeamArtifactReference>,
533 pub messages: Vec<TeamMessage>,
535}
536
537pub fn build_team_threads(messages: &[TeamMessage]) -> Vec<TeamThread> {
539 build_team_threads_with_options(messages, false)
540}
541
542pub fn build_team_threads_with_options(
544 messages: &[TeamMessage],
545 include_archived: bool,
546) -> Vec<TeamThread> {
547 let mut groups = std::collections::BTreeMap::<String, Vec<TeamMessage>>::new();
548 for message in messages {
549 groups
550 .entry(message.effective_thread_id().to_string())
551 .or_default()
552 .push(message.clone());
553 }
554
555 let mut threads = groups
556 .into_iter()
557 .filter_map(|(thread_id, mut thread_messages)| {
558 thread_messages.sort_by(|left, right| left.created_at.cmp(&right.created_at));
559 let first = thread_messages.first()?.clone();
560 let last = thread_messages.last()?.clone();
561 let latest_action_request = thread_messages
562 .iter()
563 .rev()
564 .find_map(|message| message.action_request.clone());
565 let latest_result_reference = thread_messages
566 .iter()
567 .rev()
568 .find_map(|message| message.result_reference.clone());
569 let mut artifact_references = Vec::new();
570 for message in &thread_messages {
571 for artifact in &message.artifact_references {
572 if !artifact_references.contains(artifact) {
573 artifact_references.push(artifact.clone());
574 }
575 }
576 }
577 let mut participant_agent_ids = Vec::new();
578 for message in &thread_messages {
579 for agent_id in [&message.sender_agent_id, &message.recipient_agent_id]
580 .into_iter()
581 .flatten()
582 {
583 if !participant_agent_ids.contains(agent_id) {
584 participant_agent_ids.push(agent_id.clone());
585 }
586 }
587 }
588 let archived = thread_messages.iter().all(TeamMessage::is_archived);
589 let archived_at = if archived {
590 thread_messages
591 .iter()
592 .filter_map(|message| message.archived_at)
593 .max()
594 } else {
595 None
596 };
597 let status = match latest_action_request.as_ref().map(|request| request.status) {
598 Some(CollaborationActionStatus::Open | CollaborationActionStatus::Acknowledged) => {
599 CollaborationThreadStatus::ActionRequired
600 }
601 Some(CollaborationActionStatus::NeedsRevision) => {
602 CollaborationThreadStatus::NeedsRevision
603 }
604 Some(
605 CollaborationActionStatus::Resolved | CollaborationActionStatus::Cancelled,
606 ) => CollaborationThreadStatus::Resolved,
607 None => CollaborationThreadStatus::Active,
608 };
609
610 if archived && !include_archived {
611 return None;
612 }
613
614 Some(TeamThread {
615 id: thread_id,
616 run_id: first.run_id.clone(),
617 task_id: last.task_id.clone().or(first.task_id.clone()),
618 kind: first.kind,
619 status,
620 created_at: first.created_at,
621 updated_at: last.created_at,
622 archived,
623 archived_at,
624 unread_count: thread_messages
625 .iter()
626 .map(|message| message.unread_by_agent_ids.len())
627 .sum(),
628 message_count: thread_messages.len(),
629 actionable_message_count: thread_messages
630 .iter()
631 .filter(|message| message.action_request.is_some())
632 .count(),
633 requires_attention: latest_action_request
634 .as_ref()
635 .is_some_and(TeamActionRequest::requires_attention),
636 participant_agent_ids,
637 latest_action_request,
638 latest_result_reference,
639 artifact_references,
640 messages: thread_messages,
641 })
642 })
643 .collect::<Vec<_>>();
644
645 threads.sort_by(|left, right| right.updated_at.cmp(&left.updated_at));
646 threads
647}
648
649impl TeamArtifactReference {
650 pub fn from_task_artifact(task_id: Option<String>, artifact: &TaskArtifactRecord) -> Self {
652 Self {
653 task_id,
654 name: artifact.name.clone(),
655 kind: artifact.kind.clone(),
656 uri: artifact.uri.clone(),
657 summary: artifact.summary.clone(),
658 }
659 }
660}
661
662impl TeamResultReference {
663 pub fn from_task_result(result: &TaskResult) -> Self {
665 Self {
666 task_id: result.task_id.clone(),
667 success: result.success,
668 summary: result.summary.clone(),
669 artifact_names: result
670 .artifacts
671 .iter()
672 .map(|artifact| artifact.name.clone())
673 .collect(),
674 duration_ms: result.duration_ms,
675 }
676 }
677}
678
679impl TeamMessageDraft {
680 pub fn into_message(self, run_id: impl Into<String>) -> TeamMessage {
682 let sender_agent_id = self.sender_agent_id.clone();
683 let mut message = TeamMessage::new(
684 run_id,
685 self.task_id,
686 self.kind,
687 self.sender_agent_id,
688 self.recipient_agent_id,
689 self.content,
690 );
691 if let Some(thread_id) = self.thread_id {
692 message = message.with_thread(thread_id, self.reply_to_message_id);
693 }
694 if let Some(action_request) = self.action_request {
695 message = message.with_action_request(action_request.into_request(sender_agent_id));
696 }
697 if let Some(escalation) = self.escalation {
698 message = message.with_escalation(escalation.into_escalation());
699 }
700 let mut unread_by_agent_ids = self.unread_by_agent_ids;
701 if let Some(request) = message.action_request.as_ref() {
702 for agent_id in &request.requested_for_agent_ids {
703 if !unread_by_agent_ids.contains(agent_id) {
704 unread_by_agent_ids.push(agent_id.clone());
705 }
706 }
707 }
708 if !unread_by_agent_ids.is_empty() {
709 message = message.with_unread_by_agent_ids(unread_by_agent_ids);
710 }
711 message
712 }
713}
714
715pub fn archive_resolved_threads(
717 messages: &mut [TeamMessage],
718 archived_by_agent_id: Option<String>,
719 retention_days: i64,
720) -> usize {
721 let cutoff = Utc::now() - Duration::days(retention_days.max(0));
722 let thread_ids = build_team_threads_with_options(messages, true)
723 .into_iter()
724 .filter(|thread| {
725 !thread.archived
726 && matches!(thread.status, CollaborationThreadStatus::Resolved)
727 && thread.updated_at <= cutoff
728 })
729 .map(|thread| thread.id)
730 .collect::<Vec<_>>();
731
732 let mut archived_count = 0;
733 for message in messages.iter_mut() {
734 if thread_ids
735 .iter()
736 .any(|thread_id| thread_id == message.effective_thread_id())
737 {
738 message.archive(
739 archived_by_agent_id.clone(),
740 Some("Auto-archived after retention period".to_string()),
741 );
742 archived_count += 1;
743 }
744 }
745 archived_count
746}
747
748#[cfg(test)]
749mod tests {
750 use super::*;
751
752 #[test]
753 fn legacy_team_message_deserializes_with_defaults() {
754 let legacy = serde_json::json!({
755 "id": "msg-1",
756 "run_id": "run-1",
757 "kind": "status_update",
758 "content": "Legacy message",
759 "created_at": "2026-03-10T00:00:00Z"
760 });
761
762 let message: TeamMessage = serde_json::from_value(legacy).unwrap();
763
764 assert_eq!(message.id, "msg-1");
765 assert_eq!(message.thread_id, None);
766 assert!(message.action_request.is_none());
767 assert!(message.artifact_references.is_empty());
768 assert!(message.unread_by_agent_ids.is_empty());
769 }
770
771 #[test]
772 fn build_team_threads_groups_replies_and_tracks_resolution() {
773 let mut request = TeamActionRequest::new(
774 CollaborationRequestKind::ReviewRequest,
775 Some("orchestrator".to_string()),
776 Some("Need reviewer sign-off".to_string()),
777 );
778 request.requested_for_actor_kinds = vec![ApprovalActorKind::Reviewer];
779 let root = TeamMessage::new(
780 "run-1",
781 Some("task-1".to_string()),
782 TeamMessageKind::ReviewRequest,
783 Some("orchestrator".to_string()),
784 None,
785 "Review requested",
786 )
787 .with_action_request(request);
788 let reply = TeamMessage::new(
789 "run-1",
790 Some("task-1".to_string()),
791 TeamMessageKind::ApprovalDecision,
792 Some("reviewer-1".to_string()),
793 Some("agent-1".to_string()),
794 "Approved",
795 )
796 .with_thread(
797 root.effective_thread_id().to_string(),
798 Some(root.id.clone()),
799 );
800 let mut resolved_root = root.clone();
801 resolved_root.action_request.as_mut().unwrap().resolve(
802 CollaborationActionStatus::Resolved,
803 Some("reviewer-1".to_string()),
804 Some("Looks good".to_string()),
805 );
806
807 let threads = build_team_threads(&[resolved_root.clone(), reply.clone()]);
808
809 assert_eq!(threads.len(), 1);
810 assert_eq!(threads[0].id, resolved_root.effective_thread_id());
811 assert_eq!(threads[0].messages.len(), 2);
812 assert_eq!(threads[0].status, CollaborationThreadStatus::Resolved);
813 assert!(!threads[0].requires_attention);
814 }
815}