Skip to content

Commit

Permalink
Merge pull request #270 from SigmaHQ:pipeline-nesting
Browse files Browse the repository at this point in the history
Nested processing pipelines
  • Loading branch information
thomaspatzke authored Sep 2, 2024
2 parents 73dbcb2 + d515250 commit d52d8b5
Show file tree
Hide file tree
Showing 11 changed files with 359 additions and 28 deletions.
18 changes: 14 additions & 4 deletions docs/Processing_Pipelines.rst
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,10 @@ definitions are available:
"Identifier", "Class"
"field_name_mapping", "FieldMappingTransformation"
"field_name_prefix_mapping", "FieldPrefixMappingTransformation"
"field_name_transform", "FieldFunctionTransformation"
"drop_detection_item", "DropDetectionItemTransformation"
"field_name_suffix", "AddFieldnameSuffixTransformation"
"field_name_prefix", "AddFieldnamePrefixTransformation"
"drop_detection_item", "DropDetectionItemTransformation"
"wildcard_placeholders", "WildcardPlaceholderTransformation"
"value_placeholders", "ValueListPlaceholderTransformation"
"query_expression_placeholders", "QueryExpressionPlaceholderTransformation"
Expand All @@ -278,12 +279,13 @@ definitions are available:
"set_field", "SetFieldTransformation"
"replace_string", "ReplaceStringTransformation"
"map_string", "MapStringTransformation"
"set_state", "SetStateTransformation"
"regex", "RegexTransformation"
"set_value", "SetValueTransformation"
"convert_type", "ConvertTypeTransformation
"set_state", "SetStateTransformation"
"convert_type", "ConvertTypeTransformation"
"rule_failure", "RuleFailureTransformation"
"detection_item_failure", "DetectionItemFailureTransformation"
"nest", "NestedProcessingTransformation"

.. autoclass:: sigma.processing.transformations.FieldMappingTransformation

Expand All @@ -304,9 +306,10 @@ and `cmdline`. For the latter, OR-conditions will be generated to match the valu
This is useful if different data models are used in the same system.

.. autoclass:: sigma.processing.transformations.FieldPrefixMappingTransformation
.. autoclass:: sigma.processing.transformations.FieldFunctionTransformation
.. autoclass:: sigma.processing.transformations.DropDetectionItemTransformation
.. autoclass:: sigma.processing.transformations.AddFieldnameSuffixTransformation
.. autoclass:: sigma.processing.transformations.AddFieldnamePrefixTransformation
.. autoclass:: sigma.processing.transformations.DropDetectionItemTransformation
.. autoclass:: sigma.processing.transformations.WildcardPlaceholderTransformation
.. autoclass:: sigma.processing.transformations.ValueListPlaceholderTransformation
.. autoclass:: sigma.processing.transformations.QueryExpressionPlaceholderTransformation
Expand All @@ -317,6 +320,13 @@ This is useful if different data models are used in the same system.
.. autoclass:: sigma.processing.transformations.SetFieldTransformation
.. autoclass:: sigma.processing.transformations.ReplaceStringTransformation
.. autoclass:: sigma.processing.transformations.MapStringTransformation
.. autoclass:: sigma.processing.transformations.SetStateTransformation
.. autoclass:: sigma.processing.transformations.RegexTransformation
.. autoclass:: sigma.processing.transformations.SetValueTransformation
.. autoclass:: sigma.processing.transformations.ConvertTypeTransformation
.. autoclass:: sigma.processing.transformations.RuleFailureTransformation
.. autoclass:: sigma.processing.transformations.DetectionItemFailureTransformation
.. autoclass:: sigma.processing.transformations.NestedProcessingTransformation

YAML example:

Expand Down
38 changes: 37 additions & 1 deletion sigma/processing/finalization.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import abstractmethod
from dataclasses import dataclass
from dataclasses import dataclass, field
import json
from typing import Any, Dict, List, Literal, Optional

