1use super::integrator::McpIntegrator;
8use super::notifications::NotificationSender;
9use super::types::{
10 InitializeParams, PromptsGetParams, ResourcesReadParams, error_codes, mcp_error_codes,
11};
12use super::{
13 McpLogger, McpNotification, ProgressTracker, PromptRegistry, Resource, ResourcesListResult,
14 ResourcesReadResult, SessionManager, Tool, ToolsCallResult, ToolsListResult,
15 create_notification_channel, create_session_manager, get_mcp,
16};
17use crate::error::{AppError, Result};
18use async_trait::async_trait;
19use serde::{Deserialize, Serialize};
20use std::collections::HashMap;
21use std::sync::Arc;
22
/// An incoming JSON-RPC request envelope.
///
/// Deserialized from the wire with serde and passed to `McpServer::handle_request`.
#[derive(Debug, Clone, Deserialize)]
pub struct JsonRpcRequest {
    /// Protocol version string sent by the client.
    ///
    /// NOTE(review): `McpServer::handle_request` does not inspect this value; responses are
    /// always stamped `"2.0"` regardless of what was received.
    pub jsonrpc: String,
    /// Method name; `McpServer::handle_request` dispatches on this string.
    pub method: String,
    /// Method-specific parameters, if the client supplied any.
    pub params: Option<serde_json::Value>,
    /// Request identifier; echoed back unchanged in the matching response.
    pub id: Option<serde_json::Value>,
}
37
/// An outgoing JSON-RPC response envelope.
///
/// Handlers in this file populate exactly one of `result` or `error`.
#[derive(Debug, Clone, Serialize)]
pub struct JsonRpcResponse {
    /// Protocol version string; every handler in this file sets it to `"2.0"`.
    pub jsonrpc: String,
    /// Success payload. Left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub result: Option<serde_json::Value>,
    /// Failure description. Left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<JsonRpcError>,
    /// Identifier copied from the request. Unlike `result` and `error` this field has no
    /// skip attribute, so it is always serialized (as `null` when `None`).
    pub id: Option<serde_json::Value>,
}
52
/// The error object carried in a failed [`JsonRpcResponse`].
#[derive(Debug, Clone, Serialize)]
pub struct JsonRpcError {
    /// Numeric error code; this file takes its values from `error_codes` and
    /// `mcp_error_codes`.
    pub code: i32,
    /// Human-readable description of the failure.
    pub message: String,
    /// Optional extra detail. Left out of the serialized output when `None`; nothing in this
    /// file currently sets it.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<serde_json::Value>,
}
64
/// Per-call context handed to tool and resource handlers.
///
/// `McpServer` builds one for every handler invocation by cloning its own progress tracker
/// handle and logger.
#[derive(Debug, Clone)]
pub struct McpRequestContext {
    /// Shared handle to the server's progress tracker.
    pub progress: Arc<ProgressTracker>,
    /// Clone of the server's logger.
    pub logger: McpLogger,
}
73
/// Implementation of a single tool that can be registered with `McpServer::register_tool`.
///
/// Implementors must be `Send + Sync` because the server stores them behind
/// `Arc<dyn McpToolHandler>`.
#[async_trait]
pub trait McpToolHandler: Send + Sync {
    /// Executes the tool.
    ///
    /// * `arguments` - the `arguments` value from the `tools/call` params, if present.
    /// * `auth_token` - the token supplied with the request, if any. The server forwards it
    ///   whether or not the tool was registered as requiring authentication.
    /// * `ctx` - progress tracker and logger for this invocation.
    ///
    /// An `Err` is converted by the server into a JSON-RPC error response.
    async fn call(
        &self,
        arguments: Option<serde_json::Value>,
        auth_token: Option<&str>,
        ctx: McpRequestContext,
    ) -> Result<ToolsCallResult>;
}
89
/// Implementation of a single resource that can be registered with
/// `McpServer::register_resource`.
///
/// Implementors must be `Send + Sync` because the server stores them behind
/// `Arc<dyn McpResourceHandler>`.
#[async_trait]
pub trait McpResourceHandler: Send + Sync {
    /// Reads the resource identified by `uri`.
    ///
    /// The server passes the URI exactly as it appeared in the `resources/read` params.
    /// An `Err` is converted by the server into a JSON-RPC error response.
    async fn read(&self, uri: &str, ctx: McpRequestContext) -> Result<ResourcesReadResult>;
}
96
/// Registry record for one tool: its descriptor, its auth requirement, and its handler.
struct ToolEntry {
    /// Descriptor returned to clients by `tools/list`.
    tool: Tool,
    /// When `true`, `tools/call` validates the request's auth token before invoking `handler`.
    requires_auth: bool,
    /// Code that runs when the tool is called.
    handler: Arc<dyn McpToolHandler>,
}
102
103impl std::fmt::Debug for ToolEntry {
104 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105 f.debug_struct("ToolEntry")
106 .field("tool", &self.tool)
107 .field("requires_auth", &self.requires_auth)
108 .finish_non_exhaustive()
109 }
110}
111
/// Registry record for one resource: its descriptor and the handler that reads it.
struct ResourceEntry {
    /// Descriptor returned to clients by `resources/list`.
    resource: Resource,
    /// Code that runs when the resource is read.
    handler: Arc<dyn McpResourceHandler>,
}
116
117impl std::fmt::Debug for ResourceEntry {
118 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119 f.debug_struct("ResourceEntry")
120 .field("resource", &self.resource)
121 .finish_non_exhaustive()
122 }
123}
124
/// In-process MCP server: owns the tool and resource registries and answers JSON-RPC
/// requests through `handle_request`.
#[derive(Debug)]
pub struct McpServer {
    /// Registered tools, keyed by tool name.
    tools: HashMap<String, ToolEntry>,
    /// Registered resources, keyed by resource URI.
    resources: HashMap<String, ResourceEntry>,

