Skip to content

Commit

Permalink
feat: digest pipeline processor (#5323)
Browse files Browse the repository at this point in the history
* feat: basic impl

Signed-off-by: Ruihang Xia <[email protected]>

* add document

Signed-off-by: Ruihang Xia <[email protected]>

* apply code review comments

Signed-off-by: Ruihang Xia <[email protected]>

* Apply suggestions from code review

* follow the naming master

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored Jan 16, 2025
1 parent ccd2b06 commit 86bd541
Show file tree
Hide file tree
Showing 5 changed files with 511 additions and 1 deletion.
6 changes: 6 additions & 0 deletions src/pipeline/src/etl/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,12 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Wrong digest pattern: {pattern}"))]
DigestPatternInvalid {
pattern: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Url decoding error"))]
UrlEncodingDecode {
#[snafu(source)]
Expand Down
8 changes: 7 additions & 1 deletion src/pipeline/src/etl/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Deref;
use std::ops::{Deref, DerefMut};
use std::str::FromStr;

use snafu::OptionExt;
Expand Down Expand Up @@ -218,6 +218,12 @@ impl Deref for Fields {
}
}

impl DerefMut for Fields {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

impl IntoIterator for Fields {
type Item = Field;
type IntoIter = std::vec::IntoIter<Field>;
Expand Down
7 changes: 7 additions & 0 deletions src/pipeline/src/etl/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod cmcd;
pub mod csv;
pub mod date;
pub mod decolorize;
pub mod digest;
pub mod dissect;
pub mod epoch;
pub mod gsub;
Expand All @@ -31,6 +32,7 @@ use cmcd::{CmcdProcessor, CmcdProcessorBuilder};
use csv::{CsvProcessor, CsvProcessorBuilder};
use date::{DateProcessor, DateProcessorBuilder};
use decolorize::{DecolorizeProcessor, DecolorizeProcessorBuilder};
use digest::{DigestProcessor, DigestProcessorBuilder};
use dissect::{DissectProcessor, DissectProcessorBuilder};
use enum_dispatch::enum_dispatch;
use epoch::{EpochProcessor, EpochProcessorBuilder};
Expand Down Expand Up @@ -97,6 +99,7 @@ pub enum ProcessorKind {
Date(DateProcessor),
JsonPath(JsonPathProcessor),
Decolorize(DecolorizeProcessor),
Digest(DigestProcessor),
}

/// ProcessorBuilder trait defines the interface for all processor builders
Expand Down Expand Up @@ -127,6 +130,7 @@ pub enum ProcessorBuilders {
Date(DateProcessorBuilder),
JsonPath(JsonPathProcessorBuilder),
Decolorize(DecolorizeProcessorBuilder),
Digest(DigestProcessorBuilder),
}

#[derive(Debug, Default)]
Expand Down Expand Up @@ -277,6 +281,9 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result<ProcessorBuilders> {
decolorize::PROCESSOR_DECOLORIZE => {
ProcessorBuilders::Decolorize(DecolorizeProcessorBuilder::try_from(value)?)
}
digest::PROCESSOR_DIGEST => {
ProcessorBuilders::Digest(DigestProcessorBuilder::try_from(value)?)
}
_ => return UnsupportedProcessorSnafu { processor: str_key }.fail(),
};

Expand Down
Loading

0 comments on commit 86bd541

Please sign in to comment.