gestura_core_foundation/
telemetry.rs

1//! Telemetry and metrics collection for Gestura.app.
2//!
3//! This module provides two complementary observability layers:
4//! - an in-memory metric store used by the GUI/API for local inspection
5//! - optional OTLP trace export for correlated request tracing in tools such as
6//!   SigNoz
7
8use crate::error::AppError;
9use opentelemetry::{InstrumentationScope, KeyValue, global, trace::TracerProvider as _};
10use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig};
11use opentelemetry_sdk::{Resource, trace::SdkTracerProvider};
12use std::collections::HashMap;
13use std::sync::{Arc, OnceLock};
14use std::time::{Duration, Instant};
15use tokio::sync::{Mutex, RwLock};
16use tracing_subscriber::{EnvFilter, fmt::MakeWriter, prelude::*};
17
/// Default OTLP/HTTP endpoint for local trace collection.
///
/// SigNoz exposes this path via the embedded OpenTelemetry collector.
pub const DEFAULT_OTLP_HTTP_TRACE_ENDPOINT: &str = "http://127.0.0.1:4318/v1/traces";

/// Default OTLP trace endpoint for the current default transport.
///
/// Currently an alias for [`DEFAULT_OTLP_GRPC_TRACE_ENDPOINT`], i.e. gRPC is
/// the default transport.
pub const DEFAULT_OTLP_TRACE_ENDPOINT: &str = DEFAULT_OTLP_GRPC_TRACE_ENDPOINT;

/// Default OTLP/gRPC endpoint for local trace collection.
///
/// Unlike the HTTP endpoint above, gRPC targets the collector root
/// (port 4317) with no `/v1/traces` path.
pub const DEFAULT_OTLP_GRPC_TRACE_ENDPOINT: &str = "http://127.0.0.1:4317";

/// Current OpenTelemetry semantic-conventions schema URL used for traces.
pub const OTEL_SEMCONV_SCHEMA_URL: &str = "https://opentelemetry.io/schemas/1.40.0";
31
/// OTLP transport protocols the trace exporter can speak.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TraceExportProtocol {
    /// Export traces via OTLP over HTTP using protobuf binary payloads.
    Http,
    /// Export traces via OTLP over gRPC using tonic.
    Grpc,
}