Expand Down Expand Up @@ -91,9 +91,45 @@ def apply(
return self.j2template.render(queries=queries, pipeline=pipeline)


@dataclass
class NestedFinalizer(Finalizer):
    """Apply a list of finalizers to the queries in a nested fashion.

    The given finalizers are wrapped into a dedicated nested
    ProcessingPipeline whose ``finalize()`` is invoked from :meth:`apply`.
    """

    finalizers: List[Finalizer]
    # Derived from `finalizers` in __post_init__; excluded from __init__ and
    # comparison because it is purely derived state.
    _nested_pipeline: Optional["sigma.processing.pipeline.ProcessingPipeline"] = field(
        init=False, compare=False, default=None
    )

    def __post_init__(self):
        from sigma.processing.pipeline import (
            ProcessingPipeline,
        )  # TODO: move to top after restructuring code.

        self._nested_pipeline = ProcessingPipeline(finalizers=self.finalizers)

    @classmethod
    def from_dict(cls, d: Dict) -> "NestedFinalizer":
        """Instantiate a nested finalizer from a parsed configuration dict.

        :raises SigmaConfigurationError: if the 'finalizers' key is missing, a
            contained finalizer has no 'type' key, or a type is unknown.
        """
        if "finalizers" not in d:
            raise SigmaConfigurationError("Nested finalizer requires a 'finalizers' key.")
        fs = []
        for finalizer in d["finalizers"]:
            try:
                finalizer_type = finalizer.pop("type")
            except KeyError:
                raise SigmaConfigurationError(
                    "Finalizer type not specified for: " + str(finalizer)
                ) from None
            try:
                # Resolve via the module-level registry; report unknown types as
                # configuration errors instead of leaking a raw KeyError.
                finalizer_class = finalizers[finalizer_type]
            except KeyError:
                raise SigmaConfigurationError(
                    "Unknown finalizer type '" + str(finalizer_type) + "'"
                ) from None
            fs.append(finalizer_class.from_dict(finalizer))
        return cls(finalizers=fs)

    def apply(
        self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", queries: List[Any]
    ) -> Any:
        """Finalize *queries* with the nested pipeline. *pipeline* is unused but
        kept for interface compatibility with Finalizer.apply."""
        return self._nested_pipeline.finalize(queries)


# Registry mapping configuration identifiers to their Finalizer classes; used
# to resolve the 'type' key when finalizers are built from parsed pipeline
# definitions (see NestedFinalizer.from_dict above).
# NOTE(review): values are classes, not instances — the annotation arguably
# should be Dict[str, Type[Finalizer]]; confirm `Type` import before changing.
finalizers: Dict[str, Finalizer] = {
    "concat": ConcatenateQueriesFinalizer,
    "json": JSONFinalizer,
    "yaml": YAMLFinalizer,
    "template": TemplateFinalizer,
    "nested": NestedFinalizer,
}
1 change: 0 additions & 1 deletion sigma/processing/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
)
from sigma.correlations import SigmaCorrelationRule
from sigma.processing.finalization import Finalizer, finalizers
from sigma.processing.postprocessing import QueryPostprocessingTransformation
from sigma.processing.tracking import FieldMappingTracking
from sigma.rule import SigmaDetectionItem, SigmaRule
from sigma.processing.transformations import transformations, Transformation
Expand Down
48 changes: 46 additions & 2 deletions sigma/processing/postprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import re
from typing import Any, Dict, List, Optional, Union
import sigma
from sigma.exceptions import SigmaConfigurationError
import sigma.processing.postprocessing
from sigma.processing.templates import TemplateBase
from sigma.processing.transformations import Transformation
from sigma.rule import SigmaRule
Expand Down Expand Up @@ -32,7 +34,7 @@ def apply(
:return: Transformed query.
:rtype: Any
"""
super().apply(pipeline, rule) # tracking of applied rules and assigning self.pipeline
super().apply(pipeline, rule) # tracking of applied rules


@dataclass
Expand All @@ -55,7 +57,9 @@ def apply(

@dataclass
class QuerySimpleTemplateTransformation(QueryPostprocessingTransformation):
"""Replace query with template that can refer to the following placeholders:
"""
Replace query with template that can refer to the following placeholders:
* query: the postprocessed query.
* rule: the Sigma rule including all its attributes like `rule.title`.
* pipeline: the Sigma processing pipeline where this transformation is applied including all
Expand Down Expand Up @@ -143,10 +147,50 @@ def apply(
return self.re.sub(self.replacement, query)


@dataclass
class NestedQueryPostprocessingTransformation(QueryPostprocessingTransformation):
    """Applies a list of query postprocessing transformations to the query in a nested manner."""

    items: List["sigma.processing.pipeline.QueryPostprocessingItem"]
    # Derived pipeline wrapping `items`; built in __post_init__ and excluded
    # from __init__ and comparison because it is derived state.
    _nested_pipeline: "sigma.processing.pipeline.ProcessingPipeline" = field(
        init=False, compare=False, default=None
    )

    def __post_init__(self):
        from sigma.processing.pipeline import (
            ProcessingPipeline,
        )  # TODO: move to top-level after restructuring code

        self._nested_pipeline = ProcessingPipeline(postprocessing_items=self.items)

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "NestedQueryPostprocessingTransformation":
        """Instantiate from a parsed configuration dict.

        :raises SigmaConfigurationError: if the 'items' key is missing.
        """
        # Test key presence explicitly instead of catching KeyError around the
        # whole construction, which would also mask KeyErrors raised inside
        # QueryPostprocessingItem.from_dict() and misreport them as a missing
        # 'items' key.
        if "items" not in d:
            raise SigmaConfigurationError(
                "Nested post-processing transformation requires an 'items' key."
            )
        return cls(
            items=[
                sigma.processing.pipeline.QueryPostprocessingItem.from_dict(item)
                for item in d["items"]
            ]
        )

    def apply(
        self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", rule: SigmaRule, query: Any
    ) -> Any:
        """Postprocess *query* with the nested pipeline and return the result.

        Applied item identifiers of the nested pipeline are propagated to the
        enclosing pipeline so outer condition checks can see them.
        """
        super().apply(pipeline, rule, query)
        query = self._nested_pipeline.postprocess_query(rule, query)
        pipeline.applied_ids.update(self._nested_pipeline.applied_ids)
        return query


# Registry mapping configuration identifiers to query postprocessing
# transformation classes; used to resolve the transformation type when
# postprocessing items are built from parsed pipeline definitions.
query_postprocessing_transformations = {
    "embed": EmbedQueryTransformation,
    "simple_template": QuerySimpleTemplateTransformation,
    "template": QueryTemplateTransformation,
    "json": EmbedQueryInJSONTransformation,
    "replace": ReplaceQueryTransformation,
    "nest": NestedQueryPostprocessingTransformation,
}
5 changes: 5 additions & 0 deletions sigma/processing/tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,8 @@ def add_mapping(self, source: str, target: Union[str, List[str]]) -> None:
self[source].update(target)
for t in target:
self.target_fields[t].add(source)

def merge(self, other: "FieldMappingTracking") -> None:
    """Incorporate every source-to-target mapping of *other* into this tracking."""
    for src in other:
        self.add_mapping(src, list(other[src]))
72 changes: 59 additions & 13 deletions sigma/processing/transformations.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def apply(
rule: Union[SigmaRule, SigmaCorrelationRule],
) -> None:
"""Apply transformation on Sigma rule."""
self.pipeline: "sigma.processing.pipeline.ProcessingPipeline" = (
self._pipeline: "sigma.processing.pipeline.ProcessingPipeline" = (
pipeline # make pipeline accessible from all further options in class property
)
self.processing_item_applied(rule)
Expand Down Expand Up @@ -120,7 +120,7 @@ def apply_detection(self, detection: SigmaDetection):
else:
if (
self.processing_item is None
or self.processing_item.match_detection_item(self.pipeline, detection_item)
or self.processing_item.match_detection_item(self._pipeline, detection_item)
) and (r := self.apply_detection_item(detection_item)) is not None:
if isinstance(r, SigmaDetectionItem):
r.disable_conversion_to_plain()
Expand Down Expand Up @@ -218,12 +218,12 @@ def apply_detection_item(
match = False
for value in detection_item.value:
if self.processing_item is not None and self.processing_item.match_field_in_value(
self.pipeline, value
self._pipeline, value
):
new_values.extend(
(
SigmaFieldReference(mapped_field)
for mapped_field in self._apply_field_name(self.pipeline, value.field)
for mapped_field in self._apply_field_name(self._pipeline, value.field)
)
)
match = True
Expand Down Expand Up @@ -361,8 +361,8 @@ def apply_detection_item(self, detection_item: SigmaDetectionItem):
super().apply_detection_item(detection_item)
field = detection_item.field
mapping = self.get_mapping(field)
if mapping is not None and self.processing_item.match_field_name(self.pipeline, field):
self.pipeline.field_mappings.add_mapping(field, mapping)
if mapping is not None and self.processing_item.match_field_name(self._pipeline, field):
self._pipeline.field_mappings.add_mapping(field, mapping)
if isinstance(mapping, str): # 1:1 mapping, map field name of detection item directly
detection_item.field = mapping
self.processing_item_applied(detection_item)
Expand Down Expand Up @@ -413,8 +413,8 @@ def apply_detection_item(self, detection_item: SigmaDetectionItem):
super().apply_detection_item(detection_item)
f = detection_item.field
mapping = self._transform_name(f)
if self.processing_item.match_field_name(self.pipeline, f):
self.pipeline.field_mappings.add_mapping(f, mapping)
if self.processing_item.match_field_name(self._pipeline, f):
self._pipeline.field_mappings.add_mapping(f, mapping)
detection_item.field = mapping
self.processing_item_applied(detection_item)

Expand Down Expand Up @@ -463,10 +463,10 @@ def apply_detection_item(self, detection_item: SigmaDetectionItem):
super().apply_detection_item(detection_item)
if type(orig_field := detection_item.field) is str and (
self.processing_item is None
or self.processing_item.match_field_name(self.pipeline, orig_field)
or self.processing_item.match_field_name(self._pipeline, orig_field)
):
detection_item.field += self.suffix
self.pipeline.field_mappings.add_mapping(orig_field, detection_item.field)
self._pipeline.field_mappings.add_mapping(orig_field, detection_item.field)
self.processing_item_applied(detection_item)

def apply_field_name(self, field: str) -> List[str]:
Expand All @@ -485,10 +485,10 @@ def apply_detection_item(self, detection_item: SigmaDetectionItem):
super().apply_detection_item(detection_item)
if type(orig_field := detection_item.field) is str and (
self.processing_item is None
or self.processing_item.match_field_name(self.pipeline, orig_field)
or self.processing_item.match_field_name(self._pipeline, orig_field)
):
detection_item.field = self.prefix + detection_item.field
self.pipeline.field_mappings.add_mapping(orig_field, detection_item.field)
self._pipeline.field_mappings.add_mapping(orig_field, detection_item.field)
self.processing_item_applied(detection_item)

def apply_field_name(self, field: str) -> List[str]:
Expand Down Expand Up @@ -581,7 +581,7 @@ class ValueListPlaceholderTransformation(BasePlaceholderTransformation):

def placeholder_replacements(self, p: Placeholder) -> List[str]:
try:
values = self.pipeline.vars[p.name]
values = self._pipeline.vars[p.name]
except KeyError:
raise SigmaValueError(f"Placeholder replacement variable '{ p.name }' doesn't exists.")

Expand Down Expand Up @@ -984,6 +984,51 @@ def apply(
rule.custom_attributes[self.attribute] = self.value


@dataclass
class NestedProcessingTransformation(Transformation):
    """Executes a nested processing pipeline as transformation. Main purpose is to apply a
    whole set of transformations that match the given conditions of the enclosing processing item.
    """

    items: List["sigma.processing.pipeline.ProcessingItem"]
    # Derived pipeline built from `items` in __post_init__; excluded from
    # __init__, comparison and repr because it is derived state.
    _nested_pipeline: "sigma.processing.pipeline.ProcessingPipeline" = field(
        init=False, compare=False, repr=False, default=None
    )

    def __post_init__(self):
        from sigma.processing.pipeline import (
            ProcessingPipeline,
        )  # TODO: move to top-level after restructuring code

        self._nested_pipeline = ProcessingPipeline(items=self.items)

    @classmethod
    def from_dict(cls, d: Dict) -> "NestedProcessingTransformation":
        """Instantiate from a parsed configuration dict.

        :raises SigmaConfigurationError: if the 'items' key is missing.
        """
        from sigma.processing.pipeline import (
            ProcessingItem,
        )  # TODO: move to top-level after restructuring code

        # Test key presence explicitly instead of catching KeyError around the
        # whole construction, which would also mask KeyErrors raised inside
        # ProcessingItem.from_dict() and misreport them as a missing 'items' key.
        if "items" not in d:
            raise SigmaConfigurationError(
                "Nested processing transformation requires an 'items' key."
            )
        return cls(items=[ProcessingItem.from_dict(item) for item in d["items"]])

    def apply(
        self,
        pipeline: "sigma.processing.pipeline.ProcessingPipeline",
        rule: Union[SigmaRule, SigmaCorrelationRule],
    ) -> None:
        """Apply the nested pipeline to *rule* and merge its tracking state
        (applied items/ids, field name tracking, mappings, state) back into the
        enclosing pipeline so outer conditions stay consistent."""
        super().apply(pipeline, rule)
        self._nested_pipeline.apply(rule)
        pipeline.applied.extend(self._nested_pipeline.applied)
        pipeline.applied_ids.update(self._nested_pipeline.applied_ids)
        pipeline.field_name_applied_ids.update(self._nested_pipeline.field_name_applied_ids)
        pipeline.field_mappings.merge(self._nested_pipeline.field_mappings)
        pipeline.state.update(self._nested_pipeline.state)


transformations: Dict[str, Transformation] = {
"field_name_mapping": FieldMappingTransformation,
"field_name_prefix_mapping": FieldPrefixMappingTransformation,
Expand All @@ -1008,4 +1053,5 @@ def apply(
"rule_failure": RuleFailureTransformation,
"detection_item_failure": DetectionItemFailureTransformation,
"set_custom_attribute": SetCustomAttributeTransformation,
"nest": NestedProcessingTransformation,
}
Loading

0 comments on commit d52d8b5

Please sign in to comment.