Skip to content

Commit

Permalink
add option to ignore missing fields in field_manager (#457)
Browse files Browse the repository at this point in the history
* Fix Changelog

---------

Co-authored-by: dtrai2 <[email protected]>
  • Loading branch information
ekneg54 and dtrai2 authored Oct 13, 2023
1 parent eb4db4b commit c67124c
Show file tree
Hide file tree
Showing 29 changed files with 290 additions and 22 deletions.
16 changes: 4 additions & 12 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,14 @@

* add a preprocessor to enrich events with the system's environment variables
* add option to define rules inline in pipeline config under processor configs `generic_rules` or `specific_rules`

### Improvements

* `pre_detector` processor now adds the field `creation_timestamp` to pre-detections.
It contains the time at which a pre-detection was created by the processor.
* add `prometheus` and `grafana` to the quickstart setup to support development
* reimplemented kafka input connector
- move kafka config options to `kafka_config` dictionary

### Features

* add option to `field_manager` to ignore missing source fields to suppress warnings and failure tags
* add `ignore_missing_fields` behavior to `calculator`, `concatenator`, `dissector`, `grokker`, `ip_informer`, `selective_extractor`
* kafka input connector
- implemented manual commit behaviour if `enable.auto.commit: false`
- implemented on_commit callback to check for errors during commit
- implemented statistics callback to collect metrics from underlying librdkafka library
- implemented per partition offset metrics
- get logs and handle errors from underlying librdkafka library

* kafka output connector
- implemented statistics callback to collect metrics from underlying librdkafka library
- get logs and handle errors from underlying librdkafka library
Expand All @@ -37,8 +27,10 @@ It contains the time at which a pre-detection was created by the processor.

* `pre_detector` processor now adds the field `creation_timestamp` to pre-detections.
It contains the time at which a pre-detection was created by the processor.
* add `prometheus` and `grafana` to the quickstart setup to support development
* provide confluent kafka test setup to run tests against a real kafka cluster


### Bugfix

* fix CVE-2023-37920 Removal of e-Tugra root certificate
Expand Down
2 changes: 2 additions & 0 deletions logprep/abc/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,8 @@ def _has_missing_values(self, event, rule, source_field_dict):
dict(filter(lambda x: x[1] in [None, ""], source_field_dict.items())).keys()
)
if missing_fields:
if rule.ignore_missing_fields:
return True
error = BaseException(f"{self.name}: no value for fields: {missing_fields}")
self._handle_warning_error(event, rule, error)
return True
Expand Down
3 changes: 3 additions & 0 deletions logprep/processor/amides/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,6 @@ class Config(FieldManagerRule.Config):
)
target_field: str = field(validator=validators.instance_of(str), default="amides")
mapping: dict = field(default="", init=False, repr=False, eq=False)
ignore_missing_fields: bool = field(
init=False, repr=False, eq=False, default=False, validator=validators.instance_of(bool)
)
4 changes: 4 additions & 0 deletions logprep/processor/calculator/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ class Config(FieldManagerRule.Config):
"""
timeout: int = field(validator=validators.instance_of(int), converter=int, default=1)
"""The maximum time in seconds for the calculation. Defaults to :code:`1`"""
ignore_missing_fields: bool = field(validator=validators.instance_of(bool), default=False)
"""If set to :code:`True` missing fields will be ignored, no warning is logged,
and the event is not tagged with a failure tag. As soon as one field is missing
no calculation is performed at all. Defaults to :code:`False`"""
mapping: dict = field(default="", init=False, repr=False, eq=False)

def __attrs_post_init__(self):
Expand Down
6 changes: 3 additions & 3 deletions logprep/processor/concatenator/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
.. automodule:: logprep.processor.concatenator.rule
"""
from logprep.abc.processor import Processor
from logprep.processor.concatenator.rule import ConcatenatorRule
from logprep.processor.field_manager.processor import FieldManager
from logprep.util.helper import get_dotted_field_value


Expand All @@ -38,7 +38,7 @@ def __init__(self, name: str, message: str):
super().__init__(f"Concatenator ({name}): {message}")


class Concatenator(Processor):
class Concatenator(FieldManager):
"""Concatenates a list of source fields into a new target field."""