impl TraceExportProtocol {
    /// Stable lowercase identifier used in configuration files and logs.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            TraceExportProtocol::Grpc => "grpc",
            TraceExportProtocol::Http => "http",
        }
    }
}
51
/// Runtime configuration for OTLP trace export.
///
/// Consumed by `init_tracing_subscriber`; export is only attached when
/// `enabled` is true.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TraceExportConfig {
    /// Whether OTLP export should be attached to the tracing subscriber.
    pub enabled: bool,
    /// OTLP transport protocol used to reach the collector.
    pub protocol: TraceExportProtocol,
    /// Collector endpoint that receives OTLP trace payloads.
    ///
    /// For HTTP this is a full URL including the `/v1/traces` path; for gRPC
    /// it is the collector root (see the `DEFAULT_OTLP_*` constants).
    pub endpoint: String,
    /// Logical service name shown in the observability backend.
    pub service_name: String,
    /// Service version attached as a resource attribute.
    pub service_version: String,
}
66
/// Summary payload for recent in-memory metrics.
///
/// Produced by `TelemetryManager::get_metrics_summary` and serialized for
/// GUI/API consumers.
#[derive(Debug, Clone, serde::Serialize)]
pub struct MetricsSummary {
    /// Moment this snapshot was taken (UTC).
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Number of individual metric samples currently retained in memory.
    pub total_metrics: usize,
    /// Latest cumulative value per counter name.
    pub counters: HashMap<String, f64>,
    /// Latest value per gauge name.
    pub gauges: HashMap<String, f64>,
    /// Per-histogram aggregate statistics
    /// (count, sum, mean, median, p95, min, max).
    pub histograms: HashMap<String, serde_json::Value>,
    /// Most recent system health snapshot, with uptime refreshed at read time.
    pub system_health: SystemHealth,
}
77
/// Metric types for telemetry
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum MetricType {
    /// Cumulative value, increased by `increment_counter`.
    Counter,
    /// Point-in-time value, overwritten by `set_gauge`.
    Gauge,
    /// Individual sample collected for distribution statistics.
    Histogram,
    /// Duration measurement, recorded as fractional seconds.
    Timer,
}
86
/// Telemetry metric
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Metric {
    /// Metric name (free-form, chosen by the caller).
    pub name: String,
    /// Which kind of metric this sample represents.
    pub metric_type: MetricType,
    /// Sample value; for `Timer` metrics this is the duration in seconds.
    pub value: f64,
    /// When the sample was recorded (UTC).
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Arbitrary key/value tags attached by the caller.
    pub tags: HashMap<String, String>,
}
96
/// Performance timer for measuring operation duration
///
/// Created via [`Timer::new`] or [`start_timer`]; call `finish` to stop the
/// clock and record the elapsed time as a timer metric.
pub struct Timer {
    /// Metric name under which the duration is recorded.
    name: String,
    /// Captured at construction; `finish` measures elapsed time from here.
    start_time: Instant,
    /// Tags forwarded to the recorded timer metric.
    tags: HashMap<String, String>,
}
103
104impl Timer {
105    pub fn new(name: String) -> Self {
106        Self {
107            name,
108            start_time: Instant::now(),
109            tags: HashMap::new(),
110        }
111    }
112
113    pub fn with_tag(mut self, key: String, value: String) -> Self {
114        self.tags.insert(key, value);
115        self
116    }
117
118    pub async fn finish(self) -> Duration {
119        let duration = self.start_time.elapsed();
120
121        // Record the timing metric
122        let telemetry = get_telemetry_manager().await;
123        telemetry
124            .record_timer(&self.name, duration, self.tags)
125            .await;
126
127        duration
128    }
129}
130
/// System health metrics
///
/// Values are supplied by callers via `update_system_health`; units are not
/// enforced here. NOTE(review): usage fields look like percentages and
/// `network_latency` like milliseconds — confirm against the producers.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SystemHealth {
    /// CPU usage as reported by the producer.
    pub cpu_usage: f64,
    /// Memory usage as reported by the producer.
    pub memory_usage: f64,
    /// Disk usage as reported by the producer.
    pub disk_usage: f64,
    /// Network latency, if measured.
    pub network_latency: Option<f64>,
    /// Number of currently active agents.
    pub active_agents: usize,
    /// Number of currently active connections.
    pub active_connections: usize,
    /// Error rate as reported by the producer.
    pub error_rate: f64,
    /// Process uptime; refreshed from the manager's start time on each read.
    pub uptime_seconds: u64,
}
143
/// Telemetry manager
///
/// In-memory metric store; all state sits behind async locks so the manager
/// can be shared across tasks (see the global instance in
/// `get_telemetry_manager`).
pub struct TelemetryManager {
    /// Rolling log of individual metric samples, oldest first; trimmed to
    /// `max_metrics` entries.
    metrics: Arc<RwLock<Vec<Metric>>>,
    /// Cumulative value per counter name.
    counters: Arc<RwLock<HashMap<String, f64>>>,
    /// Latest value per gauge name.
    gauges: Arc<RwLock<HashMap<String, f64>>>,
    /// Raw samples per histogram name.
    histograms: Arc<RwLock<HashMap<String, Vec<f64>>>>,
    /// Latest health snapshot; `uptime_seconds` is refreshed on read.
    system_health: Arc<Mutex<SystemHealth>>,
    /// Upper bound on the length of `metrics`.
    max_metrics: usize,
    /// Construction time, used to derive uptime.
    start_time: Instant,
}
154
155impl TelemetryManager {
156    /// Create a new telemetry manager
157    pub fn new(max_metrics: usize) -> Self {
158        Self {
159            metrics: Arc::new(RwLock::new(Vec::new())),
160            counters: Arc::new(RwLock::new(HashMap::new())),
161            gauges: Arc::new(RwLock::new(HashMap::new())),
162            histograms: Arc::new(RwLock::new(HashMap::new())),
163            system_health: Arc::new(Mutex::new(SystemHealth {
164                cpu_usage: 0.0,
165                memory_usage: 0.0,
166                disk_usage: 0.0,
167                network_latency: None,
168                active_agents: 0,
169                active_connections: 0,
170                error_rate: 0.0,
171                uptime_seconds: 0,
172            })),
173            max_metrics,
174            start_time: Instant::now(),
175        }
176    }
177
178    /// Record a counter metric
179    pub async fn increment_counter(&self, name: &str, value: f64, tags: HashMap<String, String>) {
180        let mut counters = self.counters.write().await;
181        *counters.entry(name.to_string()).or_insert(0.0) += value;
182
183        self.record_metric(Metric {
184            name: name.to_string(),
185            metric_type: MetricType::Counter,
186            value,
187            timestamp: chrono::Utc::now(),
188            tags,
189        })
190        .await;
191    }
192
193    /// Record a gauge metric
194    pub async fn set_gauge(&self, name: &str, value: f64, tags: HashMap<String, String>) {
195        let mut gauges = self.gauges.write().await;
196        gauges.insert(name.to_string(), value);
197
198        self.record_metric(Metric {
199            name: name.to_string(),
200            metric_type: MetricType::Gauge,
201            value,
202            timestamp: chrono::Utc::now(),
203            tags,
204        })
205        .await;
206    }
207
208    /// Record a histogram value
209    pub async fn record_histogram(&self, name: &str, value: f64, tags: HashMap<String, String>) {
210        let mut histograms = self.histograms.write().await;
211        histograms
212            .entry(name.to_string())
213            .or_insert_with(Vec::new)
214            .push(value);
215
216        self.record_metric(Metric {
217            name: name.to_string(),
218            metric_type: MetricType::Histogram,
219            value,
220            timestamp: chrono::Utc::now(),
221            tags,
222        })
223        .await;
224    }
225
226    /// Record a timer metric
227    pub async fn record_timer(
228        &self,
229        name: &str,
230        duration: Duration,
231        tags: HashMap<String, String>,
232    ) {
233        let value = duration.as_secs_f64();
234
235        self.record_metric(Metric {
236            name: name.to_string(),
237            metric_type: MetricType::Timer,
238            value,
239            timestamp: chrono::Utc::now(),
240            tags,
241        })
242        .await;
243    }
244
245    /// Record a generic metric
246    async fn record_metric(&self, metric: Metric) {
247        let mut metrics = self.metrics.write().await;
248        metrics.push(metric);
249
250        // Trim metrics if needed
251        if metrics.len() > self.max_metrics {
252            metrics.remove(0);
253        }
254    }
255
256    /// Update system health metrics
257    pub async fn update_system_health(&self, health: SystemHealth) {
258        let mut system_health = self.system_health.lock().await;
259        *system_health = health;
260    }
261
262    /// Update active agent count without overwriting the rest of the health snapshot.
263    pub async fn set_active_agents(&self, count: usize) {
264        let mut health = self.system_health.lock().await;
265        health.active_agents = count;
266    }
267
268    /// Update active connection count without overwriting the rest of the health snapshot.
269    pub async fn set_active_connections(&self, count: usize) {
270        let mut health = self.system_health.lock().await;
271        health.active_connections = count;
272    }
273
274    /// Get current system health
275    pub async fn get_system_health(&self) -> SystemHealth {
276        let mut health = self.system_health.lock().await;
277        health.uptime_seconds = self.start_time.elapsed().as_secs();
278        health.clone()
279    }
280
281    /// Get recent metrics
282    pub async fn get_recent_metrics(&self, limit: usize) -> Vec<Metric> {
283        let metrics = self.metrics.read().await;
284        metrics.iter().rev().take(limit).cloned().collect()
285    }
286
287    /// Get an aggregate snapshot of counters, gauges, histograms, and system health.
288    pub async fn get_metrics_summary(&self) -> MetricsSummary {
289        let metrics = self.metrics.read().await;
290        let counters = self.counters.read().await;
291        let gauges = self.gauges.read().await;
292        let histograms = self.histograms.read().await;
293        let health = self.get_system_health().await;
294
295        let mut histogram_stats = HashMap::new();
296        for (name, values) in histograms.iter() {
297            if values.is_empty() {
298                continue;
299            }
300
301            let mut sorted_values = values.clone();
302            sorted_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
303
304            let len = sorted_values.len();
305            let sum: f64 = sorted_values.iter().sum();
306            let mean = sum / len as f64;
307            let median = if len % 2 == 0 {
308                (sorted_values[len / 2 - 1] + sorted_values[len / 2]) / 2.0
309            } else {
310                sorted_values[len / 2]
311            };
312            let p95_idx = ((len as f64) * 0.95).floor() as usize;
313            let p95 = sorted_values
314                .get(p95_idx.min(len.saturating_sub(1)))
315                .copied()
316                .unwrap_or(0.0);
317
318            histogram_stats.insert(
319                name.clone(),
320                serde_json::json!({
321                    "count": len,
322                    "sum": sum,
323                    "mean": mean,
324                    "median": median,
325                    "p95": p95,
326                    "min": sorted_values.first().copied().unwrap_or(0.0),
327                    "max": sorted_values.last().copied().unwrap_or(0.0),
328                }),
329            );
330        }
331
332        MetricsSummary {
333            timestamp: chrono::Utc::now(),
334            total_metrics: metrics.len(),
335            counters: counters.clone(),
336            gauges: gauges.clone(),
337            histograms: histogram_stats,
338            system_health: health,
339        }
340    }
341
342    /// Clear all metrics
343    pub async fn clear_metrics(&self) {
344        let mut metrics = self.metrics.write().await;
345        let mut counters = self.counters.write().await;
346        let mut gauges = self.gauges.write().await;
347        let mut histograms = self.histograms.write().await;
348
349        metrics.clear();
350        counters.clear();
351        gauges.clear();
352        histograms.clear();
353
354        tracing::info!("Telemetry metrics cleared");
355    }
356}
357
/// Global telemetry manager instance, lazily created on first access.
static TELEMETRY_MANAGER: tokio::sync::OnceCell<TelemetryManager> =
    tokio::sync::OnceCell::const_new();