    /// Session state behind `initialize`, `notifications/initialized`, `ping` and `shutdown`.
    session: Arc<SessionManager>,
    /// Prompt registry behind `prompts/list` and `prompts/get`.
    prompts: PromptRegistry,

    /// Progress tracker shared with handlers through `McpRequestContext`.
    progress: Arc<ProgressTracker>,
    /// Logger shared with handlers through `McpRequestContext`.
    logger: McpLogger,
    /// Sender used to announce list changes and to create new notification subscriptions.
    notification_sender: NotificationSender,
}
141
142impl McpServer {
143 pub fn new() -> Self {
146 let (notification_sender, _) = create_notification_channel();
147 let session = create_session_manager();
148 let prompts = PromptRegistry::new();
149
150 let progress = Arc::new(ProgressTracker::new(notification_sender.clone()));
151 let logger = McpLogger::new(notification_sender.clone(), Some("gestura".to_string()));
152
153 Self {
154 tools: HashMap::new(),
155 resources: HashMap::new(),
156 session,
157 prompts,
158 progress,
159 logger,
160 notification_sender,
161 }
162 }
163
164 pub fn session(&self) -> &Arc<SessionManager> {
166 &self.session
167 }
168
169 pub fn progress_tracker(&self) -> &Arc<ProgressTracker> {
171 &self.progress
172 }
173
174 pub fn subscribe_notifications(&self) -> tokio::sync::broadcast::Receiver<McpNotification> {
176 self.notification_sender.subscribe()
177 }
178
179 pub fn register_tool(
181 &mut self,
182 tool: Tool,
183 requires_auth: bool,
184 handler: Arc<dyn McpToolHandler>,
185 ) {
186 self.tools.insert(
187 tool.name.clone(),
188 ToolEntry {
189 tool,
190 requires_auth,
191 handler,
192 },
193 );
194
195 let _ = self
197 .notification_sender
198 .send(McpNotification::ToolsListChanged);
199 }
200
201 pub fn register_resource(&mut self, resource: Resource, handler: Arc<dyn McpResourceHandler>) {
203 self.resources
204 .insert(resource.uri.clone(), ResourceEntry { resource, handler });
205 let _ = self
206 .notification_sender
207 .send(McpNotification::ResourcesListChanged);
208 }
209
210 pub async fn handle_request(&self, request: JsonRpcRequest) -> JsonRpcResponse {
212 match request.method.as_str() {
213 "initialize" => self.handle_initialize(request.params, request.id),
215 "notifications/initialized" => self.handle_initialized(request.id),
216 "ping" => self.handle_ping(request.id),
217 "shutdown" => self.handle_shutdown(request.id),
218
219 "tools/list" => self.handle_list_tools(request.id),
221 "tools/call" => self.handle_call_tool(request.params, request.id).await,
222
223 "resources/list" => self.handle_list_resources(request.id),
225 "resources/read" => self.handle_read_resource(request.params, request.id).await,
226
227 "prompts/list" => self.handle_list_prompts(request.id),
229 "prompts/get" => self.handle_get_prompt(request.params, request.id),
230
231 "notifications/cancelled" => self.handle_cancelled(request.params, request.id),
233
234 _ => JsonRpcResponse {
235 jsonrpc: "2.0".to_string(),
236 result: None,
237 error: Some(JsonRpcError {
238 code: error_codes::METHOD_NOT_FOUND,
239 message: "Method not found".to_string(),
240 data: None,
241 }),
242 id: request.id,
243 },
244 }
245 }
246
247 fn ctx(&self) -> McpRequestContext {
248 McpRequestContext {
249 progress: self.progress.clone(),
250 logger: self.logger.clone(),
251 }
252 }
253
254 fn handle_initialize(
255 &self,
256 params: Option<serde_json::Value>,
257 id: Option<serde_json::Value>,
258 ) -> JsonRpcResponse {
259 let init_params: InitializeParams = match params {
260 Some(p) => match serde_json::from_value(p) {
261 Ok(v) => v,
262 Err(e) => {
263 return JsonRpcResponse {
264 jsonrpc: "2.0".to_string(),
265 result: None,
266 error: Some(JsonRpcError {
267 code: error_codes::INVALID_PARAMS,
268 message: format!("Invalid params: {e}"),
269 data: None,
270 }),
271 id,
272 };
273 }
274 },
275 None => {
276 return JsonRpcResponse {
277 jsonrpc: "2.0".to_string(),
278 result: None,
279 error: Some(JsonRpcError {
280 code: error_codes::INVALID_PARAMS,
281 message: "Missing initialize params".to_string(),
282 data: None,
283 }),
284 id,
285 };
286 }
287 };
288
289 match self.session.initialize(init_params) {
290 Ok(result) => JsonRpcResponse {
291 jsonrpc: "2.0".to_string(),
292 result: Some(serde_json::to_value(result).unwrap_or_default()),
293 error: None,
294 id,
295 },
296 Err(message) => JsonRpcResponse {
297 jsonrpc: "2.0".to_string(),
298 result: None,
299 error: Some(JsonRpcError {
300 code: mcp_error_codes::ALREADY_INITIALIZED,
301 message,
302 data: None,
303 }),
304 id,
305 },
306 }
307 }
308
309 fn handle_initialized(&self, id: Option<serde_json::Value>) -> JsonRpcResponse {
310 match self.session.initialized() {
311 Ok(()) => JsonRpcResponse {
312 jsonrpc: "2.0".to_string(),
313 result: Some(serde_json::json!({})),
314 error: None,
315 id,
316 },
317 Err(message) => JsonRpcResponse {
318 jsonrpc: "2.0".to_string(),
319 result: None,
320 error: Some(JsonRpcError {
321 code: mcp_error_codes::NOT_INITIALIZED,
322 message,
323 data: None,
324 }),
325 id,
326 },
327 }
328 }
329
330 fn handle_ping(&self, id: Option<serde_json::Value>) -> JsonRpcResponse {
331 let result = self.session.ping();
332 JsonRpcResponse {
333 jsonrpc: "2.0".to_string(),
334 result: Some(serde_json::to_value(result).unwrap_or_default()),
335 error: None,
336 id,
337 }
338 }
339
340 fn handle_shutdown(&self, id: Option<serde_json::Value>) -> JsonRpcResponse {
341 match self.session.shutdown() {
342 Ok(()) => JsonRpcResponse {
343 jsonrpc: "2.0".to_string(),
344 result: Some(serde_json::json!({})),
345 error: None,
346 id,
347 },
348 Err(message) => JsonRpcResponse {
349 jsonrpc: "2.0".to_string(),
350 result: None,
351 error: Some(JsonRpcError {
352 code: error_codes::INTERNAL_ERROR,
353 message,
354 data: None,
355 }),
356 id,
357 },
358 }
359 }
360
361 fn handle_list_tools(&self, id: Option<serde_json::Value>) -> JsonRpcResponse {
362 let tools = self.tools.values().map(|t| t.tool.clone()).collect();
363 let result = ToolsListResult {
364 tools,
365 next_cursor: None,
366 };
367 JsonRpcResponse {
368 jsonrpc: "2.0".to_string(),
369 result: Some(serde_json::to_value(result).unwrap_or_default()),
370 error: None,
371 id,
372 }
373 }
374
375 async fn handle_call_tool(
376 &self,
377 params: Option<serde_json::Value>,
378 id: Option<serde_json::Value>,
379 ) -> JsonRpcResponse {
380 #[derive(Debug, Deserialize)]
381 struct ToolsCallWithAuth {
382 name: String,
383 #[serde(default)]
384 arguments: Option<serde_json::Value>,
385 #[serde(default, alias = "authToken")]
386 auth_token: Option<String>,
387 }
388
389 let parsed: ToolsCallWithAuth = match params {
390 Some(p) => match serde_json::from_value(p) {
391 Ok(v) => v,
392 Err(e) => {
393 return JsonRpcResponse {
394 jsonrpc: "2.0".to_string(),
395 result: None,
396 error: Some(JsonRpcError {
397 code: error_codes::INVALID_PARAMS,
398 message: format!("Invalid params: {e}"),
399 data: None,
400 }),
401 id,
402 };
403 }
404 },
405 None => {
406 return JsonRpcResponse {
407 jsonrpc: "2.0".to_string(),
408 result: None,
409 error: Some(JsonRpcError {
410 code: error_codes::INVALID_PARAMS,
411 message: "Missing params".to_string(),
412 data: None,
413 }),
414 id,
415 };
416 }
417 };
418
419 let Some(entry) = self.tools.get(&parsed.name) else {
420 return JsonRpcResponse {
421 jsonrpc: "2.0".to_string(),
422 result: None,
423 error: Some(JsonRpcError {
424 code: mcp_error_codes::TOOL_NOT_FOUND,
425 message: format!("Tool not found: {}", parsed.name),
426 data: None,
427 }),
428 id,
429 };
430 };
431
432 if entry.requires_auth {
434 let Some(token) = parsed.auth_token.as_deref() else {
435 return JsonRpcResponse {
436 jsonrpc: "2.0".to_string(),
437 result: None,
438 error: Some(JsonRpcError {
439 code: error_codes::INVALID_PARAMS,
440 message: "Authentication required: missing auth_token in request params"
441 .to_string(),
442 data: None,
443 }),
444 id,
445 };
446 };
447 match get_mcp().validate_token(token).await {
448 Ok(true) => {}
449 Ok(false) => {
450 return JsonRpcResponse {
451 jsonrpc: "2.0".to_string(),
452 result: None,
453 error: Some(JsonRpcError {
454 code: error_codes::INVALID_PARAMS,
455 message: "Invalid or expired authentication token".to_string(),
456 data: None,
457 }),
458 id,
459 };
460 }
461 Err(e) => {
462 return JsonRpcResponse {
463 jsonrpc: "2.0".to_string(),
464 result: None,
465 error: Some(Self::to_jsonrpc_error(e)),
466 id,
467 };
468 }
469 }
470 }
471
472 match entry
473 .handler
474 .call(parsed.arguments, parsed.auth_token.as_deref(), self.ctx())
475 .await
476 {
477 Ok(result) => JsonRpcResponse {
478 jsonrpc: "2.0".to_string(),
479 result: Some(serde_json::to_value(result).unwrap_or_default()),
480 error: None,
481 id,
482 },
483 Err(e) => JsonRpcResponse {
484 jsonrpc: "2.0".to_string(),
485 result: None,
486 error: Some(Self::to_jsonrpc_error(e)),
487 id,
488 },
489 }
490 }
491
492 fn handle_list_resources(&self, id: Option<serde_json::Value>) -> JsonRpcResponse {
493 let resources = self
494 .resources
495 .values()
496 .map(|r| r.resource.clone())
497 .collect();
498 let result = ResourcesListResult {
499 resources,
500 next_cursor: None,
501 };
502 JsonRpcResponse {
503 jsonrpc: "2.0".to_string(),
504 result: Some(serde_json::to_value(result).unwrap_or_default()),
505 error: None,
506 id,
507 }
508 }
509
510 async fn handle_read_resource(
511 &self,
512 params: Option<serde_json::Value>,
513 id: Option<serde_json::Value>,
514 ) -> JsonRpcResponse {
515 let parsed: ResourcesReadParams = match params {
516 Some(p) => match serde_json::from_value(p) {
517 Ok(v) => v,
518 Err(e) => {
519 return JsonRpcResponse {
520 jsonrpc: "2.0".to_string(),
521 result: None,
522 error: Some(JsonRpcError {
523 code: error_codes::INVALID_PARAMS,
524 message: format!("Invalid params: {e}"),
525 data: None,
526 }),
527 id,
528 };
529 }
530 },
531 None => {
532 return JsonRpcResponse {
533 jsonrpc: "2.0".to_string(),
534 result: None,
535 error: Some(JsonRpcError {
536 code: error_codes::INVALID_PARAMS,
537 message: "Missing params".to_string(),
538 data: None,
539 }),
540 id,
541 };
542 }
543 };
544
545 let Some(entry) = self.resources.get(&parsed.uri) else {
546 return JsonRpcResponse {
547 jsonrpc: "2.0".to_string(),
548 result: None,
549 error: Some(JsonRpcError {
550 code: mcp_error_codes::RESOURCE_NOT_FOUND,
551 message: format!("Resource not found: {}", parsed.uri),
552 data: None,
553 }),
554 id,
555 };
556 };
557
558 match entry.handler.read(&parsed.uri, self.ctx()).await {
559 Ok(result) => JsonRpcResponse {
560 jsonrpc: "2.0".to_string(),
561 result: Some(serde_json::to_value(result).unwrap_or_default()),
562 error: None,
563 id,
564 },
565 Err(e) => JsonRpcResponse {
566 jsonrpc: "2.0".to_string(),
567 result: None,
568 error: Some(Self::to_jsonrpc_error(e)),
569 id,
570 },
571 }
572 }
573
574 fn handle_list_prompts(&self, id: Option<serde_json::Value>) -> JsonRpcResponse {
575 let result = super::types::PromptsListResult {
576 prompts: self.prompts.list(),
577 next_cursor: None,
578 };
579 JsonRpcResponse {
580 jsonrpc: "2.0".to_string(),
581 result: Some(serde_json::to_value(result).unwrap_or_default()),
582 error: None,
583 id,
584 }
585 }
586
587 fn handle_get_prompt(
588 &self,
589 params: Option<serde_json::Value>,
590 id: Option<serde_json::Value>,
591 ) -> JsonRpcResponse {
592 let parsed: PromptsGetParams = match params {
593 Some(p) => match serde_json::from_value(p) {
594 Ok(v) => v,
595 Err(e) => {
596 return JsonRpcResponse {
597 jsonrpc: "2.0".to_string(),
598 result: None,
599 error: Some(JsonRpcError {
600 code: error_codes::INVALID_PARAMS,
601 message: format!("Invalid params: {e}"),
602 data: None,
603 }),
604 id,
605 };
606 }
607 },
608 None => {
609 return JsonRpcResponse {
610 jsonrpc: "2.0".to_string(),
611 result: None,
612 error: Some(JsonRpcError {
613 code: error_codes::INVALID_PARAMS,
614 message: "Missing params".to_string(),
615 data: None,
616 }),
617 id,
618 };
619 }
620 };
621
622 match self.prompts.get(&parsed.name, parsed.arguments.as_ref()) {
623 Some(result) => JsonRpcResponse {
624 jsonrpc: "2.0".to_string(),
625 result: Some(serde_json::to_value(result).unwrap_or_default()),
626 error: None,
627 id,
628 },
629 None => JsonRpcResponse {
630 jsonrpc: "2.0".to_string(),
631 result: None,
632 error: Some(JsonRpcError {
633 code: mcp_error_codes::PROMPT_NOT_FOUND,
634 message: format!("Prompt not found: {}", parsed.name),
635 data: None,
636 }),
637 id,
638 },
639 }
640 }
641
642 fn handle_cancelled(
643 &self,
644 params: Option<serde_json::Value>,
645 id: Option<serde_json::Value>,
646 ) -> JsonRpcResponse {
647 #[derive(Debug, Deserialize)]
648 #[serde(rename_all = "camelCase")]
649 struct CancelParams {
650 request_id: String,
651 #[serde(default)]
652 reason: Option<String>,
653 }
654
655 if let Some(p) = params
656 && let Ok(parsed) = serde_json::from_value::<CancelParams>(p)
657 {
658 self.progress
659 .cancel_operation(&parsed.request_id, parsed.reason.clone());
660 self.logger
661 .info(format!("Request {} cancelled", parsed.request_id));
662 }
663
664 JsonRpcResponse {
665 jsonrpc: "2.0".to_string(),
666 result: Some(serde_json::json!({})),
667 error: None,
668 id,
669 }
670 }
671
672 fn to_jsonrpc_error(err: AppError) -> JsonRpcError {
673 let (code, message) = match err {
675 AppError::InvalidInput(msg) => (error_codes::INVALID_PARAMS, msg),
676 AppError::NotFound(msg) => (mcp_error_codes::RESOURCE_NOT_FOUND, msg),
677 AppError::PermissionDenied(msg) => (error_codes::INVALID_PARAMS, msg),
678 other => (error_codes::INTERNAL_ERROR, other.to_string()),
679 };
680 JsonRpcError {
681 code,
682 message,
683 data: None,
684 }
685 }
686}
687
688impl Default for McpServer {
689 fn default() -> Self {
690 Self::new()
691 }
692}
693
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ToolResultContent;

    /// Test handler that answers with its arguments rendered as JSON text (`{}` when no
    /// arguments were supplied).
    struct EchoTool;

    #[async_trait]
    impl McpToolHandler for EchoTool {
        async fn call(
            &self,
            arguments: Option<serde_json::Value>,
            _auth_token: Option<&str>,
            _ctx: McpRequestContext,
        ) -> Result<ToolsCallResult> {
            let payload = match arguments {
                Some(value) => value,
                None => serde_json::json!({}),
            };
            let text = payload.to_string();

            Ok(ToolsCallResult {
                content: vec![ToolResultContent::Text { text }],
                is_error: None,
            })
        }
    }

    /// Builds a JSON-RPC 2.0 request for `method` with the given params and id.
    fn request(
        method: &str,
        params: Option<serde_json::Value>,
        id: serde_json::Value,
    ) -> JsonRpcRequest {
        JsonRpcRequest {
            jsonrpc: "2.0".to_string(),
            method: method.to_string(),
            params,
            id: Some(id),
        }
    }

    /// Registers an unauthenticated echo tool, then checks that both `tools/list` and
    /// `tools/call` succeed against it.
    #[tokio::test]
    async fn tools_list_and_call_round_trip() {
        let echo = Tool {
            name: "echo".to_string(),
            description: Some("Echo input".to_string()),
            input_schema: serde_json::json!({"type":"object"}),
            annotations: None,
        };

        let mut server = McpServer::new();
        server.register_tool(echo, false, Arc::new(EchoTool));

        let listed = server
            .handle_request(request("tools/list", None, serde_json::json!(1)))
            .await;
        assert!(listed.error.is_none());
        assert!(listed.result.is_some());

        let call_params = serde_json::json!({"name":"echo","arguments":{"a":1}});
        let called = server
            .handle_request(request("tools/call", Some(call_params), serde_json::json!(2)))
            .await;
        assert!(called.error.is_none());
        assert!(called.result.is_some());
    }
}