rule_class = ConcatenatorRule
Expand All @@ -56,11 +56,11 @@ def _apply_rules(self, event, rule: ConcatenatorRule):
rule :
Currently applied concatenator rule.
"""

source_field_values = []
for source_field in rule.source_fields:
field_value = get_dotted_field_value(event, source_field)
source_field_values.append(field_value)
self._handle_missing_fields(event, rule, rule.source_fields, source_field_values)

source_field_values = [field for field in source_field_values if field is not None]
target_value = f"{rule.separator}".join(source_field_values)
Expand Down
2 changes: 2 additions & 0 deletions logprep/processor/datetime_extractor/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@ class Config(FieldManagerRule.Config):
source_fields: list = field(
validator=[
validators.instance_of(list),
validators.max_len(1),
validators.deep_iterable(member_validator=validators.instance_of(str)),
],
)
"""The fields from where to get the values which should be processed."""
target_field: str = field(validator=validators.instance_of(str))
"""The field where to write the processed values to. """
mapping: dict = field(default="", init=False, repr=False, eq=False)
ignore_missing_fields: bool = field(default=False, init=False, repr=False, eq=False)
8 changes: 5 additions & 3 deletions logprep/processor/dissector/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@
"""
from typing import Callable, List, Tuple

from logprep.abc.processor import Processor
from logprep.processor.dissector.rule import DissectorRule
from logprep.util.helper import get_dotted_field_value, add_field_to
from logprep.processor.field_manager.processor import FieldManager
from logprep.util.helper import add_field_to, get_dotted_field_value


class Dissector(Processor):
class Dissector(FieldManager):
"""A processor that tokenizes field values to new fields and converts datatypes"""

