From bdb2bed3baaaa0acf91dc674f7519c4c0adc56a2 Mon Sep 17 00:00:00 2001
From: Kamsi Ibeziako
Date: Mon, 22 Jul 2024 10:11:33 -0400
Subject: [PATCH] Handle Lambda timeouts in the Artemis Repo API (#206)

* Filter branch query
* Skip deleted branches & requeue more failed repos
---
 .../repo/bitbucket_util/bitbucket_utils.py   | 26 ++++++++---------
 .../lambdas/repo_scan/repo_scan/repo_scan.py | 28 +++++++++++++------
 2 files changed, 32 insertions(+), 22 deletions(-)

diff --git a/backend/lambdas/api/repo/repo/bitbucket_util/bitbucket_utils.py b/backend/lambdas/api/repo/repo/bitbucket_util/bitbucket_utils.py
index e936f324..59820d23 100644
--- a/backend/lambdas/api/repo/repo/bitbucket_util/bitbucket_utils.py
+++ b/backend/lambdas/api/repo/repo/bitbucket_util/bitbucket_utils.py
@@ -28,7 +28,6 @@
 SERVER_BRANCH_PAGE_LIMIT = 150
 RATE_LIMIT_STATUS_CODE = 429
 RATE_LIMIT_RESPONSE = "Rate limit for this resource has been exceeded"
-server_branches = {}
 
 
 class RepoQueueFields:
@@ -283,23 +282,24 @@ def verify_branch_exists_server(branch_url, branch_name, key) -> dict:
     if not branch_name:
         return {"status": True, "response": None}
 
-    if not server_branches.get(branch_url):
-        branch_result = get_server_branches(branch_url, key)
-        if not branch_result["status"]:
-            return branch_result
+    branch_result = get_server_branches(branch_url, key, branch_name)
+    if not branch_result["status"] or not branch_result.get("branches"):
+        return branch_result
 
-    branch_exists = branch_name in server_branches[branch_url]
+    branch_exists = branch_name in branch_result["branches"]
     return {"status": branch_exists, "response": "Branch not found"}
 
 
-def get_server_branches(branch_url, key) -> dict:
-    # Query the BitBucket API for the repo branches
+def get_server_branches(branch_url, key, branch_name) -> dict:
+    # Query the BitBucket API for the repo branches that match the branch_name
     is_last_page = False
     start = 0
-    server_branches[branch_url] = []
+    branch_result = []
     while not is_last_page:
-        # construct url with cursor
-        branch_url_with_cursor = f"{branch_url}?cursor={start}&limit={SERVER_BRANCH_PAGE_LIMIT}"
+        # construct url with cursor & branch_name
+        branch_url_with_cursor = (
+            f"{branch_url}?cursor={start}&limit={SERVER_BRANCH_PAGE_LIMIT}&filterText={branch_name}"
+        )
         branch_response = query_bitbucket_api(branch_url_with_cursor, key)
         if branch_response.status_code != 200:
             log.warning("Branch url returned status code %d", branch_response.status_code)
@@ -312,11 +312,11 @@ def get_server_branches(branch_url, key) -> dict:
             return {"status": False, "response": branch_response.text}
 
         for branch in branch_dict.get("values"):
-            server_branches[branch_url].append(branch.get("displayId"))
+            branch_result.append(branch.get("displayId"))
 
         is_last_page = branch_dict.get("isLastPage")
         start = branch_dict.get("nextPageStart")
-    return {"status": True, "response": None}
+    return {"status": True, "response": None, "branches": branch_result}
 
 
 def _query(
diff --git a/orchestrator/lambdas/repo_scan/repo_scan/repo_scan.py b/orchestrator/lambdas/repo_scan/repo_scan/repo_scan.py
index bfda5473..f783fac2 100644
--- a/orchestrator/lambdas/repo_scan/repo_scan/repo_scan.py
+++ b/orchestrator/lambdas/repo_scan/repo_scan/repo_scan.py
@@ -31,7 +31,7 @@
 
 
 @log.inject_lambda_context
-def run(event: dict[str, Any] = None, context: LambdaContext = None, size: int = 100) -> Optional[list[dict[str, Any]]]:
+def run(event: dict[str, Any] = None, context: LambdaContext = None, size: int = 20) -> Optional[list[dict[str, Any]]]:
     # Get the size of the REPO_QUEUE
     message_num = get_queue_size(REPO_QUEUE)
     if message_num == 0:
@@ -40,7 +40,7 @@ def run(event: dict[str, Any] = None, context: LambdaContext = None, size: int =
 
     api_key = get_analyzer_api_key(API_KEY_LOC)
 
-    # Pull no more than 100 repos off the queue
+    # Pull no more than 20 repos off the queue
     repos = []
 
     while len(repos) < size:
@@ -85,6 +85,7 @@ def submit_repos(repos: list, analyzer_url: str, api_key: str) -> list:
     requests_by_service = construct_repo_requests(repos)
 
     for service, request_items in requests_by_service["reqs"].items():
+        log.append_keys(version_control_service=service)
         log.info("Submitting %d repos for %s", len(request_items), service)
         url = f"{analyzer_url}/{service}"
 
@@ -99,6 +100,9 @@
         if success or response.status_code == 207:
             repo_dict = get_repo_scan_items(service, response.text)
             all_scan_items.extend(repo_dict["scan_items"])
+        if response.status_code == 504:
+            log.error("Artemis API Timed out")
+            requeue_failed_repos(service, requests_by_service["req_lookup"][service], request_items)
         if "failed" in response_dict:
             requeue_failed_repos(service, requests_by_service["req_lookup"][service], response_dict["failed"])
         all_success.append({"service": service, "repos": repo_dict.get("repos"), "success": success})
@@ -108,29 +112,35 @@ def submit_repos(repos: list, analyzer_url: str, api_key: str) -> list:
 def requeue_failed_repos(service: str, repo_lookup: dict[str, Any], failed_repos: list):
     """
-    Send failed repos to the repo-deadletter SQS queue
+    Send failed repos to the repo-dead-letter SQS queue
     """
-    log.info(f"Sending {len(failed_repos)} repos to the repo-deadletter Queue", version_control_service=service)
-
     repos_to_queue = []
     index = 0
+    count = 0
     for failed_repo in failed_repos:
-        repo_info = repo_lookup.get(failed_repo.get("repo", ""))
+        error_msg = failed_repo.get("error", "")
+        if error_msg.startswith("Could not resolve to a Repository with the name"):
+            log.info("Skipping deleted branch")
+            continue
+        repo_info = repo_lookup.get(failed_repo.get("repo", ""), failed_repo)
+        repo_info["service"] = service
         if not repo_info:
             continue
         repos_to_queue.append({"Id": str(index), "MessageBody": json.dumps(repo_info)})
         index += 1
         if index >= 10:
-            log.debug("Sending %d repos to dead-letter queue", index, version_control_service=service)
            if not send_sqs_message(REPO_DLQ, repos_to_queue):
-                log.error("There was an error queueing the repos, aborting.", version_control_service=service)
+                log.error("There was an error queueing the repos, aborting.")
                 return
+            count += index
             index = 0
             repos_to_queue = []
 
     if index > 0:
-        log.debug("Sending %d repos to dead-letter queue", index, version_control_service=service)
+        count += index
         send_sqs_message(REPO_DLQ, repos_to_queue)
 
+    log.info(f"Sending {count} repos to the repo-deadletter Queue")
+
 
 def get_repo_scan_items(service, response, date=None):
     repos = {}
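
For reference, a minimal sketch of the branch-filtering pattern the bitbucket_utils.py change adopts: instead of caching every branch per repo, page through Bitbucket Server's branches endpoint with filterText so only names matching the requested branch come back. The sketch calls the REST API directly with requests; base_url, project, repo, and the bearer token are placeholder assumptions, and the patch itself builds the URL by hand and routes it through query_bitbucket_api().

# Illustrative sketch, not part of the patch.
import requests

SERVER_BRANCH_PAGE_LIMIT = 150


def find_matching_branches(base_url: str, project: str, repo: str, branch_name: str, token: str) -> list[str]:
    """Return the displayId of every branch whose name matches branch_name."""
    url = f"{base_url}/rest/api/1.0/projects/{project}/repos/{repo}/branches"
    headers = {"Authorization": f"Bearer {token}"}
    matches: list[str] = []
    start = 0
    is_last_page = False
    while not is_last_page:
        resp = requests.get(
            url,
            headers=headers,
            params={"start": start, "limit": SERVER_BRANCH_PAGE_LIMIT, "filterText": branch_name},
            timeout=30,
        )
        resp.raise_for_status()
        page = resp.json()
        # Paged Bitbucket Server responses carry values/isLastPage/nextPageStart.
        matches.extend(branch.get("displayId") for branch in page.get("values", []))
        is_last_page = page.get("isLastPage", True)
        start = page.get("nextPageStart") or 0
    return matches

Likewise, the index >= 10 flush in requeue_failed_repos() lines up with the SQS limit of 10 messages per send_message_batch call. A sketch of that batching pattern with boto3 follows; queue_url is a placeholder, and the patch's send_sqs_message() helper is assumed to wrap a call like this.

# Illustrative sketch, not part of the patch.
import json

import boto3

sqs = boto3.client("sqs")


def send_in_batches(queue_url: str, payloads: list[dict]) -> None:
    batch = []
    for payload in payloads:
        batch.append({"Id": str(len(batch)), "MessageBody": json.dumps(payload)})
        if len(batch) == 10:  # SQS allows at most 10 entries per batch
            sqs.send_message_batch(QueueUrl=queue_url, Entries=batch)
            batch = []
    if batch:
        sqs.send_message_batch(QueueUrl=queue_url, Entries=batch)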