gestura_core_a2a/
server.rs

1//! A2A server + authentication helpers.
2//!
3//! This module contains the in-memory server-side implementation of the A2A JSON-RPC
4//! protocol along with basic agent identity/authentication primitives.
5//!
6//! ## Design
7//! - Transport-agnostic: HTTP/SSE adapters live in shells (GUI/CLI), but the protocol
8//!   routing and request/response handling lives in `gestura-core`.
9//! - Lightweight auth: bearer token validation is performed against an in-memory
10//!   `ProfileStore` (suitable for local adapters and tests).
11
12use super::{
13    A2AError, A2AMessage, A2ARequest, A2AResponse, A2ATask, A2ATaskEvent, A2ATaskEventKind,
14    AgentCard, Artifact, ArtifactManifestEntry, CreateTaskRequest, RemoteTaskLease,
15    RemoteTaskLeaseRequest, TaskArtifactFetchRequest, TaskAuditEvent, TaskHeartbeatRequest,
16    TaskProvenance, TaskStatus,
17};
18use chrono::{DateTime, Duration, Utc};
19use rand::Rng;
20use rand::distributions::Alphanumeric;
21use serde::{Deserialize, Serialize};
22use std::collections::HashMap;
23
24#[derive(Debug, Clone, Deserialize)]
25#[serde(rename_all = "camelCase")]
26struct LegacyCreateTaskRequest {
27    #[serde(flatten)]
28    message: A2AMessage,
29    #[serde(default)]
30    run_id: Option<String>,
31    #[serde(default)]
32    parent_task_id: Option<String>,
33    #[serde(default, rename = "taskRole")]
34    task_role: Option<String>,
35    #[serde(default)]
36    requested_capabilities: Vec<String>,
37    #[serde(default)]
38    contract: Option<super::RemoteTaskContract>,
39    #[serde(default)]
40    idempotency_key: Option<String>,
41    #[serde(default)]
42    lease_request: Option<RemoteTaskLeaseRequest>,
43    #[serde(default)]
44    metadata: HashMap<String, serde_json::Value>,
45}
46
47/// Agent profile for authentication and identity propagation.
48#[derive(Debug, Clone, Serialize, Deserialize)]
49#[serde(rename_all = "camelCase")]
50pub struct AgentProfile {
51    /// Unique agent identifier.
52    pub agent_id: String,
53    /// Human-readable agent name.
54    pub name: String,
55    /// Agent version.
56    pub version: String,
57    /// Capabilities this agent advertises.
58    pub capabilities: Vec<String>,
59    /// Authentication token for this agent.
60    #[serde(skip_serializing_if = "Option::is_none")]
61    pub auth_token: Option<String>,
62    /// Token expiration time.
63    #[serde(skip_serializing_if = "Option::is_none")]
64    pub token_expires_at: Option<DateTime<Utc>>,
65    /// Metadata for custom properties.
66    #[serde(default)]
67    pub metadata: HashMap<String, serde_json::Value>,
68}
69
70impl AgentProfile {
71    /// Create a new agent profile.
72    pub fn new(agent_id: impl Into<String>, name: impl Into<String>) -> Self {
73        Self {
74            agent_id: agent_id.into(),
75            name: name.into(),
76            version: env!("CARGO_PKG_VERSION").to_string(),
77            capabilities: vec![],
78            auth_token: None,
79            token_expires_at: None,
80            metadata: HashMap::new(),
81        }
82    }
83
84    /// Add a capability string.
85    pub fn with_capability(mut self, capability: impl Into<String>) -> Self {
86        self.capabilities.push(capability.into());
87        self
88    }
89
90    /// Set the authentication token (and optional expiry) for this profile.
91    pub fn with_auth_token(
92        mut self,
93        token: impl Into<String>,
94        expires_at: Option<DateTime<Utc>>,
95    ) -> Self {
96        self.auth_token = Some(token.into());
97        self.token_expires_at = expires_at;
98        self
99    }
100
101    /// Return whether the currently attached token exists and is not expired.
102    pub fn is_token_valid(&self) -> bool {
103        match (&self.auth_token, &self.token_expires_at) {
104            (Some(_), Some(expires)) => Utc::now() < *expires,
105            (Some(_), None) => true,
106            (None, _) => false,
107        }
108    }
109
110    /// Generate a new bearer token for this agent and set `token_expires_at`.
111    ///
112    /// The generated token is URL-safe alphanumeric and intended for use as a
113    /// `Authorization: Bearer <token>` credential.
114    pub fn generate_token(&mut self, validity_hours: i64) {
115        let token: String = rand::thread_rng()
116            .sample_iter(&Alphanumeric)
117            .take(64)
118            .map(char::from)
119            .collect();
120        self.auth_token = Some(token);
121        self.token_expires_at = Some(Utc::now() + Duration::hours(validity_hours));
122    }
123}
124
/// Cheap, offline shape check for a Gestura bearer token.
///
/// A token passes when it is at least 32 bytes long and consists solely of
/// ASCII letters and digits. This says nothing about whether the token is
/// known, expired or revoked; it only lets a shell reject obviously malformed
/// input early.
pub fn is_token_well_formed(token: &str) -> bool {
    if token.len() < 32 {
        return false;
    }
    // Any non-ASCII character contributes bytes >= 0x80, none of which are
    // ASCII alphanumeric, so scanning bytes is equivalent to scanning chars.
    token.bytes().all(|byte| byte.is_ascii_alphanumeric())
}
132
133/// Profile store for managing agent profiles.
134#[derive(Debug, Default)]
135pub struct ProfileStore {
136    profiles: std::sync::RwLock<HashMap<String, AgentProfile>>,
137}
138
139impl ProfileStore {
140    /// Create a new in-memory profile store.
141    pub fn new() -> Self {
142        Self::default()
143    }
144
145    /// Store/overwrite a profile keyed by `agent_id`.
146    pub fn store(&self, profile: AgentProfile) {
147        let mut profiles = self.profiles.write().unwrap();
148        profiles.insert(profile.agent_id.clone(), profile);
149    }
150
151    /// Retrieve a profile by `agent_id`.
152    pub fn get(&self, agent_id: &str) -> Option<AgentProfile> {
153        let profiles = self.profiles.read().unwrap();
154        profiles.get(agent_id).cloned()
155    }
156
157    /// Validate a bearer token and return the associated profile (if present and valid).
158    pub fn validate_token(&self, token: &str) -> Option<AgentProfile> {
159        let profiles = self.profiles.read().unwrap();
160        profiles
161            .values()
162            .find(|p| p.auth_token.as_deref() == Some(token) && p.is_token_valid())
163            .cloned()
164    }
165
166    /// List all stored profiles.
167    pub fn list(&self) -> Vec<AgentProfile> {
168        let profiles = self.profiles.read().unwrap();
169        profiles.values().cloned().collect()
170    }
171
172    /// Remove a profile by `agent_id`.
173    pub fn remove(&self, agent_id: &str) -> Option<AgentProfile> {
174        let mut profiles = self.profiles.write().unwrap();
175        profiles.remove(agent_id)
176    }
177}
178
179/// In-memory registry for discovered/known agent cards.
180#[derive(Debug, Default)]
181pub struct AgentCardRegistry {
182    cards: std::sync::RwLock<HashMap<String, AgentCard>>,
183}
184
185impl AgentCardRegistry {
186    /// Create a new registry.
187    pub fn new() -> Self {
188        Self::default()
189    }
190
191    /// Register an agent card.
192    pub fn register(&self, card: AgentCard) {
193        let mut cards = self.cards.write().unwrap();
194        cards.insert(card.name.clone(), card);
195    }
196
197    /// Get an agent card by name.
198    pub fn get(&self, name: &str) -> Option<AgentCard> {
199        let cards = self.cards.read().unwrap();
200        cards.get(name).cloned()
201    }
202
203    /// List all registered agents.
204    pub fn list(&self) -> Vec<AgentCard> {
205        let cards = self.cards.read().unwrap();
206        cards.values().cloned().collect()
207    }
208
209    /// Remove an agent card.
210    pub fn remove(&self, name: &str) -> Option<AgentCard> {
211        let mut cards = self.cards.write().unwrap();
212        cards.remove(name)
213    }
214}
215
216/// A2A server for handling incoming JSON-RPC requests.
217///
218/// This is a **protocol** server (request router + in-memory task store), not a
219/// network server. A shell crate is responsible for exposing it over HTTP/SSE.
220pub struct A2AServer {
221    /// This agent's card.
222    pub agent_card: AgentCard,
223    /// Registry of known agents.
224    pub registry: AgentCardRegistry,
225    /// Profile store for bearer token authentication.
226    pub profile_store: ProfileStore,
227    /// Active tasks.
228    tasks: std::sync::RwLock<HashMap<String, A2ATask>>,
229    /// Scoped idempotency-key to task-id mapping.
230    idempotency_index: std::sync::RwLock<HashMap<String, String>>,
231    /// Task-to-caller-profile mapping.
232    task_profiles: std::sync::RwLock<HashMap<String, AgentProfile>>,
233    /// Broadcast stream for task lifecycle changes.
234    event_subscribers: std::sync::Mutex<Vec<std::sync::mpsc::Sender<A2ATaskEvent>>>,
235}
236
237impl A2AServer {
238    /// Create a new A2A server for the given agent card.
239    pub fn new(agent_card: AgentCard) -> Self {
240        Self {
241            agent_card,
242            registry: AgentCardRegistry::new(),
243            profile_store: ProfileStore::new(),
244            tasks: std::sync::RwLock::new(HashMap::new()),
245            idempotency_index: std::sync::RwLock::new(HashMap::new()),
246            task_profiles: std::sync::RwLock::new(HashMap::new()),
247            event_subscribers: std::sync::Mutex::new(Vec::new()),
248        }
249    }
250
251    /// Handle an incoming JSON-RPC request.
252    pub fn handle_request(&self, request: A2ARequest) -> A2AResponse {
253        self.handle_request_with_auth(request, None)
254    }
255
256    /// Handle an incoming JSON-RPC request with an optional bearer token.
257    pub fn handle_request_with_auth(
258        &self,
259        request: A2ARequest,
260        auth_token: Option<&str>,
261    ) -> A2AResponse {
262        // Validate token if provided.
263        let caller_profile = auth_token.and_then(|token| self.profile_store.validate_token(token));
264
265        // Auth is currently required for task creation/cancellation if the agent
266        // card advertises an auth scheme.
267        let requires_auth = matches!(
268            request.method.as_str(),
269            "task/create"
270                | "task/status"
271                | "task/cancel"
272                | "task/retry"
273                | "task/heartbeat"
274                | "task/artifacts"
275                | "task/artifact"
276        );
277        if requires_auth && self.agent_card.authentication.is_some() && caller_profile.is_none() {
278            return A2AResponse {
279                jsonrpc: "2.0".to_string(),
280                result: None,
281                error: Some(A2AError {
282                    code: -32000,
283                    message: "Authentication required".to_string(),
284                    data: None,
285                }),
286                id: request.id,
287            };
288        }
289
290        let result = match request.method.as_str() {
291            "agent/discover" => self.handle_discover(),
292            "task/create" => self.handle_task_create(&request.params, caller_profile.as_ref()),
293            "task/status" => self.handle_task_status(&request.params, caller_profile.as_ref()),
294            "task/cancel" => self.handle_task_cancel(&request.params, caller_profile.as_ref()),
295            "task/retry" => self.handle_task_retry(&request.params, caller_profile.as_ref()),
296            "task/heartbeat" => {
297                self.handle_task_heartbeat(&request.params, caller_profile.as_ref())
298            }
299            "task/artifacts" => {
300                self.handle_task_artifacts(&request.params, caller_profile.as_ref())
301            }
302            "task/artifact" => self.handle_task_artifact(&request.params, caller_profile.as_ref()),
303            "profile/register" => self.handle_profile_register(&request.params),
304            "profile/validate" => self.handle_profile_validate(&request.params),
305            _ => Err(A2AError {
306                code: -32601,
307                message: format!("Method not found: {}", request.method),
308                data: None,
309            }),
310        };
311
312        match result {
313            Ok(value) => A2AResponse {
314                jsonrpc: "2.0".to_string(),
315                result: Some(value),
316                error: None,
317                id: request.id,
318            },
319            Err(error) => A2AResponse {
320                jsonrpc: "2.0".to_string(),
321                result: None,
322                error: Some(error),
323                id: request.id,
324            },
325        }
326    }
327
328    /// Get the caller profile for a task (if known).
329    pub fn get_task_caller(&self, task_id: &str) -> Option<AgentProfile> {
330        let profiles = self.task_profiles.read().unwrap();
331        profiles.get(task_id).cloned()
332    }
333
334    /// Subscribe to task lifecycle events for streaming adapters.
335    pub fn subscribe_events(&self) -> std::sync::mpsc::Receiver<A2ATaskEvent> {
336        let (sender, receiver) = std::sync::mpsc::channel();
337        self.event_subscribers.lock().unwrap().push(sender);
338        receiver
339    }
340
341    /// List currently known tasks.
342    pub fn list_tasks(&self) -> Vec<A2ATask> {
343        self.tasks.read().unwrap().values().cloned().collect()
344    }
345
346    /// Get a specific task snapshot.
347    pub fn get_task(&self, task_id: &str) -> Option<A2ATask> {
348        self.tasks.read().unwrap().get(task_id).cloned()
349    }
350
351    fn emit_task_event(&self, kind: A2ATaskEventKind, task: &A2ATask) {
352        let event = A2ATaskEvent {
353            kind,
354            task: task.clone(),
355            emitted_at: Utc::now(),
356        };
357        let mut subscribers = self.event_subscribers.lock().unwrap();
358        subscribers.retain(|subscriber| subscriber.send(event.clone()).is_ok());
359    }
360
361    fn scoped_idempotency_key(caller: Option<&AgentProfile>, idempotency_key: &str) -> String {
362        let caller_scope = caller
363            .map(|profile| profile.agent_id.as_str())
364            .unwrap_or("anonymous");
365        format!("{caller_scope}:{idempotency_key}")
366    }
367
368    fn lease_from_request(
369        &self,
370        request: &RemoteTaskLeaseRequest,
371        acquired_at: DateTime<Utc>,
372    ) -> RemoteTaskLease {
373        let ttl_secs = request.ttl_secs.min(i64::MAX as u64) as i64;
374        RemoteTaskLease {
375            lease_id: uuid::Uuid::new_v4().to_string(),
376            holder_agent_id: Some(self.agent_card.name.clone()),
377            acquired_at,
378            last_heartbeat_at: acquired_at,
379            expires_at: acquired_at + Duration::seconds(ttl_secs),
380            heartbeat_interval_secs: request.heartbeat_interval_secs,
381        }
382    }
383
384    fn reconcile_task_lease(task: &mut A2ATask) {
385        let Some(lease) = task.lease.as_ref() else {
386            return;
387        };
388        if matches!(
389            task.status,
390            TaskStatus::Completed | TaskStatus::Failed | TaskStatus::Cancelled
391        ) {
392            return;
393        }
394        if Utc::now() >= lease.expires_at && !matches!(task.status, TaskStatus::Blocked) {
395            let expired_at = Utc::now();
396            task.status = TaskStatus::Blocked;
397            task.status_reason = Some("Remote lease heartbeat expired".to_string());
398            task.updated_at = expired_at;
399            task.audit_log.push(TaskAuditEvent {
400                at: expired_at,
401                event: "lease_expired".to_string(),
402                detail: Some(
403                    "Remote worker heartbeat was not renewed before lease expiry".to_string(),
404                ),
405            });
406        }
407    }
408
409    fn ensure_task_owner(
410        &self,
411        task_id: &str,
412        caller: Option<&AgentProfile>,
413    ) -> Result<(), A2AError> {
414        if self.agent_card.authentication.is_none() {
415            return Ok(());
416        }
417        let owner = self.get_task_caller(task_id).ok_or_else(|| A2AError {
418            code: -32003,
419            message: format!("Task {task_id} has no registered owner"),
420            data: None,
421        })?;
422        let caller = caller.ok_or_else(|| A2AError {
423            code: -32000,
424            message: "Authentication required".to_string(),
425            data: None,
426        })?;
427        if caller.agent_id != owner.agent_id {
428            return Err(A2AError {
429                code: -32004,
430                message: format!(
431                    "Caller {} is not authorized to mutate task owned by {}",
432                    caller.agent_id, owner.agent_id
433                ),
434                data: None,
435            });
436        }
437        Ok(())
438    }
439
440    fn caller_auth_scheme(&self, caller: Option<&AgentProfile>) -> Option<String> {
441        caller.and_then(|_| {
442            self.agent_card
443                .authentication
444                .as_ref()
445                .and_then(|authentication| authentication.schemes.first().cloned())
446        })
447    }
448
449    fn handle_discover(&self) -> Result<serde_json::Value, A2AError> {
450        serde_json::to_value(&self.agent_card).map_err(|e| A2AError {
451            code: -32603,
452            message: format!("Serialization error: {e}"),
453            data: None,
454        })
455    }
456
457    fn handle_task_create(
458        &self,
459        params: &serde_json::Value,
460        caller: Option<&AgentProfile>,
461    ) -> Result<serde_json::Value, A2AError> {
462        let request = serde_json::from_value::<CreateTaskRequest>(params.clone())
463            .or_else(|_| {
464                serde_json::from_value::<LegacyCreateTaskRequest>(params.clone()).map(|legacy| {
465                    CreateTaskRequest {
466                        message: legacy.message,
467                        run_id: legacy.run_id,
468                        parent_task_id: legacy.parent_task_id,
469                        role: legacy.task_role,
470                        requested_capabilities: legacy.requested_capabilities,
471                        contract: legacy.contract,
472                        idempotency_key: legacy.idempotency_key,
473                        lease_request: legacy.lease_request,
474                        metadata: legacy.metadata,
475                    }
476                })
477            })
478            .map_err(|e| A2AError {
479                code: -32602,
480                message: format!("Invalid params: {e}"),
481                data: None,
482            })?;
483
484        if let Some(idempotency_key) = request.idempotency_key.as_deref() {
485            let scoped_key = Self::scoped_idempotency_key(caller, idempotency_key);
486            if let Some(existing_task_id) = self
487                .idempotency_index
488                .read()
489                .unwrap()
490                .get(&scoped_key)
491                .cloned()
492            {
493                let mut tasks = self.tasks.write().unwrap();
494                if let Some(existing_task) = tasks.get_mut(&existing_task_id) {
495                    existing_task.updated_at = Utc::now();
496                    existing_task.audit_log.push(TaskAuditEvent {
497                        at: existing_task.updated_at,
498                        event: "idempotent_replay".to_string(),
499                        detail: Some(format!(
500                            "Duplicate task/create request reused idempotency key {}",
501                            idempotency_key
502                        )),
503                    });
504                    return serde_json::to_value(existing_task).map_err(|e| A2AError {
505                        code: -32603,
506                        message: format!("Serialization error: {e}"),
507                        data: None,
508                    });
509                }
510            }
511        }
512
513        let task_id = uuid::Uuid::new_v4().to_string();
514        let mut metadata = request.metadata;
515        let now = Utc::now();
516
517        let provenance = if let Some(profile) = caller {
518            metadata.insert(
519                "caller_agent_id".to_string(),
520                serde_json::json!(profile.agent_id),
521            );
522            metadata.insert("caller_name".to_string(), serde_json::json!(profile.name));
523            metadata.insert(
524                "caller_version".to_string(),
525                serde_json::json!(profile.version),
526            );
527            metadata.insert(
528                "caller_capabilities".to_string(),
529                serde_json::json!(profile.capabilities),
530            );
531            metadata.insert("caller_authenticated".to_string(), serde_json::json!(true));
532            if let Some(auth_scheme) = self.caller_auth_scheme(caller) {
533                metadata.insert(
534                    "caller_auth_scheme".to_string(),
535                    serde_json::json!(auth_scheme),
536                );
537            }
538
539            let mut profiles = self.task_profiles.write().unwrap();
540            profiles.insert(task_id.clone(), profile.clone());
541
542            Some(TaskProvenance {
543                caller_agent_id: Some(profile.agent_id.clone()),
544                caller_name: Some(profile.name.clone()),
545                caller_version: Some(profile.version.clone()),
546                caller_capabilities: profile.capabilities.clone(),
547                authenticated: true,
548                auth_scheme: self.caller_auth_scheme(caller),
549            })
550        } else {
551            None
552        };
553
554        let task = A2ATask {
555            id: task_id.clone(),
556            status: TaskStatus::Pending,
557            status_reason: Some("Accepted by remote agent".to_string()),
558            messages: vec![request.message],
559            artifacts: vec![],
560            retry_count: 0,
561            run_id: request.run_id,
562            parent_task_id: request.parent_task_id,
563            role: request.role,
564            requested_capabilities: request.requested_capabilities,
565            contract: request.contract,
566            idempotency_key: request.idempotency_key.clone(),
567            lease: request
568                .lease_request
569                .as_ref()
570                .map(|lease_request| self.lease_from_request(lease_request, now)),
571            progress: None,
572            provenance,
573            audit_log: vec![TaskAuditEvent {
574                at: now,
575                event: "created".to_string(),
576                detail: Some("Remote task accepted".to_string()),
577            }],
578            created_at: now,
579            updated_at: now,
580            metadata,
581        };
582
583        {
584            let mut tasks = self.tasks.write().unwrap();
585            tasks.insert(task_id.clone(), task.clone());
586        }
587        if let Some(idempotency_key) = request.idempotency_key {
588            let scoped_key = Self::scoped_idempotency_key(caller, &idempotency_key);
589            self.idempotency_index
590                .write()
591                .unwrap()
592                .insert(scoped_key, task_id.clone());
593        }
594        self.emit_task_event(A2ATaskEventKind::Created, &task);
595
596        tracing::info!(task_id = %task_id, caller = ?caller.map(|p| &p.agent_id), "A2A task created");
597        serde_json::to_value(&task).map_err(|e| A2AError {
598            code: -32603,
599            message: format!("Serialization error: {e}"),
600            data: None,
601        })
602    }
603
604    fn handle_profile_register(
605        &self,
606        params: &serde_json::Value,
607    ) -> Result<serde_json::Value, A2AError> {
608        let profile: AgentProfile =
609            serde_json::from_value(params.clone()).map_err(|e| A2AError {
610                code: -32602,
611                message: format!("Invalid profile: {e}"),
612                data: None,
613            })?;
614
615        let agent_id = profile.agent_id.clone();
616        self.profile_store.store(profile);
617
618        tracing::info!(agent_id = %agent_id, "A2A profile registered");
619        Ok(serde_json::json!({"success": true, "agentId": agent_id}))
620    }
621
622    fn handle_profile_validate(
623        &self,
624        params: &serde_json::Value,
625    ) -> Result<serde_json::Value, A2AError> {
626        let token = params
627            .get("token")
628            .and_then(|v| v.as_str())
629            .ok_or_else(|| A2AError {
630                code: -32602,
631                message: "Missing token parameter".to_string(),
632                data: None,
633            })?;
634
635        match self.profile_store.validate_token(token) {
636            Some(profile) => Ok(serde_json::json!({
637                "valid": true,
638                "agentId": profile.agent_id,
639                "name": profile.name,
640                "capabilities": profile.capabilities,
641            })),
642            None => Ok(serde_json::json!({"valid": false})),
643        }
644    }
645
646    fn handle_task_status(
647        &self,
648        params: &serde_json::Value,
649        caller: Option<&AgentProfile>,
650    ) -> Result<serde_json::Value, A2AError> {
651        let task_id = params
652            .get("taskId")
653            .and_then(|v| v.as_str())
654            .ok_or_else(|| A2AError {
655                code: -32602,
656                message: "Missing taskId parameter".to_string(),
657                data: None,
658            })?;
659
660        self.ensure_task_owner(task_id, caller)?;
661        let mut tasks = self.tasks.write().unwrap();
662        let task = tasks.get_mut(task_id).ok_or_else(|| A2AError {
663            code: -32001,
664            message: format!("Task not found: {task_id}"),
665            data: None,
666        })?;
667        let previous_status = task.status;
668        Self::reconcile_task_lease(task);
669        if task.status != previous_status {
670            self.emit_task_event(A2ATaskEventKind::Updated, task);
671        }
672
673        serde_json::to_value(task).map_err(|e| A2AError {
674            code: -32603,
675            message: format!("Serialization error: {e}"),
676            data: None,
677        })
678    }
679
680    fn handle_task_artifacts(
681        &self,
682        params: &serde_json::Value,
683        caller: Option<&AgentProfile>,
684    ) -> Result<serde_json::Value, A2AError> {
685        let task_id = params
686            .get("taskId")
687            .and_then(|v| v.as_str())
688            .ok_or_else(|| A2AError {
689                code: -32602,
690                message: "Missing taskId parameter".to_string(),
691                data: None,
692            })?;
693        self.ensure_task_owner(task_id, caller)?;
694        let tasks = self.tasks.read().unwrap();
695        let task = tasks.get(task_id).ok_or_else(|| A2AError {
696            code: -32001,
697            message: format!("Task not found: {task_id}"),
698            data: None,
699        })?;
700        let manifest = task
701            .artifacts
702            .iter()
703            .map(|artifact| ArtifactManifestEntry {
704                name: artifact.name.clone(),
705                part_count: artifact.parts.len(),
706                metadata: artifact.metadata.clone(),
707            })
708            .collect::<Vec<_>>();
709        serde_json::to_value(manifest).map_err(|e| A2AError {
710            code: -32603,
711            message: format!("Serialization error: {e}"),
712            data: None,
713        })
714    }
715
716    fn handle_task_artifact(
717        &self,
718        params: &serde_json::Value,
719        caller: Option<&AgentProfile>,
720    ) -> Result<serde_json::Value, A2AError> {
721        let request =
722            serde_json::from_value::<TaskArtifactFetchRequest>(params.clone()).map_err(|e| {
723                A2AError {
724                    code: -32602,
725                    message: format!("Invalid artifact fetch request: {e}"),
726                    data: None,
727                }
728            })?;
729        self.ensure_task_owner(&request.task_id, caller)?;
730        let tasks = self.tasks.read().unwrap();
731        let task = tasks.get(&request.task_id).ok_or_else(|| A2AError {
732            code: -32001,
733            message: format!("Task not found: {}", request.task_id),
734            data: None,
735        })?;
736        let artifact = task
737            .artifacts
738            .iter()
739            .find(|artifact| artifact.name == request.artifact_name)
740            .cloned()
741            .ok_or_else(|| A2AError {
742                code: -32005,
743                message: format!(
744                    "Artifact '{}' not found for task {}",
745                    request.artifact_name, request.task_id
746                ),
747                data: None,
748            })?;
749        serde_json::to_value(artifact).map_err(|e| A2AError {
750            code: -32603,
751            message: format!("Serialization error: {e}"),
752            data: None,
753        })
754    }
755
756    fn handle_task_retry(
757        &self,
758        params: &serde_json::Value,
759        caller: Option<&AgentProfile>,
760    ) -> Result<serde_json::Value, A2AError> {
761        let task_id = params
762            .get("taskId")
763            .and_then(|v| v.as_str())
764            .ok_or_else(|| A2AError {
765                code: -32602,
766                message: "Missing taskId parameter".to_string(),
767                data: None,
768            })?;
769
770        self.ensure_task_owner(task_id, caller)?;
771        let mut tasks = self.tasks.write().unwrap();
772        let task = tasks.get_mut(task_id).ok_or_else(|| A2AError {
773            code: -32001,
774            message: format!("Task not found: {task_id}"),
775            data: None,
776        })?;
777
778        task.status = TaskStatus::Pending;
779        task.status_reason = Some("Retry requested".to_string());
780        task.retry_count += 1;
781        task.updated_at = Utc::now();
782        task.audit_log.push(TaskAuditEvent {
783            at: task.updated_at,
784            event: "retried".to_string(),
785            detail: Some(format!("Retry count is now {}", task.retry_count)),
786        });
787        self.emit_task_event(A2ATaskEventKind::Updated, task);
788
789        serde_json::to_value(task).map_err(|e| A2AError {
790            code: -32603,
791            message: format!("Serialization error: {e}"),
792            data: None,
793        })
794    }
795
796    fn handle_task_heartbeat(
797        &self,
798        params: &serde_json::Value,
799        caller: Option<&AgentProfile>,
800    ) -> Result<serde_json::Value, A2AError> {
801        let heartbeat =
802            serde_json::from_value::<TaskHeartbeatRequest>(params.clone()).map_err(|e| {
803                A2AError {
804                    code: -32602,
805                    message: format!("Invalid heartbeat request: {e}"),
806                    data: None,
807                }
808            })?;
809
810        self.ensure_task_owner(&heartbeat.task_id, caller)?;
811        let mut tasks = self.tasks.write().unwrap();
812        let task = tasks.get_mut(&heartbeat.task_id).ok_or_else(|| A2AError {
813            code: -32001,
814            message: format!("Task not found: {}", heartbeat.task_id),
815            data: None,
816        })?;
817
818        let now = Utc::now();
819        let Some(lease) = task.lease.as_mut() else {
820            return Err(A2AError {
821                code: -32002,
822                message: format!("Task {} does not have an active lease", heartbeat.task_id),
823                data: None,
824            });
825        };
826
827        lease.last_heartbeat_at = now;
828        let extend_secs = heartbeat
829            .extend_lease_secs
830            .unwrap_or(lease.heartbeat_interval_secs)
831            .min(i64::MAX as u64) as i64;
832        lease.expires_at = now + Duration::seconds(extend_secs);
833
834        if let Some(status) = heartbeat.status {
835            task.status = status;
836        }
837        if heartbeat.status_reason.is_some() {
838            task.status_reason = heartbeat.status_reason;
839        }
840        if heartbeat.progress.is_some() {
841            task.progress = heartbeat.progress;
842        }
843        if !heartbeat.artifacts.is_empty() {
844            task.artifacts.extend(heartbeat.artifacts);
845        }
846        task.updated_at = now;
847        task.audit_log.push(TaskAuditEvent {
848            at: now,
849            event: "heartbeat".to_string(),
850            detail: Some(format!(
851                "Lease renewed until {}",
852                lease.expires_at.to_rfc3339()
853            )),
854        });
855        self.emit_task_event(A2ATaskEventKind::Updated, task);
856
857        serde_json::to_value(task).map_err(|e| A2AError {
858            code: -32603,
859            message: format!("Serialization error: {e}"),
860            data: None,
861        })
862    }
863
864    fn handle_task_cancel(
865        &self,
866        params: &serde_json::Value,
867        caller: Option<&AgentProfile>,
868    ) -> Result<serde_json::Value, A2AError> {
869        let task_id = params
870            .get("taskId")
871            .and_then(|v| v.as_str())
872            .ok_or_else(|| A2AError {
873                code: -32602,
874                message: "Missing taskId parameter".to_string(),
875                data: None,
876            })?;
877
878        self.ensure_task_owner(task_id, caller)?;
879        let mut tasks = self.tasks.write().unwrap();
880        let task = tasks.get_mut(task_id).ok_or_else(|| A2AError {
881            code: -32001,
882            message: format!("Task not found: {task_id}"),
883            data: None,
884        })?;
885
886        task.status = TaskStatus::Cancelled;
887        task.status_reason = Some("Cancelled by caller".to_string());
888        task.updated_at = Utc::now();
889        task.audit_log.push(TaskAuditEvent {
890            at: task.updated_at,
891            event: "cancelled".to_string(),
892            detail: Some("Remote caller cancelled task".to_string()),
893        });
894        self.emit_task_event(A2ATaskEventKind::Cancelled, task);
895        tracing::info!(task_id = %task_id, "A2A task cancelled");
896
897        serde_json::to_value(task).map_err(|e| A2AError {
898            code: -32603,
899            message: format!("Serialization error: {e}"),
900            data: None,
901        })
902    }
903
904    /// Update task status (for internal use).
905    pub fn update_task_status(&self, task_id: &str, status: TaskStatus) -> Result<(), String> {
906        let mut tasks = self.tasks.write().unwrap();
907        let task = tasks
908            .get_mut(task_id)
909            .ok_or_else(|| format!("Task not found: {task_id}"))?;
910        task.status = status;
911        task.updated_at = Utc::now();
912        task.status_reason = Some(format!("Updated to {:?}", task.status));
913        task.audit_log.push(TaskAuditEvent {
914            at: task.updated_at,
915            event: "status_updated".to_string(),
916            detail: Some(format!("Task status changed to {:?}", task.status)),
917        });
918        let kind = match task.status {
919            TaskStatus::Completed => A2ATaskEventKind::Completed,
920            TaskStatus::Failed => A2ATaskEventKind::Failed,
921            TaskStatus::Cancelled => A2ATaskEventKind::Cancelled,
922            _ => A2ATaskEventKind::Updated,
923        };
924        self.emit_task_event(kind, task);
925        Ok(())
926    }
927
928    /// Update task status and message for internal execution bridges.
929    pub fn update_task_status_with_reason(
930        &self,
931        task_id: &str,
932        status: TaskStatus,
933        reason: impl Into<String>,
934    ) -> Result<(), String> {
935        let reason = reason.into();
936        let mut tasks = self.tasks.write().unwrap();
937        let task = tasks
938            .get_mut(task_id)
939            .ok_or_else(|| format!("Task not found: {task_id}"))?;
940        task.status = status;
941        task.status_reason = Some(reason.clone());
942        task.updated_at = Utc::now();
943        task.audit_log.push(TaskAuditEvent {
944            at: task.updated_at,
945            event: "status_updated".to_string(),
946            detail: Some(reason),
947        });
948        let kind = match task.status {
949            TaskStatus::Completed => A2ATaskEventKind::Completed,
950            TaskStatus::Failed => A2ATaskEventKind::Failed,
951            TaskStatus::Cancelled => A2ATaskEventKind::Cancelled,
952            _ => A2ATaskEventKind::Updated,
953        };
954        self.emit_task_event(kind, task);
955        Ok(())
956    }
957
958    /// Report in-flight task progress for internal execution bridges.
959    pub fn update_task_progress(
960        &self,
961        task_id: &str,
962        progress: super::RemoteTaskProgress,
963    ) -> Result<(), String> {
964        let mut tasks = self.tasks.write().unwrap();
965        let task = tasks
966            .get_mut(task_id)
967            .ok_or_else(|| format!("Task not found: {task_id}"))?;
968        task.progress = Some(progress);
969        task.updated_at = Utc::now();
970        task.audit_log.push(TaskAuditEvent {
971            at: task.updated_at,
972            event: "progress_updated".to_string(),
973            detail: Some("Remote execution progress updated".to_string()),
974        });
975        self.emit_task_event(A2ATaskEventKind::Updated, task);
976        Ok(())
977    }
978
979    /// Append a message to a task for final result delivery.
980    pub fn add_message(&self, task_id: &str, message: A2AMessage) -> Result<(), String> {
981        let mut tasks = self.tasks.write().unwrap();
982        let task = tasks
983            .get_mut(task_id)
984            .ok_or_else(|| format!("Task not found: {task_id}"))?;
985        task.messages.push(message);
986        task.updated_at = Utc::now();
987        task.audit_log.push(TaskAuditEvent {
988            at: task.updated_at,
989            event: "message_added".to_string(),
990            detail: Some("Message appended to remote task transcript".to_string()),
991        });
992        self.emit_task_event(A2ATaskEventKind::Updated, task);
993        Ok(())
994    }
995
996    /// Add an artifact to a task (for internal use).
997    pub fn add_artifact(&self, task_id: &str, artifact: Artifact) -> Result<(), String> {
998        let mut tasks = self.tasks.write().unwrap();
999        let task = tasks
1000            .get_mut(task_id)
1001            .ok_or_else(|| format!("Task not found: {task_id}"))?;
1002        task.artifacts.push(artifact);
1003        task.updated_at = Utc::now();
1004        task.audit_log.push(TaskAuditEvent {
1005            at: task.updated_at,
1006            event: "artifact_added".to_string(),
1007            detail: Some("Artifact attached to remote task".to_string()),
1008        });
1009        self.emit_task_event(A2ATaskEventKind::ArtifactAdded, task);
1010        Ok(())
1011    }
1012}
1013
1014/// Create a default agent card for Gestura.
1015pub fn create_gestura_agent_card(base_url: &str) -> AgentCard {
1016    use super::{AuthenticationInfo, Skill};
1017
1018    AgentCard {
1019        name: "gestura".to_string(),
1020        description: "Voice-powered agentic workflows with MCP integration".to_string(),
1021        url: format!("{}/a2a", base_url),
1022        protocol_version: "0.3.0".to_string(),
1023        skills: vec![
1024            Skill {
1025                id: "voice-command".to_string(),
1026                name: "Voice Command".to_string(),
1027                description: "Process voice commands and execute workflows".to_string(),
1028                input_modes: vec!["text".to_string(), "audio".to_string()],
1029                output_modes: vec!["text".to_string()],
1030                examples: vec!["Run the build".to_string(), "Open the settings".to_string()],
1031            },
1032            Skill {
1033                id: "tool-execution".to_string(),
1034                name: "Tool Execution".to_string(),
1035                description: "Execute MCP tools and return results".to_string(),
1036                input_modes: vec!["text".to_string()],
1037                output_modes: vec!["text".to_string(), "data".to_string()],
1038                examples: vec![
1039                    "Send haptic feedback".to_string(),
1040                    "Check ring battery".to_string(),
1041                ],
1042            },
1043        ],
1044        authentication: Some(AuthenticationInfo {
1045            schemes: vec!["bearer".to_string()],
1046            oauth2: None,
1047        }),
1048        default_input_modes: vec!["text".to_string()],
1049        default_output_modes: vec!["text".to_string()],
1050        supported_rpc_methods: vec![
1051            "agent/discover".to_string(),
1052            "task/create".to_string(),
1053            "task/status".to_string(),
1054            "task/cancel".to_string(),
1055            "task/retry".to_string(),
1056            "task/heartbeat".to_string(),
1057            "task/artifacts".to_string(),
1058            "task/artifact".to_string(),
1059            "profile/register".to_string(),
1060            "profile/validate".to_string(),
1061        ],
1062        supported_task_features: vec![
1063            "idempotency".to_string(),
1064            "leases".to_string(),
1065            "progress".to_string(),
1066            "artifacts".to_string(),
1067            "sse-events".to_string(),
1068            "provenance".to_string(),
1069            "authenticated-mutations".to_string(),
1070        ],
1071    }
1072}
1073
1074#[cfg(test)]
1075mod tests {
1076    use super::*;
1077    use crate::{A2ARequest, AuthenticationInfo, MessagePart};
1078    use std::collections::HashMap;
1079
1080    fn test_card(authentication: Option<AuthenticationInfo>) -> AgentCard {
1081        AgentCard {
1082            name: "test-agent".to_string(),
1083            description: "Test agent".to_string(),
1084            url: "http://localhost:8080/a2a".to_string(),
1085            protocol_version: "0.3.0".to_string(),
1086            skills: vec![],
1087            authentication,
1088            default_input_modes: vec!["text".to_string()],
1089            default_output_modes: vec!["text".to_string()],
1090            supported_rpc_methods: vec![
1091                "agent/discover".to_string(),
1092                "task/create".to_string(),
1093                "task/status".to_string(),
1094                "task/cancel".to_string(),
1095                "task/retry".to_string(),
1096                "task/heartbeat".to_string(),
1097                "task/artifacts".to_string(),
1098                "task/artifact".to_string(),
1099            ],
1100            supported_task_features: vec![
1101                "idempotency".to_string(),
1102                "leases".to_string(),
1103                "progress".to_string(),
1104                "artifacts".to_string(),
1105                "sse-events".to_string(),
1106                "provenance".to_string(),
1107                "authenticated-mutations".to_string(),
1108            ],
1109        }
1110    }
1111
1112    #[test]
1113    fn test_agent_card_serialization() {
1114        let card = create_gestura_agent_card("http://localhost:8080");
1115        let json = serde_json::to_string(&card).unwrap();
1116        assert!(json.contains("gestura"));
1117        assert!(json.contains("voice-command"));
1118    }
1119
1120    #[test]
1121    fn test_a2a_server_discover() {
1122        let card = create_gestura_agent_card("http://localhost:8080");
1123        let server = A2AServer::new(card);
1124
1125        let request = A2ARequest {
1126            jsonrpc: "2.0".to_string(),
1127            method: "agent/discover".to_string(),
1128            params: serde_json::Value::Null,
1129            id: serde_json::json!(1),
1130        };
1131
1132        let response = server.handle_request(request);
1133        assert!(response.result.is_some());
1134        assert!(response.error.is_none());
1135    }
1136
1137    #[test]
1138    fn test_a2a_server_task_lifecycle() {
1139        let server = A2AServer::new(test_card(None));
1140
1141        let create_request = A2ARequest {
1142            jsonrpc: "2.0".to_string(),
1143            method: "task/create".to_string(),
1144            params: serde_json::json!({
1145                "role": "user",
1146                "parts": [{"type": "text", "text": "Hello agent"}]
1147            }),
1148            id: serde_json::json!(1),
1149        };
1150
1151        let create_response = server.handle_request(create_request);
1152        assert!(create_response.result.is_some());
1153
1154        let task: A2ATask = serde_json::from_value(create_response.result.unwrap()).unwrap();
1155        assert_eq!(task.status, TaskStatus::Pending);
1156
1157        let status_request = A2ARequest {
1158            jsonrpc: "2.0".to_string(),
1159            method: "task/status".to_string(),
1160            params: serde_json::json!({"taskId": task.id}),
1161            id: serde_json::json!(2),
1162        };
1163
1164        let status_response = server.handle_request(status_request);
1165        assert!(status_response.result.is_some());
1166
1167        let cancel_request = A2ARequest {
1168            jsonrpc: "2.0".to_string(),
1169            method: "task/cancel".to_string(),
1170            params: serde_json::json!({"taskId": task.id}),
1171            id: serde_json::json!(3),
1172        };
1173
1174        let cancel_response = server.handle_request(cancel_request);
1175        assert!(cancel_response.result.is_some());
1176
1177        let cancelled_task: A2ATask =
1178            serde_json::from_value(cancel_response.result.unwrap()).unwrap();
1179        assert_eq!(cancelled_task.status, TaskStatus::Cancelled);
1180    }
1181
1182    #[test]
1183    fn test_task_create_reuses_idempotency_key_for_same_caller_scope() {
1184        let server = A2AServer::new(test_card(None));
1185
1186        let first = server.handle_request(A2ARequest {
1187            jsonrpc: "2.0".to_string(),
1188            method: "task/create".to_string(),
1189            params: serde_json::json!({
1190                "role": "user",
1191                "parts": [{"type": "text", "text": "Do the remote task"}],
1192                "idempotencyKey": "idem-123"
1193            }),
1194            id: serde_json::json!(1),
1195        });
1196        let second = server.handle_request(A2ARequest {
1197            jsonrpc: "2.0".to_string(),
1198            method: "task/create".to_string(),
1199            params: serde_json::json!({
1200                "role": "user",
1201                "parts": [{"type": "text", "text": "Do the remote task again"}],
1202                "idempotencyKey": "idem-123"
1203            }),
1204            id: serde_json::json!(2),
1205        });
1206
1207        let first_task: A2ATask = serde_json::from_value(first.result.unwrap()).unwrap();
1208        let second_task: A2ATask = serde_json::from_value(second.result.unwrap()).unwrap();
1209        assert_eq!(first_task.id, second_task.id);
1210        assert!(
1211            second_task
1212                .audit_log
1213                .iter()
1214                .any(|event| event.event == "idempotent_replay")
1215        );
1216    }
1217
1218    #[test]
1219    fn test_task_heartbeat_updates_progress_and_lease() {
1220        let server = A2AServer::new(test_card(None));
1221
1222        let create_response = server.handle_request(A2ARequest {
1223            jsonrpc: "2.0".to_string(),
1224            method: "task/create".to_string(),
1225            params: serde_json::json!({
1226                "role": "user",
1227                "parts": [{"type": "text", "text": "Track leased progress"}],
1228                "leaseRequest": {"ttlSecs": 30, "heartbeatIntervalSecs": 5}
1229            }),
1230            id: serde_json::json!(1),
1231        });
1232        let task: A2ATask = serde_json::from_value(create_response.result.unwrap()).unwrap();
1233        let original_expiry = task.lease.as_ref().unwrap().expires_at;
1234
1235        let heartbeat_response = server.handle_request(A2ARequest {
1236            jsonrpc: "2.0".to_string(),
1237            method: "task/heartbeat".to_string(),
1238            params: serde_json::json!({
1239                "taskId": task.id,
1240                "status": "running",
1241                "statusReason": "Remote worker is executing",
1242                "extendLeaseSecs": 60,
1243                "progress": {
1244                    "stage": "compile",
1245                    "message": "Running compile phase",
1246                    "percent": 40,
1247                    "updatedAt": Utc::now()
1248                }
1249            }),
1250            id: serde_json::json!(2),
1251        });
1252        let updated_task: A2ATask =
1253            serde_json::from_value(heartbeat_response.result.unwrap()).unwrap();
1254
1255        assert_eq!(updated_task.status, TaskStatus::Running);
1256        assert_eq!(
1257            updated_task
1258                .progress
1259                .as_ref()
1260                .and_then(|progress| progress.percent),
1261            Some(40)
1262        );
1263        assert!(updated_task.lease.as_ref().unwrap().expires_at > original_expiry);
1264    }
1265
1266    #[test]
1267    fn test_task_status_blocks_when_lease_expires() {
1268        let server = A2AServer::new(test_card(None));
1269
1270        let create_response = server.handle_request(A2ARequest {
1271            jsonrpc: "2.0".to_string(),
1272            method: "task/create".to_string(),
1273            params: serde_json::json!({
1274                "role": "user",
1275                "parts": [{"type": "text", "text": "Expire quickly"}],
1276                "leaseRequest": {"ttlSecs": 0, "heartbeatIntervalSecs": 1}
1277            }),
1278            id: serde_json::json!(1),
1279        });
1280        let task: A2ATask = serde_json::from_value(create_response.result.unwrap()).unwrap();
1281
1282        let status_response = server.handle_request(A2ARequest {
1283            jsonrpc: "2.0".to_string(),
1284            method: "task/status".to_string(),
1285            params: serde_json::json!({"taskId": task.id}),
1286            id: serde_json::json!(2),
1287        });
1288        let blocked_task: A2ATask =
1289            serde_json::from_value(status_response.result.unwrap()).unwrap();
1290
1291        assert_eq!(blocked_task.status, TaskStatus::Blocked);
1292        assert_eq!(
1293            blocked_task.status_reason.as_deref(),
1294            Some("Remote lease heartbeat expired")
1295        );
1296    }
1297
1298    #[test]
1299    fn test_authenticated_mutations_reject_non_owner_and_artifact_fetch_succeeds_for_owner() {
1300        let server = A2AServer::new(test_card(Some(AuthenticationInfo {
1301            schemes: vec!["bearer".to_string()],
1302            oauth2: None,
1303        })));
1304        let mut owner = AgentProfile::new("owner-agent", "Owner");
1305        owner.generate_token(1);
1306        let owner_token = owner.auth_token.clone().unwrap();
1307        server.profile_store.store(owner.clone());
1308
1309        let mut intruder = AgentProfile::new("intruder-agent", "Intruder");
1310        intruder.generate_token(1);
1311        let intruder_token = intruder.auth_token.clone().unwrap();
1312        server.profile_store.store(intruder.clone());
1313
1314        let create_response = server.handle_request_with_auth(
1315            A2ARequest {
1316                jsonrpc: "2.0".to_string(),
1317                method: "task/create".to_string(),
1318                params: serde_json::json!({
1319                    "role": "user",
1320                    "parts": [{"type": "text", "text": "Protected task"}],
1321                    "leaseRequest": {"ttlSecs": 30, "heartbeatIntervalSecs": 5}
1322                }),
1323                id: serde_json::json!(1),
1324            },
1325            Some(&owner_token),
1326        );
1327        let task: A2ATask = serde_json::from_value(create_response.result.unwrap()).unwrap();
1328        assert_eq!(
1329            task.provenance
1330                .as_ref()
1331                .and_then(|value| value.caller_agent_id.as_deref()),
1332            Some("owner-agent")
1333        );
1334        assert_eq!(
1335            task.provenance.as_ref().map(|value| value.authenticated),
1336            Some(true)
1337        );
1338        assert_eq!(
1339            task.provenance
1340                .as_ref()
1341                .and_then(|value| value.auth_scheme.as_deref()),
1342            Some("bearer")
1343        );
1344        server
1345            .add_artifact(
1346                &task.id,
1347                Artifact {
1348                    name: "result.txt".to_string(),
1349                    parts: vec![MessagePart::Text {
1350                        text: "artifact body".to_string(),
1351                    }],
1352                    metadata: HashMap::new(),
1353                },
1354            )
1355            .unwrap();
1356
1357        let unauthorized = server.handle_request_with_auth(
1358            A2ARequest {
1359                jsonrpc: "2.0".to_string(),
1360                method: "task/cancel".to_string(),
1361                params: serde_json::json!({"taskId": task.id}),
1362                id: serde_json::json!(2),
1363            },
1364            Some(&intruder_token),
1365        );
1366        assert!(unauthorized.error.is_some());
1367
1368        let unauthorized_status = server.handle_request_with_auth(
1369            A2ARequest {
1370                jsonrpc: "2.0".to_string(),
1371                method: "task/status".to_string(),
1372                params: serde_json::json!({"taskId": task.id}),
1373                id: serde_json::json!(3),
1374            },
1375            Some(&intruder_token),
1376        );
1377        assert!(unauthorized_status.error.is_some());
1378
1379        let unauthorized_manifest = server.handle_request_with_auth(
1380            A2ARequest {
1381                jsonrpc: "2.0".to_string(),
1382                method: "task/artifacts".to_string(),
1383                params: serde_json::json!({"taskId": task.id}),
1384                id: serde_json::json!(4),
1385            },
1386            Some(&intruder_token),
1387        );
1388        assert!(unauthorized_manifest.error.is_some());
1389
1390        let artifact_response = server.handle_request_with_auth(
1391            A2ARequest {
1392                jsonrpc: "2.0".to_string(),
1393                method: "task/artifact".to_string(),
1394                params: serde_json::json!({"taskId": task.id, "artifactName": "result.txt"}),
1395                id: serde_json::json!(5),
1396            },
1397            Some(&owner_token),
1398        );
1399        let artifact: Artifact = serde_json::from_value(artifact_response.result.unwrap()).unwrap();
1400        assert_eq!(artifact.name, "result.txt");
1401
1402        let status_response = server.handle_request_with_auth(
1403            A2ARequest {
1404                jsonrpc: "2.0".to_string(),
1405                method: "task/status".to_string(),
1406                params: serde_json::json!({"taskId": task.id}),
1407                id: serde_json::json!(6),
1408            },
1409            Some(&owner_token),
1410        );
1411        assert!(status_response.error.is_none());
1412
1413        let manifest_response = server.handle_request_with_auth(
1414            A2ARequest {
1415                jsonrpc: "2.0".to_string(),
1416                method: "task/artifacts".to_string(),
1417                params: serde_json::json!({"taskId": task.id}),
1418                id: serde_json::json!(7),
1419            },
1420            Some(&owner_token),
1421        );
1422        let manifest: Vec<ArtifactManifestEntry> =
1423            serde_json::from_value(manifest_response.result.unwrap()).unwrap();
1424        assert_eq!(manifest.len(), 1);
1425        assert_eq!(manifest[0].name, "result.txt");
1426    }
1427
1428    #[test]
1429    fn test_task_events_and_artifact_manifest_are_emitted() {
1430        let server = A2AServer::new(test_card(None));
1431        let events = server.subscribe_events();
1432        let create_response = server.handle_request(A2ARequest {
1433            jsonrpc: "2.0".to_string(),
1434            method: "task/create".to_string(),
1435            params: serde_json::json!({
1436                "role": "user",
1437                "parts": [{"type": "text", "text": "Stream updates"}]
1438            }),
1439            id: serde_json::json!(1),
1440        });
1441        let task: A2ATask = serde_json::from_value(create_response.result.unwrap()).unwrap();
1442        let created = events.try_recv().unwrap();
1443        assert!(matches!(created.kind, A2ATaskEventKind::Created));
1444
1445        server
1446            .add_artifact(
1447                &task.id,
1448                Artifact {
1449                    name: "summary.md".to_string(),
1450                    parts: vec![MessagePart::Text {
1451                        text: "# done".to_string(),
1452                    }],
1453                    metadata: HashMap::from([(
1454                        "mimeType".to_string(),
1455                        serde_json::json!("text/markdown"),
1456                    )]),
1457                },
1458            )
1459            .unwrap();
1460        let artifact_event = events.try_recv().unwrap();
1461        assert!(matches!(
1462            artifact_event.kind,
1463            A2ATaskEventKind::ArtifactAdded
1464        ));
1465
1466        let manifest_response = server.handle_request(A2ARequest {
1467            jsonrpc: "2.0".to_string(),
1468            method: "task/artifacts".to_string(),
1469            params: serde_json::json!({"taskId": task.id}),
1470            id: serde_json::json!(2),
1471        });
1472        let manifest: Vec<ArtifactManifestEntry> =
1473            serde_json::from_value(manifest_response.result.unwrap()).unwrap();
1474        assert_eq!(manifest.len(), 1);
1475        assert_eq!(manifest[0].name, "summary.md");
1476    }
1477
1478    #[test]
1479    fn test_agent_card_registry() {
1480        let registry = AgentCardRegistry::new();
1481        let card = create_gestura_agent_card("http://localhost:8080");
1482
1483        registry.register(card.clone());
1484        assert_eq!(registry.list().len(), 1);
1485
1486        let retrieved = registry.get("gestura").unwrap();
1487        assert_eq!(retrieved.name, "gestura");
1488
1489        registry.remove("gestura");
1490        assert!(registry.get("gestura").is_none());
1491    }
1492
1493    #[test]
1494    fn test_message_part_serialization() {
1495        let text_part = MessagePart::Text {
1496            text: "Hello".to_string(),
1497        };
1498        let json = serde_json::to_string(&text_part).unwrap();
1499        assert!(json.contains("text"));
1500        assert!(json.contains("Hello"));
1501
1502        let file_part = MessagePart::File {
1503            uri: "file:///test.txt".to_string(),
1504            mime_type: Some("text/plain".to_string()),
1505        };
1506        let json = serde_json::to_string(&file_part).unwrap();
1507        assert!(json.contains("file"));
1508        assert!(json.contains("file:///test.txt"));
1509    }
1510}