rule_class = DissectorRule
Expand Down Expand Up @@ -67,6 +67,8 @@ def _get_mappings(self, event, rule) -> List[Tuple[Callable, dict, str, str, str
current_field = source_field
loop_content = get_dotted_field_value(event, current_field)
if loop_content is None:
if rule.ignore_missing_fields:
continue
error = BaseException(
f"dissector: mapping field '{source_field}' does not exist"
)
Expand Down
3 changes: 2 additions & 1 deletion logprep/processor/domain_label_extractor/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
:inherited-members:
:noindex:
"""
from attr import field, validators, define
from attr import define, field, validators

from logprep.processor.field_manager.rule import FieldManagerRule

Expand All @@ -75,3 +75,4 @@ class Config(FieldManagerRule.Config):
)
"""The fields from where to get the values which should be processed."""
mapping: dict = field(default="", init=False, repr=False, eq=False)
ignore_missing_fields: bool = field(default=False, init=False, repr=False, eq=False)
1 change: 1 addition & 0 deletions logprep/processor/domain_resolver/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,4 @@ class Config(FieldManagerRule.Config):
)
"""The field where to write the processor output to. Defaults to :code:`resovled_ip`"""
mapping: dict = field(default="", init=False, repr=False, eq=False)
ignore_missing_fields: bool = field(default=False, init=False, repr=False, eq=False)
7 changes: 6 additions & 1 deletion logprep/processor/field_manager/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,15 @@ def _overwrite_with_list_from_source_field_values(self, *args):
add_and_overwrite(event, target_field, target_field_value)

def _handle_missing_fields(self, event, rule, source_fields, field_values):
if rule.ignore_missing_fields:
return False
if None in field_values:
error = self._get_missing_fields_error(source_fields, field_values)
self._handle_warning_error(
event, rule, error, failure_tags=["_field_manager_missing_field_warning"]
event,
rule,
error,
failure_tags=[f"_{self.rule_class.rule_type}_missing_field_warning"],
)
return True
return False
Expand Down
7 changes: 7 additions & 0 deletions logprep/processor/field_manager/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ class Config(Rule.Config):
If the target field does not exist, a new field will be added with the
source field value as list. Defaults to :code:`False`.
"""
ignore_missing_fields: bool = field(validator=validators.instance_of(bool), default=False)
"""If set to :code:`True` missing fields will be ignored, no warning is logged and the event
is not tagged with the failure tag. Defaults to :code:`False`"""

def __attrs_post_init__(self):
# ensures no split operations during processing
Expand Down Expand Up @@ -161,4 +164,8 @@ def overwrite_target(self):
def extend_target_list(self):
return self._config.extend_target_list

@property
def ignore_missing_fields(self):
return self._config.ignore_missing_fields

# pylint: enable=missing-function-docstring
1 change: 1 addition & 0 deletions logprep/processor/geoip_enricher/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class Config(FieldManagerRule.Config):
description: '...'
"""
mapping: dict = field(default="", init=False, repr=False, eq=False)
ignore_missing_fields: bool = field(default=False, init=False, repr=False, eq=False)

@property
def customize_target_subfields(self) -> dict: # pylint: disable=missing-function-docstring
Expand Down
8 changes: 7 additions & 1 deletion logprep/processor/grokker/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@
from attrs import define, field, validators

from logprep.abc.processor import Processor
from logprep.processor.base.exceptions import FieldExistsWarning, ProcessingWarning, ProcessingError
from logprep.processor.base.exceptions import (
FieldExistsWarning,
ProcessingError,
ProcessingWarning,
)
from logprep.processor.grokker.rule import GrokkerRule
from logprep.util.getter import GetterFactory
from logprep.util.helper import add_field_to, get_dotted_field_value
Expand Down Expand Up @@ -66,6 +70,8 @@ def _apply_rules(self, event: dict, rule: GrokkerRule):
for dotted_field, grok in rule.actions.items():
field_value = get_dotted_field_value(event, dotted_field)
if field_value is None:
if rule.ignore_missing_fields:
continue
error = BaseException(f"{self.name}: missing source_field: '{dotted_field}'")
self._handle_warning_error(event=event, rule=rule, error=error)
continue
Expand Down
2 changes: 2 additions & 0 deletions logprep/processor/ip_informer/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class IpInformer(FieldManager):
rule_class = IpInformerRule

def _apply_rules(self, event: dict, rule: IpInformerRule) -> None:
source_field_values = self._get_field_values(event, rule.source_fields)
self._handle_missing_fields(event, rule, rule.source_fields, source_field_values)
self._processing_warnings = []
ip_address_list = self._get_flat_ip_address_list(event, rule)
results = self._get_results(ip_address_list, rule)
Expand Down
1 change: 1 addition & 0 deletions logprep/processor/key_checker/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,4 @@ class Config(FieldManagerRule.Config):
target_field: str = field(validator=validators.instance_of(str))
"""The field where to write the processed values to. """
mapping: dict = field(default="", init=False, repr=False, eq=False)
ignore_missing_fields: bool = field(default=False, init=False, repr=False, eq=False)
1 change: 1 addition & 0 deletions logprep/processor/list_comparison/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class Config(FieldManagerRule.Config):
e.g., :code:`${<your environment variable>}`. The special key :code:`${LOGPREP_LIST}`
will be filled by this processor. """
mapping: dict = field(default="", init=False, repr=False, eq=False)
ignore_missing_fields: bool = field(default=False, init=False, repr=False, eq=False)

def __init__(self, filter_rule: FilterExpression, config: dict):
super().__init__(filter_rule, config)
Expand Down
1 change: 1 addition & 0 deletions logprep/processor/requester/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ class Config(FieldManagerRule.Config):
cert: str = field(validator=validators.instance_of(str), default="")
"""(Optional) SSL client certificate as path to ssl client cert file (.pem)."""
mapping: dict = field(default="", init=False, repr=False, eq=False)
ignore_missing_fields: bool = field(default=False, init=False, repr=False, eq=False)

def __attrs_post_init__(self):
url_fields = re.findall(FIELD_PATTERN, self.url)
Expand Down
1 change: 1 addition & 0 deletions logprep/processor/selective_extractor/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def _apply_rules(self, event: dict, rule: SelectiveExtractorRule):
"""
flattened_fields = get_source_fields_dict(event, rule)
self._handle_missing_fields(event, rule, flattened_fields.keys(), flattened_fields.values())
flattened_fields = {
dotted_field: content
for dotted_field, content in flattened_fields.items()
Expand Down
3 changes: 3 additions & 0 deletions logprep/processor/selective_extractor/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ class Config(FieldManagerRule.Config):
extract_from_file: str = field(validator=validators.instance_of(str), default="", eq=False)
"""The path or url to a file with a flat list of fields to extract.
For string format see :ref:`getters`."""
ignore_missing_fields: bool = field(validator=validators.instance_of(bool), default=True)
"""If set to :code:`True` missing fields will be ignored, no warning is logged and the event
is not tagged with the failure tag. Defaults to :code:`True`"""
target_field: str = field(default="", init=False, repr=False, eq=False)
overwrite_target: bool = field(default=False, init=False, repr=False, eq=False)
extend_target_list: bool = field(default=False, init=False, repr=False, eq=False)
Expand Down
1 change: 1 addition & 0 deletions logprep/processor/string_splitter/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class Config(FieldManagerRule.Config):
delimeter: str = field(validator=validators.instance_of(str), default=" ")
"""The delimeter for splitting. Defaults to whitespace"""
mapping: dict = field(default="", init=False, repr=False, eq=False)
ignore_missing_fields: bool = field(default=False, init=False, repr=False, eq=False)

@property
def delimeter(self):
Expand Down
1 change: 1 addition & 0 deletions logprep/processor/timestamp_differ/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ class Config(FieldManagerRule.Config):
"""(Optional) Specifies whether the unit (s, ms, ns) should be part of the output.
Defaults to :code:`False`."""
mapping: dict = field(default="", init=False, repr=False, eq=False)
ignore_missing_fields: bool = field(default=False, init=False, repr=False, eq=False)

def __attrs_post_init__(self):
field_format_str = re.findall(FIELD_PATTERN, self.diff)
Expand Down
1 change: 1 addition & 0 deletions logprep/processor/timestamper/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class Config(FieldManagerRule.Config):
)
""" timezone for target_field. defaults to :code:`UTC`"""
mapping: dict = field(default="", init=False, repr=False, eq=False)
ignore_missing_fields: bool = field(default=False, init=False, repr=False, eq=False)

@property
def source_format(self):
Expand Down
13 changes: 13 additions & 0 deletions tests/unit/processor/calculator/test_calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,19 @@
{"duration": "0.01"},
{"duration": 10000.0},
),
(
"Ignore missing source fields",
{
"filter": "duration",
"calculator": {
"calc": "${missing_field} * 10e5",
"target_field": "duration",
"ignore_missing_fields": True,
},
},
{"duration": "0.01"},
{"duration": "0.01"},
),
]


Expand Down
21 changes: 21 additions & 0 deletions tests/unit/processor/concatenator/test_concatenator.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def specific_rules_dirs(self):
"separator": "-",
"overwrite_target": False,
"delete_source_fields": False,
"ignore_missing_fields": True,
},
},
{"field": {"a": "first", "b": "second"}},
Expand Down Expand Up @@ -143,6 +144,26 @@ def specific_rules_dirs(self):
},
{"field": {"c": "another one"}, "target_field": "first-second"},
),
(
"ignore missing fields",
{
"filter": "field.a",
"concatenator": {
"source_fields": ["field.a", "field.b", "other_field.c"],
"target_field": "target_field",
"separator": "-",
"overwrite_target": False,
"delete_source_fields": False,
"ignore_missing_fields": True,
},
},
{"field": {"a": "first"}, "other_field": {"c": "third"}},
{
"field": {"a": "first"},
"other_field": {"c": "third"},
"target_field": "first-third",
},
),
],
)
def test_for_expected_output(self, test_case, rule, document, expected_output):
Expand Down
18 changes: 18 additions & 0 deletions tests/unit/processor/dissector/test_dissector.py
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,24 @@
"sys_type": "system_monitor",
},
),
(
"ignore missing fields",
{
"filter": "message",
"dissector": {
"mapping": {
"message": "%{sys_type}",
"does_not_exist": "%{sys_type}",
},
"ignore_missing_fields": True,
},
},
{"message": "system_monitor"},
{
"message": "system_monitor",
"sys_type": "system_monitor",
},
),
]
failure_test_cases = [ # testcase, rule, event, expected
(
Expand Down
Loading

0 comments on commit c67124c

Please sign in to comment.