Skip to content

Commit

Permalink
fix broken gracefull shutdown (#568)
Browse files Browse the repository at this point in the history
* fix gracefull timout

had to increase the `timeout_graceful_shutdown` internal config parameter.
without this the server drops an error message on every shutdown.

hat to check for attribute `http_server` before shutdown, because the shutdown
proceeds in every process, but only in the first process this attribute is defined

* only start multiprocess logging in main process

had to ensure to only start multiprocess logging in main
process by checking for the current process name. This is needed
because of forking for pipeline creation is complaining for a deamonic process
having a child (the log process).

* register signals in cli function

the cli() function is the main entrypoint for logprep by installing
a script called "logprep" via setuptools. so nothing in the root of the module
run_logprep is executed
  • Loading branch information
ekneg54 authored Apr 22, 2024
1 parent dff0776 commit f66b548
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 27 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@

### Bugfix

* fix a bug in http connector leading to only first process working
* fixes a bug in http connector leading to only first process working
* fixes the broken gracefull shutdown behaviour

## 11.0.1
### Bugfix
Expand Down
7 changes: 5 additions & 2 deletions logprep/connector/http/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ def __init__(self, name: str, configuration: "HttpConnector.Config", logger: Log
internal_uvicorn_config = {
"lifespan": "off",
"loop": "asyncio",
"timeout_graceful_shutdown": 0,
"timeout_graceful_shutdown": 5,
}
self._config.uvicorn_config.update(internal_uvicorn_config)
self.port = self._config.uvicorn_config["port"]
Expand All @@ -388,10 +388,11 @@ def setup(self):
raise FatalInputError(
self, "Necessary instance attribute `pipeline_index` could not be found."
)

self._logger.debug(
f"HttpInput Connector started on target {self.target} and "
f"queue {id(self.messages)} "
f"with queue_size: {self.messages._maxsize}"
f"with queue_size: {self.messages._maxsize}" # pylint: disable=protected-access
)
# Start HTTP Input only when in first process
if self.pipeline_index != 1:
Expand Down Expand Up @@ -432,4 +433,6 @@ def get_server_instance(self):

def shut_down(self):
"""Raises Uvicorn HTTP Server internal stop flag and waits to join"""
if not hasattr(self, "http_server"):
return
self.http_server.shut_down()
46 changes: 27 additions & 19 deletions logprep/framework/pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@

def logger_process(queue: multiprocessing.queues.Queue, logger: logging.Logger):
"""Process log messages from a queue."""
try:
while True:
message = queue.get()
if message is None:
break
logger.handle(message)
except KeyboardInterrupt:
pass

while True:
message = queue.get()
if message is None:
break
logger.handle(message)


class PipelineManager:
Expand Down Expand Up @@ -63,11 +61,9 @@ class Metrics(Component.Metrics):
def __init__(self, configuration: Configuration):
self.metrics = self.Metrics(labels={"component": "manager"})
self._logger = logging.getLogger("Logprep PipelineManager")
self.log_queue = multiprocessing.Queue(-1)
self._log_process = multiprocessing.Process(
target=logger_process, args=(self.log_queue, self._logger), daemon=True
)
self._log_process.start()
if multiprocessing.current_process().name == "MainProcess":
self._start_multiprocess_logger()
self._set_http_input_queue(configuration)
self._pipelines: list[multiprocessing.Process] = []
self._configuration = configuration

Expand All @@ -77,13 +73,25 @@ def __init__(self, configuration: Configuration):
self.prometheus_exporter = PrometheusExporter(prometheus_config)
else:
self.prometheus_exporter = None

def _set_http_input_queue(self, configuration):
"""
this workaround has to be done because the queue size is not configurable
after initialization and the queue has to be shared between the multiple processes
"""
input_config = next(iter(configuration.input.values()))
if input_config.get("type") == "http_input":
# this workaround has to be done because the queue size is not configurable
# after initialization and the queue has to be shared between the multiple processes
if HttpConnector.messages is None:
message_backlog_size = input_config.get("message_backlog_size", 15000)
HttpConnector.messages = multiprocessing.Queue(maxsize=message_backlog_size)
is_http_input = input_config.get("type") == "http_input"
if not is_http_input and HttpConnector.messages is not None:
return
message_backlog_size = input_config.get("message_backlog_size", 15000)
HttpConnector.messages = multiprocessing.Queue(maxsize=message_backlog_size)

def _start_multiprocess_logger(self):
self.log_queue = multiprocessing.Queue(-1)
self._log_process = multiprocessing.Process(
target=logger_process, args=(self.log_queue, self._logger), daemon=True
)
self._log_process.start()

def get_count(self) -> int:
"""Get the pipeline count.
Expand Down
9 changes: 4 additions & 5 deletions logprep/run_logprep.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,20 @@
import os
import signal
import sys
import tempfile
import warnings
from pathlib import Path

import click
from colorama import Fore

from logprep.event_generator.http.controller import Controller
from logprep.event_generator.kafka.run_load_tester import LoadTester
from logprep.runner import Runner
from logprep.util import defaults
from logprep.util.auto_rule_tester.auto_rule_corpus_tester import RuleCorpusTester
from logprep.util.auto_rule_tester.auto_rule_tester import AutoRuleTester
from logprep.util.configuration import Configuration, InvalidConfigurationError
from logprep.util.helper import get_versions_string, print_fcolor
from logprep.util.rule_dry_runner import DryRunner
from logprep.util import defaults

warnings.simplefilter("always", DeprecationWarning)
logging.captureWarnings(True)
Expand Down Expand Up @@ -60,6 +58,9 @@ def cli() -> None:
Logprep allows to collect, process and forward log messages from various data sources.
Log messages are being read and written by so-called connectors.
"""
if "pytest" not in sys.modules: # needed for not blocking tests
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)


@cli.command(short_help="Run logprep to process log messages", epilog=EPILOG_STR)
Expand Down Expand Up @@ -323,6 +324,4 @@ def signal_handler(__: int, _) -> None:


if __name__ == "__main__":
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
cli()

0 comments on commit f66b548

Please sign in to comment.