diff --git a/snyk_sync/cli.py b/snyk_sync/cli.py
index caa0a43..13c4d1f 100644
--- a/snyk_sync/cli.py
+++ b/snyk_sync/cli.py
@@ -5,6 +5,7 @@
 from datetime import timedelta
 from os import environ
 from pathlib import Path
+from pprint import pformat
 from pprint import pprint
 from typing import Any
 from typing import Dict
@@ -33,6 +34,8 @@
 from utils import jopen
 from utils import jwrite
 from utils import load_watchlist
+from utils import logger
+from utils import set_log_level
 from utils import update_client
 from utils import yopen
@@ -162,12 +165,27 @@ def main(
         envvar="GITHUB_TOKEN",
         callback=settings_callback,
     ),
+    log_level: str = typer.Option(
+        default="ERROR",
+        help="The log level to set for the Snyk-Sync application.",
+        envvar="SNYK_SYNC_LOG_LEVEL",
+        callback=settings_callback,
+    ),
+    set_root_log_level: bool = typer.Option(
+        default=False,
+        help="Sets the log level defined in the `log-level` attribute across all modules / packages",
+        envvar="SNYK_SYNC_SET_ROOT_LOG_LEVEL",
+        callback=settings_callback,
+    ),
 ):

     # We keep this as the global settings hash
     global s
     global watchlist

+    # Before we do anything, let's set up the log levels
+    set_log_level(log_level, set_root_log_level)
+
     s = Settings.parse_obj(ctx.params)

     if ctx.invoked_subcommand is None:
@@ -191,6 +209,7 @@ def sync(
     global watchlist

     typer.echo("Sync starting", err=True)
+    logger.debug("starting sync")

     load_conf()

@@ -198,15 +217,12 @@
     # or return an empty one if there is none
     tmp_watch: SnykWatchList = load_watchlist(s.cache_dir)
-
     watchlist.repos = tmp_watch.repos
+    logger.debug(f"loaded snyk watch list [{pformat(tmp_watch)}]")

     GH_PAGE_LIMIT = 100
-
     gh = Github(s.github_token, per_page=GH_PAGE_LIMIT)
-
     client = SnykClient(str(s.snyk_token), user_agent=f"pysnyk/snyk_services/sync/{__version__}", tries=2, delay=1)
-
     v3client = SnykClient(
         str(s.snyk_token),
         version="2022-04-06~beta",
@@ -218,8 +234,10 @@

     if s.github_orgs is not None:
         gh_orgs = list(s.github_orgs)
+        logger.debug(f"github orgs loaded from settings [{pformat(gh_orgs)}]")
     else:
         gh_orgs = list()
+        logger.debug("github orgs not found in settings, currently empty...")

     exclude_list: list = []

@@ -228,32 +246,40 @@
     repo_ids: list = []

     for gh_org_name in gh_orgs:
+        logger.debug(f"processing org {gh_org_name}")
         gh_org = get_organization_wrapper(gh, gh_org_name, show_rate_limit)

         gh_repos = get_repos_wrapper(
             gh_org=gh_org, show_rate_limit=show_rate_limit, type="all", sort="updated", direction="desc"
         )

         gh_repos_count = gh_repos.totalCount
-
         pages = gh_repos_count // GH_PAGE_LIMIT
+        logger.debug(f"loaded org=[{pformat(gh_org)}] repos=[{pformat(gh_repos)}]")
+        logger.debug(f"repos_count={gh_repos_count}, calculated pages={pages}")

         if (gh_repos_count % GH_PAGE_LIMIT) > 0:
             pages += 1
+            logger.debug(f"repos_count={gh_repos_count}, calculated pages={pages}")
+
         with typer.progressbar(
             length=pages, label=f"Processing {gh_repos_count} repos in {gh_org_name}: "
         ) as gh_progress:
             for r_int in range(0, pages):
+                logger.debug(f"processing repos page {r_int}")
                 for gh_repo in get_page_wrapper(gh_repos, r_int, show_rate_limit):
+                    logger.debug(f"processing repo {pformat(gh_repo)}")
                     watchlist.add_repo(gh_repo)
                     repo_ids.append(gh_repo.id)
                 gh_progress.update(1)

     watchlist.prune(repo_ids)
+    logger.debug(f"watchlist=[{pformat(watchlist)}]")

     import_yamls: list = []

     for gh_org in gh_orgs:
+        logger.debug(f"processing org {pformat(gh_org)}")
         search = f"org:{gh_org} path:.snyk.d filename:import language:yaml"

         import_repos: PaginatedList[ContentFile] = gh.search_code(query=search)
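
Note: the paging arithmetic in the hunk above is plain ceiling division — floor-divide the repo count by the page size, then add one page when there is a remainder. A minimal stand-alone illustration (the sample count is invented):

    import math

    GH_PAGE_LIMIT = 100
    gh_repos_count = 250  # invented value for the example

    pages = gh_repos_count // GH_PAGE_LIMIT  # 2
    if (gh_repos_count % GH_PAGE_LIMIT) > 0:
        pages += 1  # 3
    assert pages == math.ceil(gh_repos_count / GH_PAGE_LIMIT)
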
@@ -264,24 +290,31 @@
     if (import_repos_count % GH_PAGE_LIMIT) > 0:
         import_repos_pages += 1
+        logger.debug(f"import_repos_count={import_repos_count}, import_repos_pages={import_repos_pages}")
+
     filtered_repos = []
     for i in range(0, import_repos_pages):
+        logger.debug(f"processing page of repos {i}")
         current_page = get_page_wrapper(import_repos, i, show_rate_limit)
         filtered_repos.extend(filter_chunk(current_page, exclude_list))

     import_yamls.extend(filtered_repos)
-
+    logger.debug(f"filtered_repos(len)={len(filtered_repos)} import_yamls(len)={len(import_yamls)}")
     # we will likely want to put a limit around this, as we need to walk forked repose and try to get import.yaml
     # since github won't index a fork if it has less stars than upstream
     forks = [f for f in watchlist.repos if f.fork]
     forks = [y for y in forks if y.id not in exclude_list]
+    logger.debug(f"forks(len):{len(forks)}")
+
     if s.forks is True and len(forks) > 0:
+        logger.debug(f"processing {len(forks)} forks")
         typer.echo(f"Scanning {len(forks)} forks for import.yaml", err=True)

         with typer.progressbar(forks, label="Scanning: ") as forks_progress:
             for fork in forks_progress:
                 try:
+                    logger.debug(f"processing fork [{pformat(fork)}]")
                     f_owner = fork.source.owner
                     f_name = fork.source.name
                     f_repo = gh.get_repo(f"{f_owner}/{f_name}")
@@ -294,15 +327,17 @@
                     typer.echo(f"\n\n - error processing fork: {e!r}\n")
                     typer.echo("dumping fork object:")
                     pprint(fork)
+                    logger.error(f"error processing fork... message={str(e)}")

     typer.echo(f"Have {len(import_yamls)} Repos with an import.yaml", err=True)

     if len(import_yamls) > 0:
+        logger.debug(f"processing [{pformat(import_yamls)}] imports")
         typer.echo(f"Loading import.yaml for non fork-ed repos", err=True)

         with typer.progressbar(import_yamls, label="Scanning: ") as import_progress:
             for import_yaml in import_progress:
-
+                logger.debug(f"processing import [{pformat(import_yaml)}]")
                 r_id = import_yaml.repository.id

                 import_repo = watchlist.get_repo(r_id)
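
Note: the import.yaml discovery above relies on PyGithub's code search plus manual page iteration. A self-contained sketch of the same pattern; the query string is copied from the patch, the token and org name are placeholders, and the loop body is illustrative rather than the repo's implementation:

    from github import Github

    gh = Github("<token>", per_page=100)
    search = "org:<gh-org> path:.snyk.d filename:import language:yaml"
    results = gh.search_code(query=search)

    pages = results.totalCount // 100 + (1 if results.totalCount % 100 else 0)
    for page in range(pages):
        for hit in results.get_page(page):
            # keep only files literally named import.yaml, as filter_chunk does
            if hit.name == "import.yaml":
                print(hit.repository.full_name, hit.path)
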
@@ -316,22 +351,24 @@
                         f"Please check that it is valid YAML\n {e}"
                         f"\ndumping repo object: {import_repo}\n"
                     )
+                    logger.error(f"error processing import... message={str(e)}")

     # this calls our new Orgs object which caches and populates Snyk data locally for us
     all_orgs = Orgs(cache=str(s.cache_dir), groups=s.snyk_groups)
-
     select_orgs = [str(o["orgId"]) for k, o in s.snyk_orgs.items()]
+    logger.debug(f"all_orgs={pformat(all_orgs)} select_orgs={pformat(select_orgs)}")

     typer.echo(f"Updating cache of Snyk projects", err=True)

     all_orgs.refresh_orgs(client, v3client, origin="github-enterprise", selected_orgs=select_orgs)
-
     all_orgs.save()

     typer.echo("Scanning Snyk for projects originating from GitHub Enterprise Repos", err=True)

     for repo in watchlist.repos:
+        logger.debug(f"processing watchlist repo={pformat(repo)}")
         found_projects = all_orgs.find_projects_by_repo(repo.full_name, repo.id)
         for p in found_projects:
+            logger.debug("repo found and added")
             repo.add_project(p)

     watchlist.save(cachedir=str(s.cache_dir))
@@ -352,39 +389,46 @@ def status():

     if s.force_sync:
         typer.echo("Sync forced, ignoring cache status", err=True)
+        logger.debug("sync forced - returning")
         return False

     typer.echo("Checking cache status", err=True)

     if os.path.exists(f"{s.cache_dir}/sync.json"):
         sync_data = jopen(f"{s.cache_dir}/sync.json")
+        logger.debug(f"path exists and contains {pformat(sync_data)}")
     else:
+        logger.debug("sync.json not found in cache")
         return False

     last_sync = dt.strptime(sync_data["last_sync"], "%Y-%m-%dT%H:%M:%S.%f")
-
+    logger.debug(f"last sync was performed at {pformat(last_sync)}")
     in_sync = True

     if s.cache_timeout is None:
         timeout = 0
     else:
         timeout = float(str(s.cache_timeout))
+    logger.debug(f"cache timeout set to {timeout}")

     if last_sync < dt.utcnow() - timedelta(minutes=timeout):
         typer.echo("Cache is out of date and needs to be updated", err=True)
+        logger.debug("cache out of date")
         in_sync = False
     else:
         typer.echo(f"Cache is less than {s.cache_timeout} minutes old", err=True)
+        logger.debug("cache is current")

     typer.echo("Attempting to load cache", err=True)

     watchlist = load_watchlist(s.cache_dir)
+    logger.debug(f"watchlist={pformat(watchlist)}")

     typer.echo("Cache loaded successfully", err=True)

     watchlist.default_org = s.default_org
     watchlist.snyk_orgs = s.snyk_orgs
-
+    logger.debug(f"watchlist.default_org={pformat(s.default_org)}, watchlist.snyk_orgs={pformat(s.snyk_orgs)}")
     return in_sync
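
Note: the staleness decision in status() reduces to one datetime comparison against the cached timestamp. A self-contained sketch of the same check (the timestamp and timeout are invented values; in the patch they come from sync.json and s.cache_timeout):

    from datetime import datetime as dt, timedelta

    sync_data = {"last_sync": "2022-05-01T12:00:00.000000"}
    timeout = 60.0  # minutes

    last_sync = dt.strptime(sync_data["last_sync"], "%Y-%m-%dT%H:%M:%S.%f")
    in_sync = not (last_sync < dt.utcnow() - timedelta(minutes=timeout))
    print("cache is current" if in_sync else "cache out of date")
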
@@ -409,17 +453,19 @@ def targets(
     global watchlist

     if status() == False:
+        logger.debug("cache not current, syncing...")
         sync()
     else:
         load_conf()
         tmp_watch: SnykWatchList = load_watchlist(s.cache_dir)
         watchlist.repos = tmp_watch.repos
+        logger.debug(f"loaded cache... tmp_watch={pformat(watchlist.repos)}")

     # print(f"{watchlist=}")

     all_orgs = Orgs(cache=str(s.cache_dir), groups=s.snyk_groups)
-
     all_orgs.load()
+    logger.debug(f"all_orgs={pformat(all_orgs)}")

     target_list = []

@@ -433,10 +479,14 @@ def targets(
     else:
         filtered_repos = filtered_repos

-    for r in filtered_repos:
+    logger.debug(f"filtered_repos={pformat(filtered_repos)}")
+    for r in filtered_repos:
+        logger.debug(f"processing repo={pformat(r)}")
         if r.needs_reimport(s.default_org, s.snyk_orgs) or force_refresh:
+            logger.debug("needs reimport")

             for branch in r.get_reimport(s.default_org, s.snyk_orgs):
+                logger.debug(f"processing branch={pformat(branch)}")

                 if branch.project_count() == 0 or force_refresh:

                     if force_default:
@@ -446,8 +496,9 @@ def targets(
                         org_id = branch.org_id
                         int_id = branch.integrations["github-enterprise"]
+                        logger.debug(f"org_id={org_id}, int_id={int_id}")

                     source = r.source.get_target()
-
+                    logger.debug(f"source={pformat(source)}")
                     source["branch"] = branch.name

                     target = {
@@ -458,11 +509,13 @@ def targets(

                     target_list.append(target)

-    final_targets = list()
+    final_targets: List = list()
+    logger.debug(f"final_targets={pformat(final_targets)}")

     for group in s.snyk_groups:
+        logger.debug(f"processing group={pformat(group)}")
         orgs = all_orgs.get_orgs_by_group(group)
-
+        logger.debug(f"orgs={pformat(orgs)}")
         o_ids = [str(o.id) for o in orgs]

         g_targets = {"name": group["name"], "targets": list()}
@@ -472,6 +525,7 @@ def targets(
         final_targets.append(g_targets)

     if save_targets is True:
+        logger.debug("saving targets")
         typer.echo(f"Writing targets to {s.targets_dir}", err=True)
         if os.path.isdir(f"{s.targets_dir}") is not True:
             typer.echo(f"Creating directory to {s.targets_dir}", err=True)
@@ -505,26 +559,28 @@ def tags(
     v1client = SnykClient(str(s.snyk_token), user_agent=f"pysnyk/snyk_services/sync/{__version__}", tries=1, delay=1)

     if status() == False:
+        logger.debug("cache not current, syncing")
         sync()
     else:
+        logger.debug("loading config")
         load_conf()
         tmp_watch = load_watchlist(s.cache_dir)
-
+        logger.debug(f"tmp_watch={pformat(tmp_watch)}")
         watchlist.repos = tmp_watch.repos

     all_orgs = Orgs(cache=str(s.cache_dir), groups=s.snyk_groups)
-
     all_orgs.load()
+    logger.debug(f"all_orgs={all_orgs}")

     needs_tags = list()

     for group in s.snyk_groups:
-
+        logger.debug(f"processing group={pformat(group)}")
         group_tags = {"name": group["name"], "tags": list()}

         orgs = all_orgs.get_orgs_by_group(group)
-
+        logger.debug(f"orgs={pformat(orgs)}")
         o_ids = [str(o.id) for o in orgs]

         group_tags["tags"] = watchlist.get_proj_tag_updates(o_ids)
@@ -534,6 +590,7 @@ def tags(

     # now we iterate over needs_tags by group and save out a per group tag file
     for g_tags in needs_tags:
+        logger.debug(f"processing tags={pformat(g_tags)}")
         if g_tags["tags"]:
             if update_tags is True:
                 typer.echo(f"Checking if {g_tags['name']} projects need tag updates", err=True)
@@ -558,6 +615,7 @@ def tags(
                         p_live = v1client.get(p_path).json()
                     except SnykHTTPError as e:
                         typer.echo(f"Error: retrieving project path: {p_path} error:\n{e}")
+                        logger.exception(f"issue getting project path error={str(e)}")
                         break

                     tags_to_post = [t for t in p["tags"] if t not in p_live["tags"]]
@@ -613,6 +671,7 @@ def autoconf(
     conf["default"] = dict()
     conf["default"]["orgName"] = snykorg
     conf["default"]["integrationName"] = "github-enterprise"
+    logger.debug(f"conf={pformat(conf)}")

     typer.echo(f"Generating configuration based on Snyk Org: {snykorg} and Github Org: {githuborg} ", err=True)
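
Note: for reference, the default block assembled above ends up serialized with yaml.safe_dump, the same call used for the orgs file in the next hunk. A small sketch with placeholder values (the real conf carries additional keys not shown in this patch):

    import yaml

    conf = {"default": {"orgName": "example-snyk-org", "integrationName": "github-enterprise"}}
    print(yaml.safe_dump(conf))
    # default:
    #   integrationName: github-enterprise
    #   orgName: example-snyk-org
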
@@ -655,6 +714,8 @@ def autoconf(

     if s.snyk_orgs_file.write_text(yaml.safe_dump(snyk_orgs)):
         typer.echo(f"Wrote Snyk Orgs data for the Group: {my_group_slug} to: {s.snyk_orgs_file.as_posix()}", err=True)
+        logger.debug(f"snyk_orgs={pformat(snyk_orgs)}")
+

 def load_conf():
diff --git a/snyk_sync/utils.py b/snyk_sync/utils.py
index e88251d..b402ab1 100644
--- a/snyk_sync/utils.py
+++ b/snyk_sync/utils.py
@@ -1,6 +1,8 @@
+import functools
 import json
 import logging
 from datetime import datetime
+from enum import Enum
 from logging import exception
 from os import environ
 from os import path
@@ -31,19 +33,49 @@
 USER_AGENT = "pysnyk/snyk_services/snyk_sync"

 logger = logging.getLogger(__name__)
-logging.basicConfig(filename="snyk_sync.log", filemode="w", encoding="utf-8")
+FORMAT = "[%(filename)s:%(lineno)4s - %(funcName)s ] %(message)s"
+logging.basicConfig(filename="snyk_sync.log", filemode="w", format=FORMAT, encoding="utf-8")
+
+
+def set_log_level(log_level: str, set_root: bool = False):
+    logger.setLevel(logging.getLevelName(log_level))
+    if set_root:
+        logging.root.setLevel(logging.getLevelName(log_level))
+
+
+def log(func):
+    if logger.level <= logging.DEBUG:

+        @functools.wraps(func)
+        def log_wrapper(*args, **kwargs):
+            args_repr = [repr(a) for a in args]
+            kwargs_repr = [f"{k}={v!r}" for k, v in kwargs.items()]
+            signature = ", ".join(args_repr + kwargs_repr)
+            logger.debug(f"function {func.__name__} called with args {signature}")
+            try:
+                result = func(*args, **kwargs)
+                return result
+            except Exception as e:
+                logger.exception(f"Exception raised in {func.__name__}. exception: {str(e)}")
+                raise e
+
+        return log_wrapper
+    return func
+
+
+@log
 def jprint(something):
     print(json.dumps(something, indent=2))


+@log
 def jopen(filename):
     with open(filename, "r") as the_file:
         data = the_file.read()
         return json.loads(data)


+@log
 def jwrite(data, filename, minimize: bool = False):
     try:
         with open(filename, "w") as the_file:
@@ -56,12 +88,14 @@ def jwrite(data, filename, minimize: bool = False):
         return False


+@log
 def yopen(filename):
     with open(filename, "r") as the_file:
         data = the_file.read()
         return yaml.safe_load(data)


+@log
 def newer(cached: str, remote: str) -> bool:
     # 2021-08-25T13:37:43Z
@@ -73,6 +107,7 @@ def newer(cached: str, remote: str) -> bool:
     return bool(remote_ts < cache_ts)


+@log
 def make_v3_get(endpoint, token):
     V3_API = "https://api.snyk.io/v3"
     USER_AGENT = "pysnyk/snyk_services/target_sync"
@@ -85,11 +120,13 @@ def make_v3_get(endpoint, token):
     return client.get(url)


+@log
 def v3_get(endpoint, token, delay=1):
     result = retry_call(make_v3_get, fkwargs={"endpoint": endpoint, "token": token}, tries=3, delay=delay)
     return result


+@log
 def get_org_targets(org: dict, token: str) -> list:

     print(f"getting {org['id']} / {org['slug']} targets")
@@ -102,6 +139,7 @@ def get_org_targets(org: dict, token: str) -> list:
     return targets


+@log
 def get_org_projects(org: dict, token: str) -> dict:

     print(f"getting {org['id']} / {org['slug']} projects")
@@ -132,6 +170,7 @@ def get_org_projects(org: dict, token: str) -> dict:
     return orgs_resp


+@log
 def search_projects(base_name, origin, client, snyk_token, org_in: dict):

     org: Dict = dict()
@@ -145,6 +184,7 @@ def search_projects(base_name, origin, client, snyk_token, org_in: dict):
     return json.loads(client.post(path, query).text)


+@log
 def to_camel_case(snake_str):
     components = snake_str.split("_")
     # We capitalize the first letter of each component except the first one
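
Note: a minimal sketch of how the new @log decorator and set_log_level are expected to interact, assuming utils is importable the same way cli.py imports it. Because the `if logger.level <= logging.DEBUG` check runs at decoration time, when the module logger is still NOTSET (0), the wrapper is effectively always installed and the filtering actually happens at call time through the logger's level:

    from utils import log, logger, set_log_level

    @log
    def add(a, b):
        return a + b

    set_log_level("DEBUG")  # logger.setLevel(logging.getLevelName("DEBUG"))
    add(2, 3)  # writes "function add called with args 2, 3" to snyk_sync.log
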
@@ -152,6 +192,7 @@ def to_camel_case(snake_str):
     return components[0] + "".join(x.title() for x in components[1:])


+@log
 def default_settings(
     name: Optional[str], value: str, default: Union[Any, Callable[[], Any], None], context: Context
 ) -> Settings:
@@ -255,12 +296,14 @@ def default_settings(
     return default


+@log
 def gen_path(parent_file: Path, child: str):

     the_path_string = f"{parent_file.parent}/{child}"

     return Path(the_path_string)


+@log
 def ensure_dir(directory: Path) -> bool:
     if not directory.exists():
         try:
@@ -275,6 +318,7 @@ def ensure_dir(directory: Path) -> bool:
     return True


+@log
 def load_watchlist(cache_dir: Path) -> SnykWatchList:
     tmp_watchlist = SnykWatchList()
     cache_data_errors = []
@@ -293,7 +337,10 @@ def load_watchlist(cache_dir: Path) -> SnykWatchList:
             try:
                 tmp_watchlist.repos.append(Repo.parse_obj(repo))
             except Exception as e:
+                logger.exception(f"error loading watchlist, error={str(e)}")
+                logger.exception(f"Error {repr(e)} attempting to parse import.yaml in repo {repo['url']}")
                 cache_data_error_string = f"Error {repr(e)} attempting to parse import.yaml in repo {repo['url']}"
+                # print(f"{cache_data_error_string}")

                 cache_data_errors.append(cache_data_error_string)
@@ -305,6 +352,7 @@ def load_watchlist(cache_dir: Path) -> SnykWatchList:
     return tmp_watchlist


+@log
 def update_client(old_client, token):
     old_client.api_token = token
     old_client.api_headers["Authorization"] = f"token {old_client.api_token}"
@@ -313,12 +361,14 @@ def update_client(old_client, token):
     return old_client


+@log
 def filter_chunk(chunk, exclude_list):
     return [y for y in chunk if y.repository.id not in exclude_list and y.name == "import.yaml"]


 # Function wrappers for GitHub API calls. Here we simply wrap the original call in a function which is decorated with
 # a "backoff". This will catch rate limit exceptions and automatically retry the function.
+@log
 @backoff.on_exception(backoff.expo, RateLimitExceededException)
 def get_page_wrapper(pg_list: PaginatedList, page_number: int, show_rate_limit: bool = False):
     try:
@@ -329,6 +379,7 @@ def get_page_wrapper(pg_list: PaginatedList, page_number: int, show_rate_limit:
         raise e


+@log
 @backoff.on_exception(backoff.expo, RateLimitExceededException)
 def get_organization_wrapper(gh: Github, gh_org_name: str, show_rate_limit: bool = False):
     try:
@@ -339,6 +390,7 @@ def get_organization_wrapper(gh: Github, gh_org_name: str, show_rate_limit: bool
         raise e


+@log
 @backoff.on_exception(backoff.expo, RateLimitExceededException)
 def get_repos_wrapper(gh_org: Organization, type: str, sort: str, direction: str, show_rate_limit: bool = False):
     try:
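
Note: the GitHub wrappers above pair the new @log decorator with backoff's exponential retry. A stand-alone sketch of the same retry pattern (the wrapped call is illustrative, not the repo's implementation):

    import backoff
    from github import Github, RateLimitExceededException

    @backoff.on_exception(backoff.expo, RateLimitExceededException)
    def get_rate_limited(gh: Github):
        # retried with exponentially increasing delays whenever GitHub
        # raises RateLimitExceededException
        return gh.get_rate_limit()
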