diff --git a/CHANGELOG.md b/CHANGELOG.md index e6d8f13..962a534 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Add `max_elements` subscription parameter (#185) +- Add an optional Prometheus endpoint that exposes metrics (#190) ## [v0.3.0] diff --git a/Cargo.lock b/Cargo.lock index ca3f11b..9f98841 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -461,6 +461,16 @@ dependencies = [ "uuid", ] +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.6" @@ -476,6 +486,21 @@ dependencies = [ "libc", ] +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" + [[package]] name = "crypto-common" version = "0.1.6" @@ -705,6 +730,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f81ec6369c545a7d40e4589b5597581fa1c441fe1cce96dd1de43159910a36a2" + [[package]] name = "foreign-types" version = "0.3.2" @@ -888,13 +919,22 @@ dependencies = [ "allocator-api2", ] +[[package]] +name = "hashbrown" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb" +dependencies = [ + "foldhash", +] + [[package]] name = "hashlink" version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7" dependencies = [ - "hashbrown", + "hashbrown 0.14.5", ] [[package]] @@ -1006,6 +1046,24 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.27.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333" +dependencies = [ + "futures-util", + "http", + "hyper", + "hyper-util", + "rustls", + "rustls-native-certs", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.6" @@ -1061,14 +1119,20 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.2.6" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" +checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.15.0", ] +[[package]] +name = "ipnet" +version = "2.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708" + [[package]] name = "is_terminal_polyfill" version = "1.70.0" @@ -1285,6 +1349,51 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +[[package]] +name = "metrics" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ae428771d17306715c5091d446327d1cfdedc82185c65ba8423ab404e45bf10" +dependencies = [ + "ahash", + "portable-atomic", +] + +[[package]] +name = "metrics-exporter-prometheus" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85b6f8152da6d7892ff1b7a1c0fa3f435e92b5918ad67035c3bb432111d9a29b" +dependencies = [ + "base64 0.22.1", + "http-body-util", + "hyper", + "hyper-rustls", + "hyper-util", + "indexmap", + "ipnet", + "metrics", + "metrics-util", + "quanta", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "metrics-util" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15b482df36c13dd1869d73d14d28cd4855fbd6cfc32294bee109908a9f4a4ed7" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown 0.15.0", + "metrics", + "quanta", + "sketches-ddsketch", +] + [[package]] name = "miette" version = "7.2.0" @@ -1472,6 +1581,12 @@ dependencies = [ "syn 2.0.70", ] +[[package]] +name = "openssl-probe" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" + [[package]] name = "openssl-sys" version = "0.9.103" @@ -1584,6 +1699,12 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" +[[package]] +name = "portable-atomic" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc9c68a3f6da06753e9335d63e27f6b9754dd1920d941135b7ea8224f141adb2" + [[package]] name = "postgres-openssl" version = "0.5.0" @@ -1676,6 +1797,21 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "quanta" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi", + "web-sys", + "winapi", +] + [[package]] name = "quick-xml" version = "0.36.0" @@ -1724,6 +1860,15 @@ dependencies = [ "getrandom", ] +[[package]] +name = "raw-cpuid" +version = "11.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ab240315c661615f2ee9f0f2cd32d5a7343a84d5ebcccb99d46e6637565e7b0" +dependencies = [ + "bitflags 2.6.0", +] + [[package]] name = "rdkafka" version = "0.36.2" @@ -1914,6 +2059,19 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcaf18a4f2be7326cd874a5fa579fae794320a0f388d365dca7e480e55f83f8a" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "2.1.2" @@ -1963,6 +2121,15 @@ dependencies = [ "sdd", ] +[[package]] +name = "schannel" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01227be5826fa0690321a2ba6c5cd57a19cf3f6a09e76973b58e61de6ab9d1c1" +dependencies = [ + "windows-sys 0.59.0", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -1975,6 +2142,29 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b84345e4c9bd703274a082fb80caaa99b7612be48dfaa1dd9266577ec412309d" +[[package]] +name = "security-framework" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" +dependencies = [ + "bitflags 2.6.0", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea4a292869320c0272d7bc55a5a6aafaff59b4f63404a003887b679a2e05b4b6" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "serde" version = "1.0.204" @@ -2089,6 +2279,8 @@ dependencies = [ "log", "log-mdc", "log4rs", + "metrics", + "metrics-exporter-prometheus", "mime", "ppp", "quick-xml", @@ -2160,6 +2352,12 @@ version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" +[[package]] +name = "sketches-ddsketch" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1e9a774a6c28142ac54bb25d25562e6bcf957493a184f15ad4eebccb23e410a" + [[package]] name = "slab" version = "0.4.9" diff --git a/common/src/settings.rs b/common/src/settings.rs index a3391cf..c12345e 100644 --- a/common/src/settings.rs +++ b/common/src/settings.rs @@ -322,7 +322,7 @@ impl Cli { #[serde(deny_unknown_fields)] pub struct FilesOutput { // Time after which an unused file descriptor is closed - files_descriptor_close_timeout: Option + files_descriptor_close_timeout: Option, } impl FilesOutput { @@ -351,7 +351,7 @@ pub struct Outputs { #[serde(default)] files: FilesOutput, #[serde(default)] - kafka: KafkaOutput + kafka: KafkaOutput, } impl Outputs { @@ -368,6 +368,60 @@ impl Outputs { } } +#[derive(Debug, Deserialize, Clone, Default)] +#[serde(deny_unknown_fields)] +pub struct Monitoring { + listen_address: String, + listen_port: u16, + http_request_duration_buckets: Option>, + count_input_events_per_machine: Option, + count_input_event_bytes_per_machine: Option, + count_http_request_body_network_size_per_machine: Option, + count_http_request_body_real_size_per_machine: Option, + machines_refresh_interval: Option, +} + +impl Monitoring { + pub fn listen_address(&self) -> &str { + &self.listen_address + } + + pub fn listen_port(&self) -> u16 { + self.listen_port + } + + pub fn http_request_duration_buckets(&self) -> &[f64] { + match &self.http_request_duration_buckets { + Some(bucket) => bucket, + None => &[ + 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, + ], + } + } + + pub fn count_input_events_per_machine(&self) -> bool { + self.count_input_events_per_machine.unwrap_or(false) + } + + pub fn count_input_event_bytes_per_machine(&self) -> bool { + self.count_input_event_bytes_per_machine.unwrap_or(false) + } + + pub fn count_http_request_body_network_size_per_machine(&self) -> bool { + self.count_http_request_body_network_size_per_machine + .unwrap_or(false) + } + + pub fn count_http_request_body_real_size_per_machine(&self) -> bool { + self.count_http_request_body_real_size_per_machine + .unwrap_or(false) + } + + pub fn machines_refresh_interval(&self) -> u64 { + self.machines_refresh_interval.unwrap_or(30) + } +} + #[derive(Debug, Deserialize, Clone)] #[serde(deny_unknown_fields)] pub struct Settings { @@ -381,6 +435,8 @@ pub struct Settings { cli: Cli, #[serde(default)] outputs: Outputs, + #[serde(default)] + monitoring: Option, } impl std::str::FromStr for Settings { @@ -422,6 +478,10 @@ impl Settings { pub fn outputs(&self) -> &Outputs { &self.outputs } + + pub fn monitoring(&self) -> Option<&Monitoring> { + self.monitoring.as_ref() + } } #[cfg(test)] @@ -484,9 +544,12 @@ mod tests { assert!(s.logging().verbosity().is_none()); assert!(s.logging().access_logs().is_none()); assert_eq!(s.logging().server_logs(), LoggingType::Stderr); + assert_eq!(s.server().tcp_keepalive_time(), 3600); assert_eq!(s.server().tcp_keepalive_intvl().unwrap(), 1); assert_eq!(s.server().tcp_keepalive_probes().unwrap(), 10); + + assert!(s.monitoring().is_none()); } const CONFIG_TLS_POSTGRES: &str = r#" @@ -513,6 +576,10 @@ mod tests { server_certificate = "/etc/server_certificate.pem" server_private_key = "/etc/server_private_key.pem" ca_certificate = "/etc/ca_certificate.pem" + + [monitoring] + listen_address = "127.0.0.1" + listen_port = 9090 "#; #[test] @@ -555,13 +622,49 @@ mod tests { assert_eq!(s.logging().server_logs(), LoggingType::Stdout); assert_eq!(s.logging().server_logs_pattern().unwrap(), "toto"); assert_eq!(s.logging().access_logs_pattern(), "tutu"); + assert_eq!(s.server().tcp_keepalive_time(), 7200); assert!(s.server().tcp_keepalive_intvl().is_none()); assert!(s.server().tcp_keepalive_probes().is_none()); + assert_eq!(s.cli().read_only_subscriptions(), false); + assert_eq!(s.outputs().garbage_collect_interval(), 600); assert_eq!(s.outputs().files().files_descriptor_close_timeout(), 600); assert!(s.outputs().kafka().options().is_empty()); + + assert!(s.monitoring().is_some()); + assert_eq!(s.monitoring().unwrap().listen_address(), "127.0.0.1"); + assert_eq!(s.monitoring().unwrap().listen_port(), 9090); + assert_eq!( + s.monitoring() + .unwrap() + .count_input_event_bytes_per_machine(), + false + ); + assert_eq!( + s.monitoring() + .unwrap() + .count_http_request_body_network_size_per_machine(), + false + ); + assert_eq!( + s.monitoring() + .unwrap() + .count_http_request_body_real_size_per_machine(), + false + ); + assert_eq!( + s.monitoring().unwrap().count_input_events_per_machine(), + false + ); + assert_eq!( + s.monitoring().unwrap().http_request_duration_buckets(), + &[0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,] + ); + assert_eq!( + s.monitoring().unwrap().machines_refresh_interval(), 30 + ); } const CONFIG_TLS_POSTGRES_WITH_CLI: &str = r#" @@ -590,12 +693,56 @@ mod tests { [cli] read_only_subscriptions = true + + [monitoring] + listen_address = "127.0.0.1" + listen_port = 9090 + http_request_duration_buckets = [0.0005, 0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0] + count_input_event_bytes_per_machine = true + count_http_request_body_network_size_per_machine = true + count_http_request_body_real_size_per_machine = true + count_input_events_per_machine = true + machines_refresh_interval = 10 "#; #[test] fn test_settings_tls_postgres_with_cli() { let s = Settings::from_str(CONFIG_TLS_POSTGRES_WITH_CLI).unwrap(); assert_eq!(s.cli().read_only_subscriptions(), true); + + assert!(s.monitoring().is_some()); + assert_eq!(s.monitoring().unwrap().listen_address(), "127.0.0.1"); + assert_eq!(s.monitoring().unwrap().listen_port(), 9090); + assert_eq!( + s.monitoring() + .unwrap() + .count_input_event_bytes_per_machine(), + true + ); + assert_eq!( + s.monitoring() + .unwrap() + .count_http_request_body_network_size_per_machine(), + true + ); + assert_eq!( + s.monitoring() + .unwrap() + .count_http_request_body_real_size_per_machine(), + true + ); + assert_eq!( + s.monitoring().unwrap().count_input_events_per_machine(), + true + ); + assert_eq!( + s.monitoring().unwrap().http_request_duration_buckets(), + &[0.0005, 0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0] + ); + assert_eq!( + s.monitoring().unwrap().machines_refresh_interval(), + 10 + ); } const CONFIG_TLS_POSTGRES_WITH_OUTPUTS: &str = r#" diff --git a/common/src/subscription.rs b/common/src/subscription.rs index 81d00ad..6f97441 100644 --- a/common/src/subscription.rs +++ b/common/src/subscription.rs @@ -8,7 +8,7 @@ use std::{ use anyhow::{anyhow, bail, Result}; use log::{info, warn}; use serde::{Deserialize, Serialize}; -use strum::{AsRefStr, EnumString, VariantNames}; +use strum::{AsRefStr, EnumString, IntoStaticStr, VariantNames}; use uuid::Uuid; use crate::utils::VersionHasher; @@ -96,12 +96,8 @@ pub struct FilesConfiguration { } impl FilesConfiguration { - pub fn new( - path: String, - ) -> Self { - Self { - path - } + pub fn new(path: String) -> Self { + Self { path } } pub fn path(&self) -> &str { @@ -182,7 +178,17 @@ impl Display for SubscriptionOutput { } } #[derive( - Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Hash, VariantNames, AsRefStr, EnumString, + Debug, + Clone, + Eq, + PartialEq, + Serialize, + Deserialize, + Hash, + VariantNames, + AsRefStr, + EnumString, + IntoStaticStr, )] #[strum(serialize_all = "snake_case", ascii_case_insensitive)] pub enum SubscriptionOutputFormat { @@ -683,8 +689,8 @@ impl SubscriptionData { self } - /// Set the subscription's max elements. - pub fn set_max_elements(&mut self, max_elements: Option) -> &mut Self { + /// Set the subscription's max elements. + pub fn set_max_elements(&mut self, max_elements: Option) -> &mut Self { self.parameters.max_elements = max_elements; self.update_internal_version(); self @@ -907,6 +913,7 @@ impl SubscriptionStatsCounters { } } +#[derive(IntoStaticStr)] pub enum SubscriptionMachineState { Alive, Active, diff --git a/doc/monitoring.md b/doc/monitoring.md index a954fa7..30b0215 100644 --- a/doc/monitoring.md +++ b/doc/monitoring.md @@ -28,3 +28,32 @@ $ openwec heartbeats -a 192.168.1.0 -s my-test-subscription ``` Two formats are available: `text` (default) and `json` (`--format`). + +## Prometheus-compatible endpoint + +OpenWEC can expose a Prometheus-compatible endpoint with multiple metrics. + +### Configuration + +This feature is **disabled** by default. + +Metrics collection and publication can be enabled in the OpenWEC settings (see `monitoring` section of [openwec.conf.sample.toml](../openwec.conf.sample.toml)). + +### Available metrics + +> [!CAUTION] +> Enabling the `machine` labels may cause a **huge** increase in metric cardinality! This is disabled by default. + +| **Metric** | **Type** | **Labels** | **Description** | +|---|---|---|---| +| `openwec_input_events_total` | `Counter` | `subscription_uuid`, `subscription_name`, `machine` (optional*) | The total number of events received by openwec | +| `openwec_input_event_bytes_total` | `Counter` | `subscription_uuid`, `subscription_name`, `machine` (optional*) | The total size of all events received by openwec | +| `openwec_input_messages_total` | `Counter` | `action` (one of `"enumerate"`, `"heartbeat"`, `"events"`) | The total number of messages received by openwec | +| `openwec_input_event_parsing_failures_total` | `Counter` | `subscription_uuid`, `subscription_name`, `type` | The total number of event parsing failures | +| `openwec_http_requests_total` | `Counter` | `uri`, `code` | The total number of HTTP requests handled by openwec | +| `openwec_http_request_duration_seconds` | `Histogram` | `uri` | Histogram of response duration for HTTP requests | +| `openwec_http_request_body_network_size_bytes_total` | `Counter` | `uri`, `machine` (optional*) | The total size of all http requests body received by openwec | +| `openwec_http_request_body_real_size_bytes_total` | `Counter` | `uri`, `machine` (optional*) | The total size of all http requests body received by openwec after decryption and decompression | +| `openwec_output_driver_failures_total` | `Counter` | `subscription_uuid`, `subscription_name`, `driver` | The total number of output driver failures | +| `openwec_output_format_failures_total` | `Counter` | `subscription_uuid`, `subscription_name`, `format` | The total number of output format failures | +| `openwec_machines` | `Gauge` | `subscription_uuid`, `subscription_name`, `state` | The number of machines known by openwec | \ No newline at end of file diff --git a/openwec.conf.sample.toml b/openwec.conf.sample.toml index 5f6d4d8..366bfc4 100644 --- a/openwec.conf.sample.toml +++ b/openwec.conf.sample.toml @@ -302,3 +302,49 @@ # - If you configure Kafka options in an output, a dedicated Kafka client will be # used for that output regardless of this setting. # options = {} + +########################## +## Monitoring settings ## +########################## + +# OpenWEC can expose internal metrics on a Prometheus-compatible endpoint. +# Monitoring is disabled by default. +# You can enable it by uncommenting the [monitoring] section. + +# [monitoring] + +# [Required] +# Listen address of the Prometheus-compatible endpoint +# listen_address = + +# [Required] +# Listen port of the Prometheus-compatible endpoint +# listen_port = + +# [Optional] +# The refresh interval of "openwec_machines" gauge +# machines_refresh_interval = 30 + +# [Optional] +# Request duration buckets (in seconds) used by the "openwec_http_request_duration_seconds" histogram +# http_request_duration_buckets = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0] + +# [Optional] +# If set, a "machine" label will be added to the "openwec_input_events_total" metric +# Warning: this may cause a HUGE increase in metric cardinality +# count_input_events_per_machine = false + +# [Optional] +# If set, a "machine" label will be added to the "openwec_input_event_bytes_total" metric +# Warning: this may cause a HUGE increase in metric cardinality +# count_input_event_bytes_per_machine = false + +# [Optional] +# If set, a "machine" label will be added to the "openwec_http_request_body_network_size_bytes_total" metric +# Warning: this may cause a HUGE increase in metric cardinality +# count_http_request_body_network_size_per_machine = false + +# [Optional] +# If set, a "machine" label will be added to the "openwec_http_request_body_real_size_bytes_total" metric +# Warning: this may cause a HUGE increase in metric cardinality +# count_http_request_body_real_size_per_machine = false diff --git a/server/Cargo.toml b/server/Cargo.toml index 532022a..3740f63 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -52,3 +52,5 @@ ppp = "2.2.0" tokio-rustls = "0.26.0" strum = { version = "0.26.1", features = ["derive"] } leon = "3.0.1" +metrics = "0.24.0" +metrics-exporter-prometheus = { version = "0.16.0", features = ["http-listener"] } \ No newline at end of file diff --git a/server/src/event.rs b/server/src/event.rs index 8df6632..d1bf4ff 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -4,6 +4,7 @@ use log::{debug, info, trace, warn}; use roxmltree::{Document, Error, Node}; use serde::Serialize; use std::{collections::HashMap, fmt::Display, net::SocketAddr, sync::Arc}; +use strum::IntoStaticStr; use crate::subscription::Subscription; @@ -47,7 +48,7 @@ pub enum DataType { Unknown, } -#[derive(Debug, Clone, Default, Eq, PartialEq)] +#[derive(Debug, Clone, Default, Eq, PartialEq, IntoStaticStr)] pub enum ErrorType { /// Initial XML parsing failed but Raw content could be recovered RawContentRecovered(String), @@ -65,9 +66,9 @@ impl Display for ErrorType { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { ErrorType::RawContentRecovered(message) => write!(f, "{}", message), - ErrorType::FailedToRecoverRawContent(message ) => write!(f, "{}", message), - ErrorType::Unrecoverable(message ) => write!(f, "{}", message), - ErrorType::FailedToFeedEvent (message ) => write!(f, "{}", message), + ErrorType::FailedToRecoverRawContent(message) => write!(f, "{}", message), + ErrorType::Unrecoverable(message) => write!(f, "{}", message), + ErrorType::FailedToFeedEvent(message) => write!(f, "{}", message), ErrorType::Unknown => write!(f, "Unknown error"), } } @@ -192,7 +193,7 @@ impl Event { pub fn from_str(content: &str) -> Self { let mut event = Event::default(); - + let doc_parse_attempt = Document::parse(content); match doc_parse_attempt { Ok(doc) => { @@ -257,7 +258,9 @@ fn parse_debug_data(debug_data_node: &Node) -> Result { } else if node.tag_name().name() == "LevelName" { debug_data.level_name = node.text().map(str::to_string); } else if node.tag_name().name() == "Component" { - node.text().unwrap_or_default().clone_into(&mut debug_data.component); + node.text() + .unwrap_or_default() + .clone_into(&mut debug_data.component); } else if node.tag_name().name() == "SubComponent" { debug_data.sub_component = node.text().map(str::to_string); } else if node.tag_name().name() == "FileLine" { @@ -265,7 +268,9 @@ fn parse_debug_data(debug_data_node: &Node) -> Result { } else if node.tag_name().name() == "Function" { debug_data.function = node.text().map(str::to_string); } else if node.tag_name().name() == "Message" { - node.text().unwrap_or_default().clone_into(&mut debug_data.message); + node.text() + .unwrap_or_default() + .clone_into(&mut debug_data.message); } } Ok(DataType::DebugData(debug_data)) @@ -277,9 +282,13 @@ fn parse_processing_error_data(processing_error_data_node: &Node) -> Result) { - self.time_received = time_received; + self.time_received = time_received; } /// Get a reference to the event metadata's addr. @@ -583,16 +592,13 @@ impl EventData { } else { None }; - Self { - raw, - event - } + Self { raw, event } } pub fn raw(&self) -> Arc { self.raw.clone() } - + pub fn event(&self) -> Option<&Event> { self.event.as_ref() } @@ -919,9 +925,7 @@ mod tests { #[test] fn test_4689_parsing() { - let event = Event::from_str( - EVENT_4689, - ); + let event = Event::from_str(EVENT_4689); assert!(event.additional.error.is_none()) } @@ -931,16 +935,17 @@ mod tests { fn test_serialize_malformed_raw_content_recovered() { // Try to serialize a malformed event, and use the recovering strategy to // recover its Raw content - let event = Event::from_str( - RAW_CONTENT_RECOVERED, - ); + let event = Event::from_str(RAW_CONTENT_RECOVERED); let error = event.additional.error.unwrap(); assert_eq!(error.error_type, ErrorType::RawContentRecovered("Failed to parse event XML (the root node was opened but never closed) but Raw content could be recovered.".to_string())); assert_eq!(error.original_content, RAW_CONTENT_RECOVERED); let system = event.system.unwrap(); - assert_eq!(system.provider.name.unwrap(), "Microsoft-Windows-Security-Auditing".to_string()); + assert_eq!( + system.provider.name.unwrap(), + "Microsoft-Windows-Security-Auditing".to_string() + ); assert_eq!(system.event_id, 4798); assert_eq!(system.execution.unwrap().thread_id, 16952); @@ -948,10 +953,16 @@ mod tests { match event.data { DataType::EventData(data) => { - assert_eq!(data.named_data.get("TargetDomainName").unwrap(), "xxxxx_xps"); - assert_eq!(data.named_data.get("TargetSid").unwrap(), "S-1-5-21-1604529354-1295832394-4197355770-1001"); - }, - _ => panic!("Wrong event data type") + assert_eq!( + data.named_data.get("TargetDomainName").unwrap(), + "xxxxx_xps" + ); + assert_eq!( + data.named_data.get("TargetSid").unwrap(), + "S-1-5-21-1604529354-1295832394-4197355770-1001" + ); + } + _ => panic!("Wrong event data type"), }; } @@ -960,20 +971,23 @@ mod tests { #[test] fn test_serialize_malformed_unrecoverable_1() { // Try to serialize an event for which there is no recovering strategy - let event = Event::from_str( - UNRECOVERABLE_1, - ); + let event = Event::from_str(UNRECOVERABLE_1); assert!(event.additional.error.is_some()); assert!(event.system.is_none()); assert!(event.rendering_info.is_none()); match event.data { DataType::Unknown => (), - _ => panic!("Wrong event data type") + _ => panic!("Wrong event data type"), }; let error = event.additional.error.unwrap(); - assert_eq!(error.error_type, ErrorType::Unrecoverable("Failed to parse event XML: the root node was opened but never closed".to_string())); + assert_eq!( + error.error_type, + ErrorType::Unrecoverable( + "Failed to parse event XML: the root node was opened but never closed".to_string() + ) + ); assert_eq!(error.original_content, UNRECOVERABLE_1); } @@ -983,20 +997,23 @@ mod tests { fn test_serialize_malformed_unrecoverable_2() { // Try to serialize a malformed event for which no recovery // is possible. - let event = Event::from_str( - UNRECOVERABLE_2, - ); + let event = Event::from_str(UNRECOVERABLE_2); assert!(event.additional.error.is_some()); assert!(event.system.is_none()); assert!(event.rendering_info.is_none()); match event.data { DataType::Unknown => (), - _ => panic!("Wrong event data type") + _ => panic!("Wrong event data type"), }; let error = event.additional.error.unwrap(); - assert_eq!(error.error_type, ErrorType::Unrecoverable("Failed to parse event XML: unexpected end of stream".to_string())); + assert_eq!( + error.error_type, + ErrorType::Unrecoverable( + "Failed to parse event XML: unexpected end of stream".to_string() + ) + ); assert_eq!(error.original_content, UNRECOVERABLE_2); } @@ -1006,16 +1023,14 @@ mod tests { fn test_serialize_failed_to_recover() { // Try to serialize a malformed event for which the recovering strategy can // not succeed - let event = Event::from_str( - FAILED_TO_RECOVER_RAW_CONTENT, - ); + let event = Event::from_str(FAILED_TO_RECOVER_RAW_CONTENT); assert!(event.additional.error.is_some()); assert!(event.system.is_none()); assert!(event.rendering_info.is_none()); match event.data { DataType::Unknown => (), - _ => panic!("Wrong event data type") + _ => panic!("Wrong event data type"), }; let error = event.additional.error.unwrap(); @@ -1029,20 +1044,23 @@ mod tests { fn test_serialize_malformed_failed_to_feed_event() { // Try to serialize a malformed event for which the recovering strategy can // not succeed because is invalid. - let event = Event::from_str( - FAILED_TO_FEED_EVENT, - ); + let event = Event::from_str(FAILED_TO_FEED_EVENT); assert!(event.additional.error.is_some()); assert!(event.system.is_none()); assert!(event.rendering_info.is_none()); match event.data { DataType::Unknown => (), - _ => panic!("Wrong event data type") + _ => panic!("Wrong event data type"), }; let error = event.additional.error.unwrap(); - assert_eq!(error.error_type, ErrorType::FailedToFeedEvent("Could not feed event from document: Parsing failure in System".to_string())); + assert_eq!( + error.error_type, + ErrorType::FailedToFeedEvent( + "Could not feed event from document: Parsing failure in System".to_string() + ) + ); assert_eq!(error.original_content, FAILED_TO_FEED_EVENT); } -} \ No newline at end of file +} diff --git a/server/src/lib.rs b/server/src/lib.rs index d691b5d..bda3f86 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -8,6 +8,7 @@ mod heartbeat; mod kerberos; mod logging; mod logic; +mod monitoring; mod multipart; mod output; mod proxy_protocol; @@ -19,7 +20,7 @@ mod tls; use anyhow::{anyhow, bail, Context, Result}; use common::database::{db_from_settings, schema_is_up_to_date, Db}; use common::encoding::decode_utf16le; -use common::settings::{Authentication, Kerberos, Tls}; +use common::settings::{Authentication, Kerberos, Monitoring, Tls}; use common::settings::{Collector, Server as ServerSettings, Settings}; use core::pin::Pin; use futures::Future; @@ -38,6 +39,12 @@ use hyper_util::rt::TokioIo; use kerberos::AuthenticationError; use libgssapi::error::MajorFlags; use log::{debug, error, info, trace, warn}; +use metrics::{counter, histogram}; +use monitoring::{ + HTTP_REQUESTS_COUNTER, HTTP_REQUEST_BODY_NETWORK_SIZE_BYTES_COUNTER, + HTTP_REQUEST_BODY_REAL_SIZE_BYTES_COUNTER, HTTP_REQUEST_DURATION_SECONDS_HISTOGRAM, + HTTP_REQUEST_STATUS_CODE, HTTP_REQUEST_URI, MACHINE, +}; use quick_xml::writer::Writer; use soap::Serializable; use socket2::{SockRef, TcpKeepalive}; @@ -89,6 +96,8 @@ pub struct RequestData { principal: String, remote_addr: SocketAddr, category: RequestCategory, + uri: String, + method: String, } impl RequestData { @@ -97,6 +106,8 @@ impl RequestData { principal: principal.to_owned(), remote_addr: remote_addr.to_owned(), category: RequestCategory::try_from(req)?, + method: req.method().to_string(), + uri: req.uri().to_string(), }) } @@ -114,6 +125,14 @@ impl RequestData { pub fn category(&self) -> &RequestCategory { &self.category } + + pub fn uri(&self) -> &str { + &self.uri + } + + pub fn method(&self) -> &str { + &self.method + } } #[derive(Debug, Clone)] @@ -137,7 +156,9 @@ fn full>(chunk: T) -> BoxBody { async fn get_request_payload( collector: &Collector, + monitoring: &Option, auth_ctx: &AuthenticationContext, + request_data: &RequestData, req: Request, ) -> Result> { let (parts, body) = req.into_parts(); @@ -168,6 +189,21 @@ async fn get_request_payload( return Ok(None); } + let http_request_body_network_size_bytes_counter = match monitoring { + Some(monitoring_conf) + if monitoring_conf.count_http_request_body_network_size_per_machine() => + { + counter!(HTTP_REQUEST_BODY_NETWORK_SIZE_BYTES_COUNTER, + HTTP_REQUEST_URI => request_data.uri().to_string(), + MACHINE => request_data.principal().to_string()) + } + _ => { + counter!(HTTP_REQUEST_BODY_NETWORK_SIZE_BYTES_COUNTER, + HTTP_REQUEST_URI => request_data.uri().to_string()) + } + }; + http_request_body_network_size_bytes_counter.increment(data.len().try_into()?); + let message = match auth_ctx { AuthenticationContext::Tls(_, _) => tls::get_request_payload(parts, data).await?, AuthenticationContext::Kerberos(conn_state) => { @@ -177,6 +213,21 @@ async fn get_request_payload( match message { Some(bytes) => { + let http_request_body_real_size_bytes_counter = match monitoring { + Some(monitoring_conf) + if monitoring_conf.count_http_request_body_real_size_per_machine() => + { + counter!(HTTP_REQUEST_BODY_REAL_SIZE_BYTES_COUNTER, + HTTP_REQUEST_URI => request_data.uri().to_string(), + MACHINE => request_data.principal().to_string()) + } + _ => { + counter!(HTTP_REQUEST_BODY_REAL_SIZE_BYTES_COUNTER, + HTTP_REQUEST_URI => request_data.uri().to_string()) + } + }; + http_request_body_real_size_bytes_counter.increment(bytes.len().try_into()?); + // Spawn a blocking task to decode utf16 tokio::task::spawn_blocking(|| Ok(Some(decode_utf16le(bytes)?))).await? } @@ -287,6 +338,7 @@ async fn authenticate( async fn handle_payload( server: &ServerSettings, collector: &Collector, + monitoring: &Option, db: Db, subscriptions: Subscriptions, heartbeat_tx: mpsc::Sender, @@ -307,6 +359,7 @@ async fn handle_payload( let response = logic::handle_message( server, collector, + monitoring, db, subscriptions, heartbeat_tx, @@ -379,14 +432,23 @@ fn log_response( principal: &str, conn_status: ConnectionStatus, ) { - let duration: f32 = start.elapsed().as_micros() as f32; + let duration = start.elapsed().as_secs_f64(); + + histogram!(HTTP_REQUEST_DURATION_SECONDS_HISTOGRAM, + HTTP_REQUEST_URI => uri.to_owned()) + .record(duration); + + counter!(HTTP_REQUESTS_COUNTER, + HTTP_REQUEST_STATUS_CODE => status.as_str().to_owned(), + HTTP_REQUEST_URI => uri.to_owned()) + .increment(1); // MDC is thread related, so it should be safe to use it in a non-async // function. log_mdc::insert("http_status", status.as_str()); log_mdc::insert("http_method", method); log_mdc::insert("http_uri", uri); - log_mdc::insert("response_time", format!("{:.3}", duration / 1000.0)); + log_mdc::insert("response_time", format!("{:.3}", duration * 1000.0)); log_mdc::insert("ip", addr.ip().to_string()); log_mdc::insert("port", addr.port().to_string()); log_mdc::insert("principal", principal); @@ -407,6 +469,7 @@ fn build_error_response(status: StatusCode) -> Response, db: Db, subscriptions: Subscriptions, heartbeat_tx: mpsc::Sender, @@ -474,23 +537,24 @@ async fn handle( }; // Get request payload - let request_payload = match get_request_payload(&collector, &auth_ctx, req).await { - Ok(payload) => payload, - Err(e) => { - error!("Failed to retrieve request payload: {:?}", e); - let status = StatusCode::BAD_REQUEST; - log_response( - &addr, - &method, - &uri, - &start, - status, - &principal, - ConnectionStatus::Alive, - ); - return Ok(build_error_response(status)); - } - }; + let request_payload = + match get_request_payload(&collector, &monitoring, &auth_ctx, &request_data, req).await { + Ok(payload) => payload, + Err(e) => { + error!("Failed to retrieve request payload: {:?}", e); + let status = StatusCode::BAD_REQUEST; + log_response( + &addr, + &method, + &uri, + &start, + status, + &principal, + ConnectionStatus::Alive, + ); + return Ok(build_error_response(status)); + } + }; trace!( "Received payload: {:?}", @@ -521,6 +585,7 @@ async fn handle( let res = handle_payload( &server, &collector, + &monitoring, db, subscriptions, heartbeat_tx, @@ -678,6 +743,7 @@ fn create_kerberos_server( collector_subscriptions: Subscriptions, collector_heartbeat_tx: mpsc::Sender, collector_server_settings: ServerSettings, + monitoring_settings: Option, collector_shutdown_ct: CancellationToken, server_addr: SocketAddr, ) -> Pin> + Send>> { @@ -730,6 +796,7 @@ fn create_kerberos_server( let svc_db = collector_db.clone(); let svc_server_settings = collector_server_settings.clone(); let svc_server_principal = server_principal.clone(); + let svc_monitoring_settings = monitoring_settings.clone(); let subscriptions = collector_subscriptions.clone(); let collector_heartbeat_tx = collector_heartbeat_tx.clone(); @@ -769,6 +836,7 @@ fn create_kerberos_server( handle( svc_server_settings.clone(), collector_settings.clone(), + svc_monitoring_settings.clone(), svc_db.clone(), subscriptions.clone(), collector_heartbeat_tx.clone(), @@ -827,6 +895,7 @@ fn create_tls_server( collector_subscriptions: Subscriptions, collector_heartbeat_tx: mpsc::Sender, collector_server_settings: ServerSettings, + monitoring_settings: Option, collector_shutdown_ct: CancellationToken, server_addr: SocketAddr, ) -> Pin> + Send>> { @@ -875,6 +944,7 @@ fn create_tls_server( let collector_settings = collector_settings.clone(); let svc_db = collector_db.clone(); let svc_server_settings = collector_server_settings.clone(); + let svc_monitoring_settings = monitoring_settings.clone(); let subscriptions = collector_subscriptions.clone(); let collector_heartbeat_tx = collector_heartbeat_tx.clone(); let thumbprint = tls_config.thumbprint.clone(); @@ -945,6 +1015,7 @@ fn create_tls_server( handle( svc_server_settings.clone(), collector_settings.clone(), + svc_monitoring_settings.clone(), svc_db.clone(), subscriptions.clone(), collector_heartbeat_tx.clone(), @@ -1062,6 +1133,10 @@ pub async fn run(settings: Settings, verbosity: u8) { let subscriptions = Arc::new(RwLock::new(HashMap::new())); + if let Some(monitoring_settings) = settings.monitoring() { + monitoring::init(&db, subscriptions.clone(), monitoring_settings).expect("Failed to initialize metrics exporter"); + } + let reload_interval = settings.server().db_sync_interval(); let outputs_settings = settings.outputs().clone(); let update_task_db = db.clone(); @@ -1133,6 +1208,7 @@ pub async fn run(settings: Settings, verbosity: u8) { let collector_heartbeat_tx = heartbeat_tx.clone(); let collector_server_settings = settings.server().clone(); let collector_shutdown_ct = shutdown_ct.clone(); + let collector_monitoring_settings = settings.monitoring().cloned(); // Construct our SocketAddr to listen on... let addr = SocketAddr::from(( @@ -1153,6 +1229,7 @@ pub async fn run(settings: Settings, verbosity: u8) { collector_subscriptions, collector_heartbeat_tx, collector_server_settings, + collector_monitoring_settings, collector_shutdown_ct, addr, )); @@ -1165,6 +1242,7 @@ pub async fn run(settings: Settings, verbosity: u8) { collector_subscriptions, collector_heartbeat_tx, collector_server_settings, + collector_monitoring_settings, collector_shutdown_ct, addr, )); diff --git a/server/src/logic.rs b/server/src/logic.rs index 2595c3d..9c1a89c 100644 --- a/server/src/logic.rs +++ b/server/src/logic.rs @@ -1,6 +1,13 @@ use crate::{ event::{EventData, EventMetadata}, heartbeat::{store_heartbeat, WriteHeartbeatMessage}, + monitoring::{ + INPUT_EVENTS_COUNTER, INPUT_EVENT_BYTES_COUNTER, INPUT_EVENT_PARSING_FAILURES, + INPUT_EVENT_PARSING_FAILURE_ERROR_TYPE, INPUT_MESSAGES_COUNTER, MACHINE, MESSAGES_ACTION, + MESSAGES_ACTION_ENUMERATE, MESSAGES_ACTION_EVENTS, MESSAGES_ACTION_HEARTBEAT, + OUTPUT_DRIVER, OUTPUT_DRIVER_FAILURES, OUTPUT_FORMAT, OUTPUT_FORMAT_FAILURES, + SUBSCRIPTION_NAME, SUBSCRIPTION_UUID, + }, output::get_formatter, soap::{ Body, Header, Message, OptionSetValue, Subscription as SoapSubscription, SubscriptionBody, @@ -12,11 +19,12 @@ use crate::{ }; use common::{ database::Db, - settings::{Collector, Server}, + settings::{Collector, Monitoring, Server}, subscription::{SubscriptionOutputFormat, SubscriptionUuid}, }; use hyper::http::status::StatusCode; use log::{debug, error, warn}; +use metrics::counter; use std::{ collections::{HashMap, HashSet}, sync::Arc, @@ -26,6 +34,12 @@ use uuid::Uuid; use anyhow::{anyhow, bail, Context, Result}; +#[derive(Debug)] +struct OutputDriverError { + pub driver: String, + pub error: anyhow::Error, +} + pub enum Response { Ok(String, Option), Err(StatusCode), @@ -217,6 +231,8 @@ async fn handle_enumerate( }); } + counter!(INPUT_MESSAGES_COUNTER, MESSAGES_ACTION => MESSAGES_ACTION_ENUMERATE).increment(1); + Ok(Response::ok( ACTION_ENUMERATE_RESPONSE, Some(Body::EnumerateResponse(res_subscriptions)), @@ -283,6 +299,9 @@ async fn handle_heartbeat( ) .await .context("Failed to store heartbeat")?; + + counter!(INPUT_MESSAGES_COUNTER, MESSAGES_ACTION => MESSAGES_ACTION_HEARTBEAT).increment(1); + Ok(Response::ok(ACTION_ACK, None)) } @@ -296,7 +315,36 @@ fn get_formatted_events( for raw in events.iter() { // EventData parses the raw event into an Event struct // (once for all formatters). - events_data.push(EventData::new(raw.clone(), need_to_parse_event)) + let event_data = EventData::new(raw.clone(), need_to_parse_event); + + if need_to_parse_event { + // Count failures + match event_data.event() { + Some(event) => { + if let Some(error) = &event.additional.error { + let error_type_str: &'static str = error.error_type.clone().into(); + counter!(INPUT_EVENT_PARSING_FAILURES, + SUBSCRIPTION_NAME => metadata.subscription_name().to_owned(), + SUBSCRIPTION_UUID => metadata.subscription_uuid().to_owned(), + INPUT_EVENT_PARSING_FAILURE_ERROR_TYPE => error_type_str) + .increment(1); + warn!("Failed to parse an event: {:?}", error) + } + } + None => { + counter!(INPUT_EVENT_PARSING_FAILURES, + SUBSCRIPTION_NAME => metadata.subscription_name().to_owned(), + SUBSCRIPTION_UUID => metadata.subscription_uuid().to_owned(), + INPUT_EVENT_PARSING_FAILURE_ERROR_TYPE => "Unknown") + .increment(1); + warn!( + "Event should have been parsed but it was not: {}", + event_data.raw() + ) + } + } + } + events_data.push(event_data) } let mut formatted_events: HashMap>>> = @@ -307,6 +355,14 @@ fn get_formatted_events( for event_data in events_data.iter() { if let Some(str) = formatter.format(metadata, event_data) { content.push(str.clone()) + } else { + let format_str: &'static str = format.into(); + counter!(OUTPUT_FORMAT_FAILURES, + SUBSCRIPTION_NAME => metadata.subscription_name().to_owned(), + SUBSCRIPTION_UUID => metadata.subscription_uuid().to_owned(), + OUTPUT_FORMAT => format_str) + .increment(1); + warn!("Failed to format an event using {}", format_str); } } formatted_events.insert(format.clone(), Arc::new(content)); @@ -316,6 +372,7 @@ fn get_formatted_events( async fn handle_events( server: &Server, + monitoring: &Option, db: &Db, subscriptions: Subscriptions, heartbeat_tx: mpsc::Sender, @@ -354,6 +411,14 @@ async fn handle_events( return Ok(Response::err(StatusCode::FORBIDDEN)); } + // Retrieve the public version sent by the client, not the one stored in memory + let public_version = if let Some(public_version) = message.header().version() { + public_version + } else { + warn!("Missing subscription version in message events"); + return Ok(Response::err(StatusCode::BAD_REQUEST)); + }; + debug!( "Received {} events from {}:{} ({}) for subscription {} ({})", events.len(), @@ -364,13 +429,42 @@ async fn handle_events( subscription.uuid_string() ); - // Retrieve the public version sent by the client, not the one stored in memory - let public_version = if let Some(public_version) = message.header().version() { - public_version - } else { - warn!("Missing subscription version in message events"); - return Ok(Response::err(StatusCode::BAD_REQUEST)); + counter!(INPUT_MESSAGES_COUNTER, MESSAGES_ACTION => MESSAGES_ACTION_EVENTS).increment(1); + + let events_counter = match monitoring { + Some(monitoring_conf) if monitoring_conf.count_input_events_per_machine() => { + counter!(INPUT_EVENTS_COUNTER, + SUBSCRIPTION_NAME => subscription.data().name().to_owned(), + SUBSCRIPTION_UUID => subscription.uuid_string(), + MACHINE => request_data.principal().to_string()) + } + _ => { + counter!(INPUT_EVENTS_COUNTER, + SUBSCRIPTION_NAME => subscription.data().name().to_owned(), + SUBSCRIPTION_UUID => subscription.uuid_string()) + } + }; + events_counter.increment(events.len().try_into()?); + + let event_size_counter = match monitoring { + Some(monitoring_conf) if monitoring_conf.count_input_event_bytes_per_machine() => { + counter!(INPUT_EVENT_BYTES_COUNTER, + SUBSCRIPTION_NAME => subscription.data().name().to_owned(), + SUBSCRIPTION_UUID => subscription.uuid_string(), + MACHINE => request_data.principal().to_string()) + } + _ => { + counter!(INPUT_EVENT_BYTES_COUNTER, + SUBSCRIPTION_NAME => subscription.data().name().to_owned(), + SUBSCRIPTION_UUID => subscription.uuid_string()) + } }; + event_size_counter.increment( + events + .iter() + .fold(0, |acc, event| acc + event.len()) + .try_into()?, + ); let metadata = Arc::new(EventMetadata::new( request_data.remote_addr(), @@ -437,6 +531,10 @@ async fn handle_events( output_cloned.describe() ) }) + .map_err(|e| OutputDriverError { + driver: output_cloned.driver(), + error: e, + }) }); } @@ -447,11 +545,21 @@ async fn handle_events( Ok(Ok(())) => (), Ok(Err(err)) => { succeed = false; - warn!("Failed to process output and send event: {:?}", err); + warn!("Failed to process output and send event: {:?}", err.error); + counter!(OUTPUT_DRIVER_FAILURES, + SUBSCRIPTION_NAME => subscription.data().name().to_owned(), + SUBSCRIPTION_UUID => subscription.uuid_string(), + OUTPUT_DRIVER => err.driver.clone()) + .increment(1); } Err(err) => { succeed = false; - warn!("Something bad happened with a process task: {:?}", err) + warn!("Something bad happened with a process task: {:?}", err); + counter!(OUTPUT_DRIVER_FAILURES, + SUBSCRIPTION_NAME => subscription.data().name().to_owned(), + SUBSCRIPTION_UUID => subscription.uuid_string(), + OUTPUT_DRIVER => "Unknown") + .increment(1); } } } @@ -500,6 +608,7 @@ async fn handle_events( pub async fn handle_message( server: &Server, collector: &Collector, + monitoring: &Option, db: Db, subscriptions: Subscriptions, heartbeat_tx: mpsc::Sender, @@ -523,6 +632,7 @@ pub async fn handle_message( } else if action == ACTION_EVENTS { handle_events( server, + monitoring, &db, subscriptions, heartbeat_tx, diff --git a/server/src/monitoring.rs b/server/src/monitoring.rs new file mode 100644 index 0000000..3331b38 --- /dev/null +++ b/server/src/monitoring.rs @@ -0,0 +1,217 @@ +use std::{ + net::{IpAddr, SocketAddr}, + str::FromStr, + time::{Duration, SystemTime}, +}; + +use anyhow::Result; +use common::{database::Db, settings::Monitoring, subscription::SubscriptionMachineState}; +use log::{debug, info}; +use metrics::{describe_counter, describe_gauge, describe_histogram, gauge, Unit}; +use metrics_exporter_prometheus::{Matcher, PrometheusBuilder}; +use tokio::time; + +use crate::subscription::Subscriptions; + +// input metrics + +pub const INPUT_MESSAGES_COUNTER: &str = "openwec_input_messages_total"; +pub const MESSAGES_ACTION: &str = "action"; +pub const MESSAGES_ACTION_HEARTBEAT: &str = "heartbeat"; +pub const MESSAGES_ACTION_EVENTS: &str = "events"; +pub const MESSAGES_ACTION_ENUMERATE: &str = "enumerate"; + +pub const INPUT_EVENTS_COUNTER: &str = "openwec_input_events_total"; +pub const SUBSCRIPTION_UUID: &str = "subscription_uuid"; +pub const SUBSCRIPTION_NAME: &str = "subscription_name"; +pub const MACHINE: &str = "machine"; + +pub const INPUT_EVENT_BYTES_COUNTER: &str = "openwec_input_event_bytes_total"; +pub const INPUT_EVENT_PARSING_FAILURES: &str = "openwec_input_event_parsing_failures_total"; +pub const INPUT_EVENT_PARSING_FAILURE_ERROR_TYPE: &str = "type"; + +// http metrics + +pub const HTTP_REQUESTS_COUNTER: &str = "openwec_http_requests_total"; + +pub const HTTP_REQUEST_DURATION_SECONDS_HISTOGRAM: &str = "openwec_http_request_duration_seconds"; +pub const HTTP_REQUEST_URI: &str = "uri"; +pub const HTTP_REQUEST_STATUS_CODE: &str = "code"; + +pub const HTTP_REQUEST_BODY_NETWORK_SIZE_BYTES_COUNTER: &str = + "openwec_http_request_body_network_size_bytes_total"; +pub const HTTP_REQUEST_BODY_REAL_SIZE_BYTES_COUNTER: &str = + "openwec_http_request_body_real_size_bytes_total"; + +// output metrics + +pub const OUTPUT_DRIVER_FAILURES: &str = "openwec_output_driver_failures_total"; +pub const OUTPUT_DRIVER: &str = "driver"; +pub const OUTPUT_FORMAT_FAILURES: &str = "openwec_output_format_failures_total"; +pub const OUTPUT_FORMAT: &str = "format"; + +// machines metrics + +pub const MACHINES_GAUGE: &str = "openwec_machines"; +pub const MACHINES_STATE: &str = "state"; + +pub fn init(db: &Db, subscriptions: Subscriptions, settings: &Monitoring) -> Result<()> { + let refresh_interval = settings.machines_refresh_interval(); + let refresh_task_db = db.clone(); + let refresh_task_subscriptions = subscriptions.clone(); + + // Launch a task responsible for refreshing machines gauge + tokio::spawn(async move { + refresh_machines_task( + refresh_task_db, + refresh_task_subscriptions, + refresh_interval, + ) + .await + }); + + let addr = SocketAddr::from(( + IpAddr::from_str(settings.listen_address()) + .expect("Failed to parse monitoring.listen_address"), + settings.listen_port(), + )); + + let builder = PrometheusBuilder::new() + .with_http_listener(addr) + .set_buckets_for_metric( + Matcher::Full(HTTP_REQUEST_DURATION_SECONDS_HISTOGRAM.to_string()), + settings.http_request_duration_buckets(), + )?; + + info!("Starting monitoring server on {}", addr); + + builder.install()?; + + // input + describe_counter!( + INPUT_EVENTS_COUNTER, + Unit::Count, + "The total number of events received by openwec" + ); + describe_counter!( + INPUT_EVENT_BYTES_COUNTER, + Unit::Bytes, + "The total size of all events received by openwec" + ); + describe_counter!( + INPUT_MESSAGES_COUNTER, + Unit::Count, + "The total number of messages received by openwec" + ); + describe_counter!( + INPUT_EVENT_PARSING_FAILURES, + Unit::Count, + "The total number of event parsing failures" + ); + + // http + describe_counter!( + HTTP_REQUESTS_COUNTER, + Unit::Count, + "The total number of HTTP requests handled by openwec" + ); + describe_histogram!( + HTTP_REQUEST_DURATION_SECONDS_HISTOGRAM, + Unit::Seconds, + "Histogram of response duration for HTTP requests" + ); + describe_counter!( + HTTP_REQUEST_BODY_NETWORK_SIZE_BYTES_COUNTER, + Unit::Bytes, + "The total size of all http requests body received by openwec" + ); + describe_counter!( + HTTP_REQUEST_BODY_REAL_SIZE_BYTES_COUNTER, + Unit::Bytes, + "The total size of all http requests body received by openwec after decryption and decompression" + ); + + // output + describe_counter!( + OUTPUT_DRIVER_FAILURES, + Unit::Count, + "The total number of output driver failures" + ); + describe_counter!( + OUTPUT_FORMAT_FAILURES, + Unit::Count, + "The total number of output format failures" + ); + + // machines + describe_gauge!( + MACHINES_GAUGE, + Unit::Count, + "The number of machines known by openwec" + ); + + Ok(()) +} + +async fn refresh_machines_task( + db: Db, + subscriptions: Subscriptions, + refresh_interval: u64, +) -> Result<()> { + info!("Starting refresh machines task for monitoring"); + let mut refresh = time::interval(Duration::from_secs(refresh_interval)); + // We don't want the first tick to complete immediatly + refresh.reset_after(Duration::from_secs(refresh_interval)); + loop { + tokio::select! { + _ = refresh.tick() => { + debug!("Refreshing machines stats for monitoring"); + + // We can't await with the lock on "subscriptions" + // So we first copy all data we need from "subscriptions" + let subscriptions_data = { + let subscriptions_unlocked = subscriptions.read().unwrap(); + let mut subscriptions_data = Vec::with_capacity(subscriptions_unlocked.len()); + for (_, subscription) in subscriptions.read().unwrap().iter() { + subscriptions_data.push((subscription.uuid_string(), subscription.data().name().to_string(), subscription.data().heartbeat_interval())); + } + subscriptions_data + }; + + for (subscription_uuid, subscription_name, heartbeat_interval) in subscriptions_data { + let now: i64 = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH)? + .as_secs() + .try_into()?; + + let stats = db + .get_stats(&subscription_uuid, now - (heartbeat_interval as i64)) + .await?; + + debug!("Update {} values with active={}, alive={}, dead={}", MACHINES_GAUGE, stats.active_machines_count(), stats.alive_machines_count(), stats.dead_machines_count()); + + let alive_str: &'static str = SubscriptionMachineState::Alive.into(); + gauge!(MACHINES_GAUGE, + SUBSCRIPTION_NAME => subscription_name.clone(), + SUBSCRIPTION_UUID => subscription_uuid.clone(), + MACHINES_STATE => alive_str) + .set(stats.alive_machines_count() as f64); + + let active_str: &'static str = SubscriptionMachineState::Active.into(); + gauge!(MACHINES_GAUGE, + SUBSCRIPTION_NAME => subscription_name.clone(), + SUBSCRIPTION_UUID => subscription_uuid.clone(), + MACHINES_STATE => active_str) + .set(stats.active_machines_count() as f64); + + let dead_str: &'static str = SubscriptionMachineState::Dead.into(); + gauge!(MACHINES_GAUGE, + SUBSCRIPTION_NAME => subscription_name.clone(), + SUBSCRIPTION_UUID => subscription_uuid.clone(), + MACHINES_STATE => dead_str) + .set(stats.dead_machines_count() as f64); + } + } + } + } +} diff --git a/server/src/output.rs b/server/src/output.rs index 99a428f..156b741 100644 --- a/server/src/output.rs +++ b/server/src/output.rs @@ -138,6 +138,10 @@ impl Output { ) } + pub fn driver(&self) -> String { + format!("{:?}", self.subscription_output_driver) + } + pub async fn write( &self, metadata: Arc,