diff --git a/utilities/earthly-cache-watcher/main.py b/utilities/earthly-cache-watcher/main.py index 66485d612..e932ebf56 100644 --- a/utilities/earthly-cache-watcher/main.py +++ b/utilities/earthly-cache-watcher/main.py @@ -5,7 +5,7 @@ import threading import time from collections.abc import Callable -from typing import Set +from typing import set from dotenv import dotenv_values from loguru import logger @@ -17,6 +17,7 @@ logger.remove() # Remove the default handler logger.add(sys.stdout, level="INFO", serialize=True, format="{message}") + class Interval: """ A class that repeatedly executes a function @@ -64,8 +65,8 @@ def __init__(self, interval: int): self.layer_growth_index: dict[str, int] = {} self.layer_index: dict[str, int] = {} self.file_index: dict[str, int] = {} - self.triggered_layers: Set[str] = set() - self.triggered_growth_layers: Set[str] = set() + self.triggered_layers: set[str] = set() + self.triggered_growth_layers: set[str] = set() self.interval = Interval(interval, self.handle_interval_change) self.list_initial_sizes() @@ -92,9 +93,7 @@ def list_initial_sizes(self): self.file_index[file_path] = size helper.add_or_init(self.layer_index, layer_name, size) - logger.debug( - f"initial file: {file_path} (size: {size:,} bytes)" - ) + logger.debug(f"initial file: {file_path} (size: {size:,} bytes)") except OSError as e: if log_file_accessing_err: logger.error(f"{e}: {file_path}") @@ -170,11 +169,15 @@ def handle_modified(self, file_path: str): # checks self.check_sizes(layer_name) - logger.debug(" ".join([ - f"file modified: {file_path}", - f"(size changed from {prev_size:,} bytes", - f"to {size:,} bytes)" - ])) + logger.debug( + " ".join( + [ + f"file modified: {file_path}", + f"(size changed from {prev_size:,} bytes", + f"to {size:,} bytes)", + ] + ) + ) else: logger.debug(f"file modified: {file_path} (size unchanged)") except OSError as e: @@ -216,22 +219,17 @@ def handle_deleted(self, file_path: str): def check_sizes(self, layer_name: str, skip_sum_check=False): if ( layer_name in self.layer_index - and self.layer_index[layer_name] - >= large_layer_size + and self.layer_index[layer_name] >= large_layer_size ): self.trigger_layer_size_exceeded(layer_name) if ( not skip_sum_check - and sum(self.layer_growth_index.values()) - >= max_time_window_growth_size + and sum(self.layer_growth_index.values()) >= max_time_window_growth_size ): self.trigger_interval_growth_exceeded() - if ( - not skip_sum_check - and sum(self.layer_index.values()) >= max_cache_size - ): + if not skip_sum_check and sum(self.layer_index.values()) >= max_cache_size: self.trigger_max_cache_size() def trigger_layer_size_exceeded(self, layer_name: str): @@ -241,17 +239,19 @@ def trigger_layer_size_exceeded(self, layer_name: str): self.triggered_layers.add(layer_name) logger.error( - " ".join([ - f"layer '{layer_name}' exceeds large layer size criteria", - f"(size: {self.layer_index[layer_name]:,} bytes", - f"- limit: {large_layer_size:,} bytes)" - ]), + " ".join( + [ + f"layer '{layer_name}' exceeds large layer size criteria", + f"(size: {self.layer_index[layer_name]:,} bytes", + f"- limit: {large_layer_size:,} bytes)", + ] + ), extra={ "err_type": "layer_size_exceeded", "layer": layer_name, "size": self.layer_index[layer_name], - "limit": large_layer_size - } + "limit": large_layer_size, + }, ) def trigger_interval_growth_exceeded(self): @@ -265,33 +265,37 @@ def trigger_interval_growth_exceeded(self): self.triggered_growth_layers.add(layer_name) logger.error( - " ".join([ - f"layer '{layer_name}'", - f"- {size:,} bytes within the interval" - ]), + " ".join( + [ + f"layer '{layer_name}'", + f"- {size:,} bytes within the interval", + ] + ), extra={ "err_type": "layer_list_growth_exceeded", "layer": layer_name, - "size": size - } + "size": size, + }, ) if has_triggered_layer: size = sum(self.layer_growth_index.values()) logger.error( - " ".join([ - "the total amount of cache growth", - f"within {time_window:,} secs exceeds the limit", - f"(size: {size:,} bytes", - f"- limit: {max_time_window_growth_size:,} bytes)" - ]), + " ".join( + [ + "the total amount of cache growth", + f"within {time_window:,} secs exceeds the limit", + f"(size: {size:,} bytes", + f"- limit: {max_time_window_growth_size:,} bytes)", + ] + ), extra={ "err_type": "interval_growth_exceeded", "size": size, "limit": max_time_window_growth_size, - "within": time_window - } + "within": time_window, + }, ) except RuntimeError as e: logger.error(f"an error occurred: {e}") @@ -300,29 +304,26 @@ def trigger_max_cache_size(self): size = sum(self.layer_index.values()) logger.error( - " ".join([ - "the total amount of cache exceeds the limit", - f"(size: {size:,} bytes", - f"- limit: {max_cache_size:,} bytes)" - ]), + " ".join( + [ + "the total amount of cache exceeds the limit", + f"(size: {size:,} bytes", + f"- limit: {max_cache_size:,} bytes)", + ] + ), extra={ "err_type": "max_cache_size_exceeded", "size": size, - "limit": max_cache_size - } + "limit": max_cache_size, + }, ) def drop(self): self.interval.drop() + def main(): - global \ - watch_dir, \ - large_layer_size, \ - max_cache_size, \ - time_window, \ - max_time_window_growth_size, \ - log_file_accessing_err + global watch_dir, large_layer_size, max_cache_size, time_window, max_time_window_growth_size, log_file_accessing_err default_config_path = sys.argv[1] if len(sys.argv) > 1 else "default.conf" @@ -335,9 +336,7 @@ def main(): log_file_accessing_err = True if os.path.isfile(default_config_path): - logger.info( - f"read config from {os.path.abspath(default_config_path)!r}" - ) + logger.info(f"read config from {os.path.abspath(default_config_path)!r}") cfg = dotenv_values(default_config_path) @@ -354,14 +353,17 @@ def main(): logger.info(f"with `large_layer_size` set to {large_layer_size:,} bytes") logger.info(f"with `max_cache_size` set to {max_cache_size:,} bytes") logger.info(f"with `time_window` set to {time_window:,} secs") - logger.info(" ".join([ - "with `max_time_window_growth_size` set to", - f"{max_time_window_growth_size:,} bytes" - ])) - logger.info(" ".join([ - "with `log_file_accessing_err` set to", - log_file_accessing_err - ])) + logger.info( + " ".join( + [ + "with `max_time_window_growth_size` set to", + f"{max_time_window_growth_size:,} bytes", + ] + ) + ) + logger.info( + " ".join(["with `log_file_accessing_err` set to", log_file_accessing_err]) + ) # init watcher handler = ChangeEventHandler(time_window)