diff --git a/src/pipeline/src/etl/error.rs b/src/pipeline/src/etl/error.rs index e280583106b1..87743abaa394 100644 --- a/src/pipeline/src/etl/error.rs +++ b/src/pipeline/src/etl/error.rs @@ -363,6 +363,12 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + #[snafu(display("Wrong digest pattern: {pattern}"))] + DigestPatternInvalid { + pattern: String, + #[snafu(implicit)] + location: Location, + }, #[snafu(display("Url decoding error"))] UrlEncodingDecode { #[snafu(source)] diff --git a/src/pipeline/src/etl/field.rs b/src/pipeline/src/etl/field.rs index 84d94040b541..10fa681f236c 100644 --- a/src/pipeline/src/etl/field.rs +++ b/src/pipeline/src/etl/field.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::ops::Deref; +use std::ops::{Deref, DerefMut}; use std::str::FromStr; use snafu::OptionExt; @@ -218,6 +218,12 @@ impl Deref for Fields { } } +impl DerefMut for Fields { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + impl IntoIterator for Fields { type Item = Field; type IntoIter = std::vec::IntoIter; diff --git a/src/pipeline/src/etl/processor.rs b/src/pipeline/src/etl/processor.rs index f176234cc43a..bf37f1f8ce7f 100644 --- a/src/pipeline/src/etl/processor.rs +++ b/src/pipeline/src/etl/processor.rs @@ -16,6 +16,7 @@ pub mod cmcd; pub mod csv; pub mod date; pub mod decolorize; +pub mod digest; pub mod dissect; pub mod epoch; pub mod gsub; @@ -31,6 +32,7 @@ use cmcd::{CmcdProcessor, CmcdProcessorBuilder}; use csv::{CsvProcessor, CsvProcessorBuilder}; use date::{DateProcessor, DateProcessorBuilder}; use decolorize::{DecolorizeProcessor, DecolorizeProcessorBuilder}; +use digest::{DigestProcessor, DigestProcessorBuilder}; use dissect::{DissectProcessor, DissectProcessorBuilder}; use enum_dispatch::enum_dispatch; use epoch::{EpochProcessor, EpochProcessorBuilder}; @@ -97,6 +99,7 @@ pub enum ProcessorKind { Date(DateProcessor), JsonPath(JsonPathProcessor), Decolorize(DecolorizeProcessor), + Digest(DigestProcessor), } /// ProcessorBuilder trait defines the interface for all processor builders @@ -127,6 +130,7 @@ pub enum ProcessorBuilders { Date(DateProcessorBuilder), JsonPath(JsonPathProcessorBuilder), Decolorize(DecolorizeProcessorBuilder), + Digest(DigestProcessorBuilder), } #[derive(Debug, Default)] @@ -277,6 +281,9 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result { decolorize::PROCESSOR_DECOLORIZE => { ProcessorBuilders::Decolorize(DecolorizeProcessorBuilder::try_from(value)?) } + digest::PROCESSOR_DIGEST => { + ProcessorBuilders::Digest(DigestProcessorBuilder::try_from(value)?) + } _ => return UnsupportedProcessorSnafu { processor: str_key }.fail(), }; diff --git a/src/pipeline/src/etl/processor/digest.rs b/src/pipeline/src/etl/processor/digest.rs new file mode 100644 index 000000000000..29054365ad03 --- /dev/null +++ b/src/pipeline/src/etl/processor/digest.rs @@ -0,0 +1,428 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Digest the input string by removing certain patterns. +//! +//! This processor can help to extract useful information from a string by removing certain patterns, +//! which is often a variable from the log message. Digested fields are stored in a new field with the +//! `_digest` suffix. And can be used for further processing or analysis like template occurrences count +//! or similarity analysis. + +use std::borrow::Cow; + +use ahash::HashSet; +use regex::Regex; +use snafu::OptionExt; + +use crate::etl::error::{ + Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result, +}; +use crate::etl::field::{Fields, OneInputOneOutputField}; +use crate::etl::processor::{ + yaml_bool, yaml_new_field, yaml_new_fields, ProcessorBuilder, ProcessorKind, FIELDS_NAME, + FIELD_NAME, IGNORE_MISSING_NAME, +}; +use crate::etl::value::Value; +use crate::etl_error::DigestPatternInvalidSnafu; + +pub(crate) const PROCESSOR_DIGEST: &str = "digest"; + +const PRESETS_PATTERNS_NAME: &str = "presets"; +const REGEX_PATTERNS_NAME: &str = "regex"; + +enum PresetPattern { + Numbers, + Quoted, + Bracketed, + Uuid, + Ip, +} + +impl std::fmt::Display for PresetPattern { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + PresetPattern::Numbers => write!(f, "numbers"), + PresetPattern::Quoted => write!(f, "quoted"), + PresetPattern::Bracketed => write!(f, "bracketed"), + PresetPattern::Uuid => write!(f, "uuid"), + PresetPattern::Ip => write!(f, "ip"), + } + } +} + +impl std::str::FromStr for PresetPattern { + type Err = Error; + + fn from_str(pattern: &str) -> Result { + match pattern { + "numbers" => Ok(PresetPattern::Numbers), + "quoted" => Ok(PresetPattern::Quoted), + "bracketed" => Ok(PresetPattern::Bracketed), + "uuid" => Ok(PresetPattern::Uuid), + "ip" => Ok(PresetPattern::Ip), + _ => DigestPatternInvalidSnafu { pattern }.fail(), + } + } +} + +impl PresetPattern { + fn regex(&self) -> Regex { + match self { + PresetPattern::Numbers => Regex::new(r"\d+").unwrap(), + PresetPattern::Quoted => Regex::new(r#"["'“”‘’][^"'“”‘’]*["'“”‘’]"#).unwrap(), + PresetPattern::Bracketed => Regex::new(r#"[({\[<「『【〔[{〈《][^(){}\[\]<>「」『』【】〔〕[]{}〈〉《》]*[)}\]>」』】〕]}〉》]"#).unwrap(), + PresetPattern::Uuid => Regex::new(r"\b[0-9a-fA-F]{8}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{12}\b").unwrap(), + PresetPattern::Ip => Regex::new(r"((\d{1,3}\.){3}\d{1,3}(:\d+)?|(\[[0-9a-fA-F:]+\])(:\d+)?)").unwrap(), + } + } +} + +#[derive(Debug, Default)] +pub struct DigestProcessorBuilder { + fields: Fields, + patterns: Vec, + ignore_missing: bool, +} + +impl ProcessorBuilder for DigestProcessorBuilder { + fn output_keys(&self) -> HashSet<&str> { + self.fields + .iter() + .map(|f| f.target_or_input_field()) + .collect() + } + + fn input_keys(&self) -> HashSet<&str> { + self.fields.iter().map(|f| f.input_field()).collect() + } + + fn build(self, intermediate_keys: &[String]) -> Result { + self.build(intermediate_keys).map(ProcessorKind::Digest) + } +} + +impl DigestProcessorBuilder { + fn build(self, intermediate_keys: &[String]) -> Result { + let mut real_fields = Vec::with_capacity(self.fields.len()); + for field in self.fields.into_iter() { + let input = OneInputOneOutputField::build( + "digest", + intermediate_keys, + field.input_field(), + field.target_or_input_field(), + )?; + real_fields.push(input); + } + Ok(DigestProcessor { + fields: real_fields, + ignore_missing: self.ignore_missing, + patterns: self.patterns, + }) + } +} + +/// Computes a digest (hash) of the input string. +#[derive(Debug, Default)] +pub struct DigestProcessor { + fields: Vec, + ignore_missing: bool, + patterns: Vec, +} + +impl DigestProcessor { + fn remove_quoted_content(&self, val: &str) -> String { + let re = Regex::new(r#""[^"]*""#).unwrap(); + re.replace_all(val, "").to_string() + } + + fn process_string(&self, val: &str) -> Result { + let mut input = Cow::from(val); + for pattern in &self.patterns { + if let Cow::Owned(new_string) = pattern.replace_all(&input, "") { + input = Cow::Owned(new_string); + } + } + + Ok(Value::String(input.into_owned())) + } + + fn process(&self, val: &Value) -> Result { + match val { + Value::String(val) => self.process_string(val), + _ => ProcessorExpectStringSnafu { + processor: PROCESSOR_DIGEST, + v: val.clone(), + } + .fail(), + } + } +} + +impl TryFrom<&yaml_rust::yaml::Hash> for DigestProcessorBuilder { + type Error = Error; + + fn try_from(value: &yaml_rust::yaml::Hash) -> Result { + let mut fields = Fields::default(); + let mut ignore_missing = false; + let mut patterns = Vec::new(); + + for (k, v) in value.iter() { + let key = k + .as_str() + .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?; + + match key { + FIELD_NAME => { + fields = Fields::one(yaml_new_field(v, FIELD_NAME)?); + } + FIELDS_NAME => { + fields = yaml_new_fields(v, FIELDS_NAME)?; + } + IGNORE_MISSING_NAME => { + ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?; + } + PRESETS_PATTERNS_NAME => { + let preset_patterns: Vec = v + .as_vec() + .with_context(|| DigestPatternInvalidSnafu { + pattern: key.to_string(), + })? + .iter() + .map(|p| p.as_str().unwrap().to_string()) + .collect(); + for pattern in preset_patterns { + let preset_pattern = pattern.parse::()?; + let regex = preset_pattern.regex(); + patterns.push(regex); + } + } + REGEX_PATTERNS_NAME => { + let regex_patterns: Vec = v + .as_vec() + .with_context(|| DigestPatternInvalidSnafu { + pattern: key.to_string(), + })? + .iter() + .map(|p| p.as_str().unwrap().to_string()) + .collect(); + for pattern in regex_patterns { + let regex = Regex::new(&pattern).unwrap(); + patterns.push(regex); + } + } + _ => {} + } + } + + for field in fields.iter_mut() { + field.target_field = Some(format!("{}_digest", field.input_field())); + } + + Ok(DigestProcessorBuilder { + fields, + patterns, + ignore_missing, + }) + } +} + +impl crate::etl::processor::Processor for DigestProcessor { + fn kind(&self) -> &str { + PROCESSOR_DIGEST + } + + fn ignore_missing(&self) -> bool { + self.ignore_missing + } + + fn exec_mut(&self, val: &mut Vec) -> Result<()> { + for field in self.fields.iter() { + let index = field.input_index(); + match val.get(index) { + Some(Value::Null) | None => { + if !self.ignore_missing { + return ProcessorMissingFieldSnafu { + processor: self.kind(), + field: field.input_name(), + } + .fail(); + } + } + Some(v) => { + let result = self.process(v)?; + let output_index = field.output_index(); + val[output_index] = result; + } + } + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + + use super::*; + + #[test] + fn test_digest_processor_ip() { + let processor = DigestProcessor { + fields: vec![], + ignore_missing: false, + patterns: vec![PresetPattern::Ip.regex()], + }; + + let input = Value::String("192.168.1.1".to_string()); + let result = processor.process(&input).unwrap(); + assert_eq!(result, Value::String("".to_string())); + let input = Value::String("192.168.1.1:8080".to_string()); + let result = processor.process(&input).unwrap(); + assert_eq!(result, Value::String("".to_string())); + + let input = Value::String("[2001:0db8:85a3:0000:0000:8a2e:0370:7334]".to_string()); + let result = processor.process(&input).unwrap(); + assert_eq!(result, Value::String("".to_string())); + + let input = Value::String("[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:8080".to_string()); + let result = processor.process(&input).unwrap(); + assert_eq!(result, Value::String("".to_string())); + + let input = Value::String("not an ip".to_string()); + let result = processor.process(&input).unwrap(); + assert_eq!(result, Value::String("not an ip".to_string())); + } + + #[test] + fn test_digest_processor_uuid() { + let processor = DigestProcessor { + fields: vec![], + ignore_missing: false, + patterns: vec![PresetPattern::Uuid.regex()], + }; + // UUID v4 + let input = Value::String("123e4567-e89b-12d3-a456-426614174000".to_string()); + let result = processor.process(&input).unwrap(); + assert_eq!(result, Value::String("".to_string())); + + // UUID v1 + let input = Value::String("6ba7b810-9dad-11d1-80b4-00c04fd430c8".to_string()); + let result = processor.process(&input).unwrap(); + assert_eq!(result, Value::String("".to_string())); + + // UUID v5 + let input = Value::String("886313e1-3b8a-5372-9b90-0c9aee199e5d".to_string()); + let result = processor.process(&input).unwrap(); + assert_eq!(result, Value::String("".to_string())); + + // UUID with uppercase letters + let input = Value::String("A987FBC9-4BED-3078-CF07-9141BA07C9F3".to_string()); + let result = processor.process(&input).unwrap(); + assert_eq!(result, Value::String("".to_string())); + + // Negative case + let input = Value::String("not a uuid".to_string()); + let result = processor.process(&input).unwrap(); + assert_eq!(result, Value::String("not a uuid".to_string())); + } + + #[test] + fn test_digest_processor_brackets() { + let processor = DigestProcessor { + fields: vec![], + ignore_missing: false, + patterns: vec![PresetPattern::Bracketed.regex()], + }; + + // Basic brackets + let input = Value::String("[content]".to_string()); + let result = processor.process(&input).unwrap(); + assert_eq!(result, Value::String("".to_string())); + + let input = Value::String("(content)".to_string()); + let result = processor.process(&input).unwrap(); + assert_eq!(result, Value::String("".to_string())); + + // Chinese brackets + let input = Value::String("「content」".to_string()); + let result = processor.process(&input).unwrap(); + assert_eq!(result, Value::String("".to_string())); + + let input = Value::String("『content』".to_string()); + let result = processor.process(&input).unwrap(); + assert_eq!(result, Value::String("".to_string())); + + let input = Value::String("【content】".to_string()); + let result = processor.process(&input).unwrap(); + assert_eq!(result, Value::String("".to_string())); + + // Unmatched/unclosed brackets should not match + let input = Value::String("[content".to_string()); + let result = processor.process(&input).unwrap(); + assert_eq!(result, Value::String("[content".to_string())); + + let input = Value::String("content]".to_string()); + let result = processor.process(&input).unwrap(); + assert_eq!(result, Value::String("content]".to_string())); + + // Bad case + let input = Value::String("[content}".to_string()); + let result = processor.process(&input).unwrap(); + assert_eq!(result, Value::String("".to_string())); + + // Negative case + let input = Value::String("no brackets".to_string()); + let result = processor.process(&input).unwrap(); + assert_eq!(result, Value::String("no brackets".to_string())); + } + + #[test] + fn test_digest_processor_quotes() { + let processor = DigestProcessor { + fields: vec![], + ignore_missing: false, + patterns: vec![PresetPattern::Quoted.regex()], + }; + + let input = Value::String("\"quoted content\"".to_string()); + let result = processor.process(&input).unwrap(); + assert_eq!(result, Value::String("".to_string())); + + let input = Value::String("no quotes".to_string()); + let result = processor.process(&input).unwrap(); + assert_eq!(result, Value::String("no quotes".to_string())); + let input = Value::String("".to_string()); + let result = processor.process(&input).unwrap(); + assert_eq!(result, Value::String("".to_string())); + } + + #[test] + fn test_digest_processor_custom_regex() { + let processor = DigestProcessor { + fields: vec![], + ignore_missing: false, + patterns: vec![Regex::new(r"\d+").unwrap()], + }; + + let input = Value::String("12345".to_string()); + let result = processor.process(&input).unwrap(); + assert_eq!(result, Value::String("".to_string())); + + let input = Value::String("no digits".to_string()); + let result = processor.process(&input).unwrap(); + assert_eq!(result, Value::String("no digits".to_string())); + let input = Value::String("".to_string()); + let result = processor.process(&input).unwrap(); + assert_eq!(result, Value::String("".to_string())); + } +} diff --git a/src/pipeline/tests/pipeline.rs b/src/pipeline/tests/pipeline.rs index de724e1a27d2..cb84e9ad0c8e 100644 --- a/src/pipeline/tests/pipeline.rs +++ b/src/pipeline/tests/pipeline.rs @@ -707,3 +707,66 @@ transform: let expected = StringValue("Success and Error".into()); assert_eq!(expected, r[0]); } + +#[test] +fn test_digest() { + let input_value = serde_json::json!({ + "message": "hello world", + "message_with_ip": "hello 192.168.1.1 world", + "message_with_uuid": "hello 123e4567-e89b-12d3-a456-426614174000 world", + "message_with_quote": "hello 'quoted text' world", + "message_bracket": "hello [bracketed text] world", + "message_with_foobar": "hello foobar world" + }); + + let pipeline_yaml = r#" +processors: + - digest: + fields: + - message + - message_with_ip + - message_with_uuid + - message_with_quote + - message_bracket + - message_with_foobar + presets: + - ip + - uuid + - bracketed + - quoted + regex: + - foobar +transform: + - fields: + - message_with_ip_digest + - message_with_uuid_digest + - message_with_quote_digest + - message_bracket_digest + - message_with_foobar_digest + type: string +"#; + + let yaml_content = Content::Yaml(pipeline_yaml); + let pipeline: Pipeline = parse(&yaml_content).unwrap(); + + let mut status = pipeline.init_intermediate_state(); + pipeline.prepare(input_value, &mut status).unwrap(); + let row = pipeline.exec_mut(&mut status).unwrap(); + + let mut r = row + .values + .into_iter() + .map(|v| v.value_data.unwrap()) + .collect::>(); + r.pop(); // remove the timestamp value + + let expected = vec![ + StringValue("hello world".into()), + StringValue("hello world".into()), + StringValue("hello world".into()), + StringValue("hello world".into()), + StringValue("hello world".into()), + ]; + + assert_eq!(expected, r); +}