diff --git a/CHANGELOG.md b/CHANGELOG.md index 26eeba3b1..5dc2bcc97 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ * expose metrics via uvicorn webserver * makes all uvicorn configuration options possible * add security best practices to server configuration +* add following metrics to `http_input` connector + * `nummer_of_http_requests` + * `message_backlog_size` ### Improvements diff --git a/logprep/connector/http/input.py b/logprep/connector/http/input.py index a08a722c3..af399c155 100644 --- a/logprep/connector/http/input.py +++ b/logprep/connector/http/input.py @@ -96,11 +96,12 @@ ) from logprep.abc.input import FatalInputError, Input +from logprep.metrics.metrics import CounterMetric, GaugeMetric from logprep.util import http from logprep.util.credentials import CredentialsFactory -def decorator_basic_auth(func: Callable): +def basic_auth(func: Callable): """Decorator to check basic authentication. Will raise 401 on wrong credentials or missing Authorization-Header""" @@ -120,7 +121,7 @@ async def func_wrapper(*args, **kwargs): return func_wrapper -def decorator_request_exceptions(func: Callable): +def handle_request_exceptions(func: Callable): """Decorator to wrap http calls and raise exceptions""" async def func_wrapper(*args, **kwargs): @@ -143,7 +144,7 @@ async def func_wrapper(*args, **kwargs): return func_wrapper -def decorator_add_metadata(func: Callable): +def add_metadata(func: Callable): """Decorator to add metadata to resulting http event. Uses attribute collect_meta of endpoint class to decide over metadata collection Uses attribute metafield_name to define key name for metadata @@ -199,27 +200,34 @@ def __init__( collect_meta: bool, metafield_name: str, credentials: dict, + metrics: "HttpConnector.Metrics", ) -> None: self.messages = messages self.collect_meta = collect_meta self.metafield_name = metafield_name self.credentials = credentials + self.metrics = metrics if self.credentials: self.basicauth_b64 = b64encode( f"{self.credentials.username}:{self.credentials.password}".encode("utf-8") ).decode("utf-8") + def collect_metrics(self): + """Increment number of requests""" + self.metrics.number_of_http_requests += 1 + class JSONHttpEndpoint(HttpEndpoint): """:code:`json` endpoint to get json from request""" _decoder = msgspec.json.Decoder() - @decorator_request_exceptions - @decorator_basic_auth - @decorator_add_metadata + @handle_request_exceptions + @basic_auth + @add_metadata async def __call__(self, req, resp, **kwargs): # pylint: disable=arguments-differ """json endpoint method""" + self.collect_metrics() data = await req.stream.read() data = data.decode("utf8") metadata = kwargs.get("metadata", {}) @@ -233,11 +241,12 @@ class JSONLHttpEndpoint(HttpEndpoint): _decoder = msgspec.json.Decoder() - @decorator_request_exceptions - @decorator_basic_auth - @decorator_add_metadata + @handle_request_exceptions + @basic_auth + @add_metadata async def __call__(self, req, resp, **kwargs): # pylint: disable=arguments-differ """jsonl endpoint method""" + self.collect_metrics() data = await req.stream.read() data = data.decode("utf8") event = kwargs.get("metadata", {}) @@ -252,11 +261,12 @@ class PlaintextHttpEndpoint(HttpEndpoint): """:code:`plaintext` endpoint to get the body from request and put it in :code:`message` field""" - @decorator_request_exceptions - @decorator_basic_auth - @decorator_add_metadata + @handle_request_exceptions + @basic_auth + @add_metadata async def __call__(self, req, resp, **kwargs): # pylint: disable=arguments-differ """plaintext endpoint method""" + self.collect_metrics() data = await req.stream.read() metadata = kwargs.get("metadata", {}) event = {"message": data.decode("utf8")} @@ -266,6 +276,26 @@ async def __call__(self, req, resp, **kwargs): # pylint: disable=arguments-diff class HttpConnector(Input): """Connector to accept log messages as http post requests""" + @define(kw_only=True) + class Metrics(Input.Metrics): + """Tracks statistics about this connector""" + + number_of_http_requests: CounterMetric = field( + factory=lambda: CounterMetric( + description="Number of incomming requests", + name="number_of_http_requests", + ) + ) + """Number of incomming requests""" + + message_backlog_size: GaugeMetric = field( + factory=lambda: GaugeMetric( + description="Size of the message backlog queue", + name="message_backlog_size", + ) + ) + """Size of the message backlog queue""" + @define(kw_only=True) class Config(Input.Config): """Config for HTTPInput""" @@ -398,7 +428,11 @@ def setup(self): endpoint_class = self._endpoint_registry.get(endpoint_type) credentials = cred_factory.from_endpoint(endpoint_path) endpoints_config[endpoint_path] = endpoint_class( - self.messages, collect_meta, metafield_name, credentials + self.messages, + collect_meta, + metafield_name, + credentials, + self.metrics, ) app = self._get_asgi_app(endpoints_config) @@ -417,6 +451,7 @@ def _get_asgi_app(endpoints_config: dict) -> falcon.asgi.App: def _get_event(self, timeout: float) -> Tuple: """Returns the first message from the queue""" + self.metrics.message_backlog_size += self.messages.qsize() try: message = self.messages.get(timeout=timeout) raw_message = str(message).encode("utf8") diff --git a/logprep/connector/s3/output.py b/logprep/connector/s3/output.py index a62fcc741..8dd32dd45 100644 --- a/logprep/connector/s3/output.py +++ b/logprep/connector/s3/output.py @@ -58,8 +58,8 @@ EndpointConnectionError, ) -from logprep.abc.output import Output, FatalOutputError -from logprep.metrics.metrics import Metric, CounterMetric +from logprep.abc.output import FatalOutputError, Output +from logprep.metrics.metrics import CounterMetric, Metric from logprep.util.helper import get_dotted_field_value from logprep.util.time import TimeParser @@ -264,7 +264,6 @@ def store(self, document: dict): Document to store. """ self.metrics.number_of_processed_events += 1 - prefix_value = get_dotted_field_value(document, self._config.prefix_field) if prefix_value is None: document = self._build_no_prefix_document( diff --git a/tests/unit/connector/test_http_input.py b/tests/unit/connector/test_http_input.py index 1a21c2c44..4a9b39cd2 100644 --- a/tests/unit/connector/test_http_input.py +++ b/tests/unit/connector/test_http_input.py @@ -2,11 +2,11 @@ # pylint: disable=protected-access # pylint: disable=attribute-defined-outside-init import multiprocessing +import random import re from copy import deepcopy from unittest import mock -import falcon import pytest import requests import uvicorn @@ -73,6 +73,12 @@ def setup_method(self): }, } + expected_metrics = [ + *BaseInputTestCase.expected_metrics, + "logprep_message_backlog_size", + "logprep_number_of_http_requests", + ] + def teardown_method(self): while not self.object.messages.empty(): self.object.messages.get(timeout=0.001) @@ -368,3 +374,21 @@ def test_sets_target_to_http_schema_if_no_ssl_options(self): connector_config = deepcopy(self.CONFIG) connector = Factory.create({"test connector": connector_config}, logger=self.logger) assert connector.target.startswith("http://") + + def test_get_event_sets_message_backlog_size_metric(self): + self.object.metrics.message_backlog_size = 0 + random_number = random.randint(1, 100) + for number in range(random_number): + self.object.messages.put({"message": f"my message{number}"}) + self.object.get_next(0.001) + assert self.object.metrics.message_backlog_size == random_number + + def test_enpoints_count_requests(self): + self.object.metrics.number_of_http_requests = 0 + self.object.setup() + random_number = random.randint(1, 100) + for number in range(random_number): + requests.post( + url=f"{self.target}/json", json={"message": f"my message{number}"}, timeout=0.5 + ) + assert self.object.metrics.number_of_http_requests == random_number