From b4534fd7d5154122f8d433640ab4c9c1c6287f3d Mon Sep 17 00:00:00 2001 From: Etienne Champetier Date: Thu, 25 Apr 2024 19:51:32 -0400 Subject: [PATCH] [plugins] speedup journal collection Instead of generating all the logs and tailing the last 100M, we get the first 100M of 'journalctl --reverse' that we then reverse again using our own implementation of tac. To handle multiline logs we would need to use "tac -brs '^[^ ]'" that takes ~30s on 100M of logs when plain 'tac' takes ~0.3s. Our simple implementation in Python takes 0.7s, and avoids an extra dependency. On journalctl timeout we now get the most recent logs. During collection logs are now buffered on disk, so we use 2xsizelimit. While running our tac we could actually truncate the source file to limit disk usage. Previously buffering was in RAM (also 2xsizelimit). On my test server, logs plugin runtime goes from 34s to 9.5s. Signed-off-by: Etienne Champetier --- sos/report/plugins/__init__.py | 19 ++++++++---- sos/utilities.py | 53 ++++++++++++++++++++++++++++++---- 2 files changed, 61 insertions(+), 11 deletions(-) diff --git a/sos/report/plugins/__init__.py b/sos/report/plugins/__init__.py index dd467d3e32..56d2e1f9c0 100644 --- a/sos/report/plugins/__init__.py +++ b/sos/report/plugins/__init__.py @@ -2372,7 +2372,7 @@ def _collect_cmd_output(self, cmd, suggest_filename=None, binary=False, sizelimit=None, subdir=None, changes=False, foreground=False, tags=[], priority=10, cmd_as_tag=False, to_file=False, - container_cmd=False, runas=None): + tac=False, container_cmd=False, runas=None): """Execute a command and save the output to a file for inclusion in the report. 
@@ -2400,6 +2400,7 @@ def _collect_cmd_output(self, cmd, suggest_filename=None, :param cmd_as_tag: Format command string to tag :param to_file: Write output directly to file instead of saving in memory + :param tac: Reverse lines order (need to_file=True) :param runas: Run the `cmd` as the `runas` user :returns: dict containing status, output, and filename in the @@ -2451,7 +2452,7 @@ def _collect_cmd_output(self, cmd, suggest_filename=None, cmd, timeout=timeout, stderr=stderr, chroot=root, chdir=runat, env=_env, binary=binary, sizelimit=sizelimit, poller=self.check_timeout, foreground=foreground, - to_file=out_file, runas=runas + to_file=out_file, tac=tac, runas=runas ) end = time() @@ -2489,7 +2490,7 @@ def _collect_cmd_output(self, cmd, suggest_filename=None, result = sos_get_command_output( cmd, timeout=timeout, chroot=False, chdir=runat, env=env, binary=binary, sizelimit=sizelimit, - poller=self.check_timeout, to_file=out_file + poller=self.check_timeout, to_file=out_file, tac=tac, ) run_time = time() - start self._log_debug(f"could not run '{cmd}': command not found") @@ -3083,10 +3084,18 @@ def add_journal(self, units=None, boot=None, since=None, until=None, if output: journal_cmd += output_opt % output + fname = journal_cmd + tac = False + if log_size > 0 and is_executable("head"): + journal_cmd = f"sh -c '{journal_cmd} --reverse | " \ + "head -c {log_size*1024*1024}'" + log_size = 0 + tac = True + self._log_debug(f"collecting journal: {journal_cmd}") - self._add_cmd_output(cmd=journal_cmd, timeout=timeout, + self._add_cmd_output(cmd=journal_cmd, timeout=timeout, tac=tac, sizelimit=log_size, pred=pred, tags=tags, - priority=priority) + priority=priority, suggest_filename=fname) def _expand_copy_spec(self, copyspec): def __expand(paths): diff --git a/sos/utilities.py b/sos/utilities.py index e666c64972..25157460eb 100644 --- a/sos/utilities.py +++ b/sos/utilities.py @@ -20,6 +20,7 @@ import threading import time import io +import mmap from contextlib import 
def tac_logs(f_src, f_dst):
    """Python implementation of the tac utility with support
    for multiline logs (continuation lines starting with a space).
    It is intended to reverse the output of 'journalctl --reverse'.

    Entries are written to ``f_dst`` in reverse order. An entry is a line
    that does not start with a space, together with any following lines
    that do start with a space. Trailing data after the last newline (a
    partial line) is skipped. If the very first line of the source starts
    with a space, that leading entry is not written.

    :param f_src: Source file object; must expose a real file descriptor
                  through ``fileno()`` as its content is memory-mapped
    :param f_dst: Destination file object opened for binary writing

    An empty source file results in nothing being written.
    """
    NEWLINE = b'\n'
    SPACE = 32
    # mmap refuses to map a zero-length file (ValueError), which happens
    # when the collected command produced no output at all
    if os.fstat(f_src.fileno()).st_size == 0:
        return
    with mmap.mmap(f_src.fileno(), 0, access=mmap.ACCESS_READ) as mm:
        # find the last NEWLINE, this skips the last line if it's partial
        sep1 = sep2 = mm.rfind(NEWLINE)
        while sep2 >= 0:
            # sep1 becomes -1 once there is no earlier NEWLINE, in which
            # case sep1+1 conveniently points at the start of the file
            sep1 = mm.rfind(NEWLINE, 0, sep1)
            # multiline logs have a first line not starting with space
            # followed by lines starting with spaces
            # line 5
            # line 4
            #  multiline 4
            # line 3
            if mm[sep1+1] == SPACE:
                # first line starts with a space
                # (this should not happen)
                if sep1 == -1:
                    break
                # go find the previous NEWLINE
                continue
            # write the log entry (first line plus any continuation
            # lines) ending with the NEWLINE at sep2
            f_dst.write(mm[sep1+1:sep2+1])
            sep2 = sep1