1use crate::error::AppError;
9use opentelemetry::{InstrumentationScope, KeyValue, global, trace::TracerProvider as _};
10use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig};
11use opentelemetry_sdk::{Resource, trace::SdkTracerProvider};
12use std::collections::HashMap;
13use std::sync::{Arc, OnceLock};
14use std::time::{Duration, Instant};
15use tokio::sync::{Mutex, RwLock};
16use tracing_subscriber::{EnvFilter, fmt::MakeWriter, prelude::*};
17
/// Default OTLP/HTTP collector endpoint (port 4318, traces path).
pub const DEFAULT_OTLP_HTTP_TRACE_ENDPOINT: &str = "http://127.0.0.1:4318/v1/traces";

/// Default trace endpoint when no protocol is specified; aliases the gRPC endpoint.
pub const DEFAULT_OTLP_TRACE_ENDPOINT: &str = DEFAULT_OTLP_GRPC_TRACE_ENDPOINT;

/// Default OTLP/gRPC collector endpoint (port 4317).
pub const DEFAULT_OTLP_GRPC_TRACE_ENDPOINT: &str = "http://127.0.0.1:4317";

/// OpenTelemetry semantic-conventions schema URL attached to resources and scopes.
pub const OTEL_SEMCONV_SCHEMA_URL: &str = "https://opentelemetry.io/schemas/1.40.0";
31
/// Transport used to export spans to an OTLP collector.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TraceExportProtocol {
    /// OTLP over HTTP with binary protobuf payloads.
    Http,
    /// OTLP over gRPC (tonic).
    Grpc,
}
40
impl TraceExportProtocol {
    /// Returns the lowercase wire label for this protocol ("http" / "grpc"),
    /// e.g. for structured log fields.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Http => "http",
            Self::Grpc => "grpc",
        }
    }
}
51
/// Configuration for OTLP span export.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TraceExportConfig {
    /// Whether span export is enabled; when false the OTLP layer is not installed.
    pub enabled: bool,
    /// Transport used to ship spans.
    pub protocol: TraceExportProtocol,
    /// Collector endpoint URL.
    pub endpoint: String,
    /// Reported as the OTel `service.name` resource attribute.
    pub service_name: String,
    /// Reported as the `service.version` resource attribute.
    pub service_version: String,
}
66
/// Point-in-time aggregate view of all recorded metrics.
#[derive(Debug, Clone, serde::Serialize)]
pub struct MetricsSummary {
    /// When this summary was produced (UTC).
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Number of raw metric samples currently retained.
    pub total_metrics: usize,
    /// Current value of every counter, keyed by name.
    pub counters: HashMap<String, f64>,
    /// Last set value of every gauge, keyed by name.
    pub gauges: HashMap<String, f64>,
    /// Derived per-histogram statistics (count/sum/mean/median/p95/min/max) as JSON.
    pub histograms: HashMap<String, serde_json::Value>,
    /// Latest system-health snapshot.
    pub system_health: SystemHealth,
}
77
/// Kind of a raw metric sample.
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum MetricType {
    /// Monotonically accumulated value (increments).
    Counter,
    /// Last-write-wins instantaneous value.
    Gauge,
    /// Observation added to a distribution.
    Histogram,
    /// Elapsed duration recorded in seconds.
    Timer,
}
86
/// One raw metric sample as recorded by [`TelemetryManager`].
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Metric {
    /// Metric name.
    pub name: String,
    /// What kind of sample this is.
    pub metric_type: MetricType,
    /// Sample value (seconds for `Timer` samples).
    pub value: f64,
    /// When the sample was recorded (UTC).
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Free-form key/value labels attached to the sample.
    pub tags: HashMap<String, String>,
}
96
/// RAII-style stopwatch; `finish()` records the elapsed time as a `Timer` metric.
pub struct Timer {
    // Metric name used when the duration is recorded.
    name: String,
    // Wall-clock start captured at construction.
    start_time: Instant,
    // Labels attached to the recorded sample.
    tags: HashMap<String, String>,
}
103
104impl Timer {
105 pub fn new(name: String) -> Self {
106 Self {
107 name,
108 start_time: Instant::now(),
109 tags: HashMap::new(),
110 }
111 }
112
113 pub fn with_tag(mut self, key: String, value: String) -> Self {
114 self.tags.insert(key, value);
115 self
116 }
117
118 pub async fn finish(self) -> Duration {
119 let duration = self.start_time.elapsed();
120
121 let telemetry = get_telemetry_manager().await;
123 telemetry
124 .record_timer(&self.name, duration, self.tags)
125 .await;
126
127 duration
128 }
129}
130
/// Snapshot of coarse host/process health reported alongside metrics.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SystemHealth {
    /// CPU usage (units set by the caller of `update_system_health` — presumably percent; verify at call sites).
    pub cpu_usage: f64,
    /// Memory usage (same caveat as `cpu_usage`).
    pub memory_usage: f64,
    /// Disk usage (same caveat as `cpu_usage`).
    pub disk_usage: f64,
    /// Measured network latency, if available.
    pub network_latency: Option<f64>,
    /// Number of currently active agents.
    pub active_agents: usize,
    /// Number of currently active connections.
    pub active_connections: usize,
    /// Observed error rate (caller-defined units).
    pub error_rate: f64,
    /// Seconds since the manager was created; refreshed on every `get_system_health`.
    pub uptime_seconds: u64,
}
143
/// In-memory metrics store shared across async tasks.
///
/// All stores are behind tokio locks so recording is safe from any task.
pub struct TelemetryManager {
    // Bounded ring of raw samples (oldest evicted once `max_metrics` is exceeded).
    metrics: Arc<RwLock<Vec<Metric>>>,
    // Accumulated counter values by name.
    counters: Arc<RwLock<HashMap<String, f64>>>,
    // Last-written gauge values by name.
    gauges: Arc<RwLock<HashMap<String, f64>>>,
    // Raw histogram observations by name.
    histograms: Arc<RwLock<HashMap<String, Vec<f64>>>>,
    // Latest health snapshot; `uptime_seconds` is refreshed on read.
    system_health: Arc<Mutex<SystemHealth>>,
    // Capacity of the raw-sample buffer.
    max_metrics: usize,
    // Used to derive uptime.
    start_time: Instant,
}
154
155impl TelemetryManager {
156 pub fn new(max_metrics: usize) -> Self {
158 Self {
159 metrics: Arc::new(RwLock::new(Vec::new())),
160 counters: Arc::new(RwLock::new(HashMap::new())),
161 gauges: Arc::new(RwLock::new(HashMap::new())),
162 histograms: Arc::new(RwLock::new(HashMap::new())),
163 system_health: Arc::new(Mutex::new(SystemHealth {
164 cpu_usage: 0.0,
165 memory_usage: 0.0,
166 disk_usage: 0.0,
167 network_latency: None,
168 active_agents: 0,
169 active_connections: 0,
170 error_rate: 0.0,
171 uptime_seconds: 0,
172 })),
173 max_metrics,
174 start_time: Instant::now(),
175 }
176 }
177
178 pub async fn increment_counter(&self, name: &str, value: f64, tags: HashMap<String, String>) {
180 let mut counters = self.counters.write().await;
181 *counters.entry(name.to_string()).or_insert(0.0) += value;
182
183 self.record_metric(Metric {
184 name: name.to_string(),
185 metric_type: MetricType::Counter,
186 value,
187 timestamp: chrono::Utc::now(),
188 tags,
189 })
190 .await;
191 }
192
193 pub async fn set_gauge(&self, name: &str, value: f64, tags: HashMap<String, String>) {
195 let mut gauges = self.gauges.write().await;
196 gauges.insert(name.to_string(), value);
197
198 self.record_metric(Metric {
199 name: name.to_string(),
200 metric_type: MetricType::Gauge,
201 value,
202 timestamp: chrono::Utc::now(),
203 tags,
204 })
205 .await;
206 }
207
208 pub async fn record_histogram(&self, name: &str, value: f64, tags: HashMap<String, String>) {
210 let mut histograms = self.histograms.write().await;
211 histograms
212 .entry(name.to_string())
213 .or_insert_with(Vec::new)
214 .push(value);
215
216 self.record_metric(Metric {
217 name: name.to_string(),
218 metric_type: MetricType::Histogram,
219 value,
220 timestamp: chrono::Utc::now(),
221 tags,
222 })
223 .await;
224 }
225
226 pub async fn record_timer(
228 &self,
229 name: &str,
230 duration: Duration,
231 tags: HashMap<String, String>,
232 ) {
233 let value = duration.as_secs_f64();
234
235 self.record_metric(Metric {
236 name: name.to_string(),
237 metric_type: MetricType::Timer,
238 value,
239 timestamp: chrono::Utc::now(),
240 tags,
241 })
242 .await;
243 }
244
245 async fn record_metric(&self, metric: Metric) {
247 let mut metrics = self.metrics.write().await;
248 metrics.push(metric);
249
250 if metrics.len() > self.max_metrics {
252 metrics.remove(0);
253 }
254 }
255
256 pub async fn update_system_health(&self, health: SystemHealth) {
258 let mut system_health = self.system_health.lock().await;
259 *system_health = health;
260 }
261
262 pub async fn set_active_agents(&self, count: usize) {
264 let mut health = self.system_health.lock().await;
265 health.active_agents = count;
266 }
267
268 pub async fn set_active_connections(&self, count: usize) {
270 let mut health = self.system_health.lock().await;
271 health.active_connections = count;
272 }
273
274 pub async fn get_system_health(&self) -> SystemHealth {
276 let mut health = self.system_health.lock().await;
277 health.uptime_seconds = self.start_time.elapsed().as_secs();
278 health.clone()
279 }
280
281 pub async fn get_recent_metrics(&self, limit: usize) -> Vec<Metric> {
283 let metrics = self.metrics.read().await;
284 metrics.iter().rev().take(limit).cloned().collect()
285 }
286
287 pub async fn get_metrics_summary(&self) -> MetricsSummary {
289 let metrics = self.metrics.read().await;
290 let counters = self.counters.read().await;
291 let gauges = self.gauges.read().await;
292 let histograms = self.histograms.read().await;
293 let health = self.get_system_health().await;
294
295 let mut histogram_stats = HashMap::new();
296 for (name, values) in histograms.iter() {
297 if values.is_empty() {
298 continue;
299 }
300
301 let mut sorted_values = values.clone();
302 sorted_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
303
304 let len = sorted_values.len();
305 let sum: f64 = sorted_values.iter().sum();
306 let mean = sum / len as f64;
307 let median = if len % 2 == 0 {
308 (sorted_values[len / 2 - 1] + sorted_values[len / 2]) / 2.0
309 } else {
310 sorted_values[len / 2]
311 };
312 let p95_idx = ((len as f64) * 0.95).floor() as usize;
313 let p95 = sorted_values
314 .get(p95_idx.min(len.saturating_sub(1)))
315 .copied()
316 .unwrap_or(0.0);
317
318 histogram_stats.insert(
319 name.clone(),
320 serde_json::json!({
321 "count": len,
322 "sum": sum,
323 "mean": mean,
324 "median": median,
325 "p95": p95,
326 "min": sorted_values.first().copied().unwrap_or(0.0),
327 "max": sorted_values.last().copied().unwrap_or(0.0),
328 }),
329 );
330 }
331
332 MetricsSummary {
333 timestamp: chrono::Utc::now(),
334 total_metrics: metrics.len(),
335 counters: counters.clone(),
336 gauges: gauges.clone(),
337 histograms: histogram_stats,
338 system_health: health,
339 }
340 }
341
342 pub async fn clear_metrics(&self) {
344 let mut metrics = self.metrics.write().await;
345 let mut counters = self.counters.write().await;
346 let mut gauges = self.gauges.write().await;
347 let mut histograms = self.histograms.write().await;
348
349 metrics.clear();
350 counters.clear();
351 gauges.clear();
352 histograms.clear();
353
354 tracing::info!("Telemetry metrics cleared");
355 }
356}
357
/// Lazily-initialized process-wide telemetry manager (async one-time init).
static TELEMETRY_MANAGER: tokio::sync::OnceCell<TelemetryManager> =
    tokio::sync::OnceCell::const_new();