// Tracer provider installed by `build_otlp_tracer`; kept here so
// `shutdown_tracing` can flush and shut it down later.
static TRACER_PROVIDER: OnceLock<SdkTracerProvider> = OnceLock::new();
362
363/// Get the global telemetry manager
364pub async fn get_telemetry_manager() -> &'static TelemetryManager {
365    TELEMETRY_MANAGER
366        .get_or_init(|| async { TelemetryManager::new(100000) })
367        .await
368}
369
370/// Convenience function to start a timer
371pub fn start_timer(name: &str) -> Timer {
372    Timer::new(name.to_string())
373}
374
375/// Initialize the global tracing subscriber with optional OTLP trace export.
376///
377/// Call this exactly once from the process entrypoint. When `trace_export` is
378/// enabled, spans emitted through `tracing` are exported to the configured OTLP
379/// collector while still being written to the local log output.
380pub fn init_tracing_subscriber<W>(
381    filter: EnvFilter,
382    writer: W,
383    with_target: bool,
384    trace_export: Option<TraceExportConfig>,
385) -> Result<(), AppError>
386where
387    W: for<'writer> MakeWriter<'writer> + Send + Sync + 'static,
388{
389    let fmt_layer = tracing_subscriber::fmt::layer()
390        .with_writer(writer)
391        .with_target(with_target);
392
393    if let Some(config) = trace_export.filter(|cfg| cfg.enabled) {
394        let tracer = build_otlp_tracer(&config)?;
395        tracing_subscriber::registry()
396            .with(filter)
397            .with(fmt_layer)
398            .with(tracing_opentelemetry::layer().with_tracer(tracer))
399            .try_init()
400            .map_err(|error| {
401                AppError::Config(format!("failed to initialize tracing subscriber: {error}"))
402            })?;
403        tracing::info!(
404            protocol = config.protocol.as_str(),
405            endpoint = %config.endpoint,
406            service_name = %config.service_name,
407            "OTLP trace export enabled"
408        );
409    } else {
410        tracing_subscriber::registry()
411            .with(filter)
412            .with(fmt_layer)
413            .try_init()
414            .map_err(|error| {
415                AppError::Config(format!("failed to initialize tracing subscriber: {error}"))
416            })?;
417    }
418
419    Ok(())
420}
421
422/// Flush and shut down the global tracer provider.
423pub fn shutdown_tracing() {
424    if let Some(tracer_provider) = TRACER_PROVIDER.get()
425        && let Err(error) = tracer_provider.shutdown()
426    {
427        eprintln!("failed to shut down tracer provider: {error}");
428    }
429}
430
431/// Convenience function to increment a counter
432pub async fn increment_counter(name: &str, value: f64) {
433    let telemetry = get_telemetry_manager().await;
434    telemetry
435        .increment_counter(name, value, HashMap::new())
436        .await;
437}
438
439/// Convenience function to set a gauge
440pub async fn set_gauge(name: &str, value: f64) {
441    let telemetry = get_telemetry_manager().await;
442    telemetry.set_gauge(name, value, HashMap::new()).await;
443}
444
445/// Convenience function to record a histogram value
446pub async fn record_histogram(name: &str, value: f64) {
447    let telemetry = get_telemetry_manager().await;
448    telemetry
449        .record_histogram(name, value, HashMap::new())
450        .await;
451}
452
/// Builds an OTLP span exporter and tracer provider from `config`, installs
/// the provider globally, and returns a tracer scoped to the service.
///
/// Side effects: stores the provider in `TRACER_PROVIDER` (so
/// `shutdown_tracing` can flush it) and sets it as the OpenTelemetry global
/// provider. NOTE(review): `OnceLock::set` keeps only the first provider, so
/// a second call would install a global provider that `shutdown_tracing`
/// cannot reach — confirm this is only called once from the entrypoint.
fn build_otlp_tracer(
    config: &TraceExportConfig,
) -> Result<opentelemetry_sdk::trace::Tracer, AppError> {
    // Select the transport per config; both use the configured endpoint, but
    // only HTTP pins the protobuf-binary payload encoding explicitly.
    let exporter = match config.protocol {
        TraceExportProtocol::Http => SpanExporter::builder()
            .with_http()
            .with_protocol(Protocol::HttpBinary)
            .with_endpoint(config.endpoint.clone())
            .build(),
        TraceExportProtocol::Grpc => SpanExporter::builder()
            .with_tonic()
            .with_endpoint(config.endpoint.clone())
            .build(),
    }
    .map_err(|error| AppError::Config(format!("failed to build OTLP trace exporter: {error}")))?;

    // Batch exporter: spans are buffered and shipped asynchronously. The
    // resource attaches service.name and service.version so backends can
    // group traces by service.
    let tracer_provider = SdkTracerProvider::builder()
        .with_batch_exporter(exporter)
        .with_resource(
            Resource::builder()
                .with_service_name(config.service_name.clone())
                .with_schema_url(
                    [KeyValue::new(
                        "service.version",
                        config.service_version.clone(),
                    )],
                    OTEL_SEMCONV_SCHEMA_URL,
                )
                .build(),
        )
        .build();

    // Scope the returned tracer with the same name/version/schema so span
    // provenance matches the resource attributes above.
    let tracer = tracer_provider.tracer_with_scope(
        InstrumentationScope::builder(config.service_name.clone())
            .with_version(config.service_version.clone())
            .with_schema_url(OTEL_SEMCONV_SCHEMA_URL)
            .build(),
    );
    // Ignore the error case: set fails only if a provider was already stored.
    let _ = TRACER_PROVIDER.set(tracer_provider.clone());
    global::set_tracer_provider(tracer_provider);

    Ok(tracer)
}