/// Keeps the installed OTLP tracer provider so `shutdown_tracing` can flush it.
static TRACER_PROVIDER: OnceLock<SdkTracerProvider> = OnceLock::new();
362
363pub async fn get_telemetry_manager() -> &'static TelemetryManager {
365 TELEMETRY_MANAGER
366 .get_or_init(|| async { TelemetryManager::new(100000) })
367 .await
368}
369
370pub fn start_timer(name: &str) -> Timer {
372 Timer::new(name.to_string())
373}
374
375pub fn init_tracing_subscriber<W>(
381 filter: EnvFilter,
382 writer: W,
383 with_target: bool,
384 trace_export: Option<TraceExportConfig>,
385) -> Result<(), AppError>
386where
387 W: for<'writer> MakeWriter<'writer> + Send + Sync + 'static,
388{
389 let fmt_layer = tracing_subscriber::fmt::layer()
390 .with_writer(writer)
391 .with_target(with_target);
392
393 if let Some(config) = trace_export.filter(|cfg| cfg.enabled) {
394 let tracer = build_otlp_tracer(&config)?;
395 tracing_subscriber::registry()
396 .with(filter)
397 .with(fmt_layer)
398 .with(tracing_opentelemetry::layer().with_tracer(tracer))
399 .try_init()
400 .map_err(|error| {
401 AppError::Config(format!("failed to initialize tracing subscriber: {error}"))
402 })?;
403 tracing::info!(
404 protocol = config.protocol.as_str(),
405 endpoint = %config.endpoint,
406 service_name = %config.service_name,
407 "OTLP trace export enabled"
408 );
409 } else {
410 tracing_subscriber::registry()
411 .with(filter)
412 .with(fmt_layer)
413 .try_init()
414 .map_err(|error| {
415 AppError::Config(format!("failed to initialize tracing subscriber: {error}"))
416 })?;
417 }
418
419 Ok(())
420}
421
422pub fn shutdown_tracing() {
424 if let Some(tracer_provider) = TRACER_PROVIDER.get()
425 && let Err(error) = tracer_provider.shutdown()
426 {
427 eprintln!("failed to shut down tracer provider: {error}");
428 }
429}
430
431pub async fn increment_counter(name: &str, value: f64) {
433 let telemetry = get_telemetry_manager().await;
434 telemetry
435 .increment_counter(name, value, HashMap::new())
436 .await;
437}
438
439pub async fn set_gauge(name: &str, value: f64) {
441 let telemetry = get_telemetry_manager().await;
442 telemetry.set_gauge(name, value, HashMap::new()).await;
443}
444
445pub async fn record_histogram(name: &str, value: f64) {
447 let telemetry = get_telemetry_manager().await;
448 telemetry
449 .record_histogram(name, value, HashMap::new())
450 .await;
451}
452
/// Builds a batch-exporting OTLP tracer from `config`, installs its
/// provider as the global OpenTelemetry tracer provider, and returns the
/// tracer to hand to `tracing_opentelemetry`.
///
/// # Errors
///
/// Returns [`AppError::Config`] when the span exporter cannot be built
/// (e.g. malformed endpoint).
fn build_otlp_tracer(
    config: &TraceExportConfig,
) -> Result<opentelemetry_sdk::trace::Tracer, AppError> {
    // Choose the exporter transport from the configured protocol.
    let exporter = match config.protocol {
        TraceExportProtocol::Http => SpanExporter::builder()
            .with_http()
            .with_protocol(Protocol::HttpBinary)
            .with_endpoint(config.endpoint.clone())
            .build(),
        TraceExportProtocol::Grpc => SpanExporter::builder()
            .with_tonic()
            .with_endpoint(config.endpoint.clone())
            .build(),
    }
    .map_err(|error| AppError::Config(format!("failed to build OTLP trace exporter: {error}")))?;

    // Batch exporter + resource describing this service per the pinned
    // semantic-conventions schema.
    let tracer_provider = SdkTracerProvider::builder()
        .with_batch_exporter(exporter)
        .with_resource(
            Resource::builder()
                .with_service_name(config.service_name.clone())
                .with_schema_url(
                    [KeyValue::new(
                        "service.version",
                        config.service_version.clone(),
                    )],
                    OTEL_SEMCONV_SCHEMA_URL,
                )
                .build(),
        )
        .build();

    // Scope the tracer with the service's name/version so exported spans
    // carry instrumentation-scope metadata.
    let tracer = tracer_provider.tracer_with_scope(
        InstrumentationScope::builder(config.service_name.clone())
            .with_version(config.service_version.clone())
            .with_schema_url(OTEL_SEMCONV_SCHEMA_URL)
            .build(),
    );
    // Only the FIRST provider is retained for shutdown_tracing; if this
    // function runs twice, later providers are installed globally but
    // never flushed on shutdown.
    let _ = TRACER_PROVIDER.set(tracer_provider.clone());
    global::set_tracer_provider(tracer_provider);

    Ok(tracer)
}