Skip to content

Commit

Permalink
Handle Lambda timeouts in the Artemis Repo API (#206)
Browse files: browse the repository at this point in the history
* Filter branch query

* Skip deleted branches & requeue more failed repos
  • Loading branch information
Kamsiy authored Jul 22, 2024
1 parent d808b6f commit bdb2bed
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 22 deletions.
26 changes: 13 additions & 13 deletions backend/lambdas/api/repo/repo/bitbucket_util/bitbucket_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
SERVER_BRANCH_PAGE_LIMIT = 150
RATE_LIMIT_STATUS_CODE = 429
RATE_LIMIT_RESPONSE = "Rate limit for this resource has been exceeded"
server_branches = {}


class RepoQueueFields:
Expand Down Expand Up @@ -283,23 +282,24 @@ def verify_branch_exists_server(branch_url, branch_name, key) -> dict:
if not branch_name:
return {"status": True, "response": None}

if not server_branches.get(branch_url):
branch_result = get_server_branches(branch_url, key)
if not branch_result["status"]:
return branch_result
branch_result = get_server_branches(branch_url, key, branch_name)
if not branch_result["status"] or not branch_result.get("branches"):
return branch_result

branch_exists = branch_name in server_branches[branch_url]
branch_exists = branch_name in branch_result["branches"]
return {"status": branch_exists, "response": "Branch not found"}


def get_server_branches(branch_url, key) -> dict:
# Query the BitBucket API for the repo branches
def get_server_branches(branch_url, key, branch_name) -> dict:
# Query the BitBucket API for the repo branches that match the branch_name
is_last_page = False
start = 0
server_branches[branch_url] = []
branch_result = []
while not is_last_page:
# construct url with cursor
branch_url_with_cursor = f"{branch_url}?cursor={start}&limit={SERVER_BRANCH_PAGE_LIMIT}"
# construct url with cursor & branch_name
branch_url_with_cursor = (
f"{branch_url}?cursor={start}&limit={SERVER_BRANCH_PAGE_LIMIT}&filterText={branch_name}"
)
branch_response = query_bitbucket_api(branch_url_with_cursor, key)
if branch_response.status_code != 200:
log.warning("Branch url returned status code %d", branch_response.status_code)
Expand All @@ -312,11 +312,11 @@ def get_server_branches(branch_url, key) -> dict:
return {"status": False, "response": branch_response.text}

for branch in branch_dict.get("values"):
server_branches[branch_url].append(branch.get("displayId"))
branch_result.append(branch.get("displayId"))

is_last_page = branch_dict.get("isLastPage")
start = branch_dict.get("nextPageStart")
return {"status": True, "response": None}
return {"status": True, "response": None, "branches": branch_result}


def _query(
Expand Down
28 changes: 19 additions & 9 deletions orchestrator/lambdas/repo_scan/repo_scan/repo_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@


@log.inject_lambda_context
def run(event: dict[str, Any] = None, context: LambdaContext = None, size: int = 100) -> Optional[list[dict[str, Any]]]:
def run(event: dict[str, Any] = None, context: LambdaContext = None, size: int = 20) -> Optional[list[dict[str, Any]]]:
# Get the size of the REPO_QUEUE
message_num = get_queue_size(REPO_QUEUE)
if message_num == 0:
Expand All @@ -40,7 +40,7 @@ def run(event: dict[str, Any] = None, context: LambdaContext = None, size: int =

api_key = get_analyzer_api_key(API_KEY_LOC)

# Pull no more than 100 repos off the queue
# Pull no more than 20 repos off the queue
repos = []

while len(repos) < size:
Expand Down Expand Up @@ -85,6 +85,7 @@ def submit_repos(repos: list, analyzer_url: str, api_key: str) -> list:

requests_by_service = construct_repo_requests(repos)
for service, request_items in requests_by_service["reqs"].items():
log.append_keys(version_control_service=service)
log.info("Submitting %d repos for %s", len(request_items), service)

url = f"{analyzer_url}/{service}"
Expand All @@ -99,6 +100,9 @@ def submit_repos(repos: list, analyzer_url: str, api_key: str) -> list:
if success or response.status_code == 207:
repo_dict = get_repo_scan_items(service, response.text)
all_scan_items.extend(repo_dict["scan_items"])
if response.status_code == 504:
log.error("Artemis API Timed out")
requeue_failed_repos(service, requests_by_service["req_lookup"][service], request_items)
if "failed" in response_dict:
requeue_failed_repos(service, requests_by_service["req_lookup"][service], response_dict["failed"])
all_success.append({"service": service, "repos": repo_dict.get("repos"), "success": success})
Expand All @@ -108,29 +112,35 @@ def submit_repos(repos: list, analyzer_url: str, api_key: str) -> list:

def requeue_failed_repos(service: str, repo_lookup: dict[str, Any], failed_repos: list):
"""
Send failed repos to the repo-deadletter SQS queue
Send failed repos to the repo-dead-letter SQS queue
"""
log.info(f"Sending {len(failed_repos)} repos to the repo-deadletter Queue", version_control_service=service)

repos_to_queue = []
index = 0
count = 0
for failed_repo in failed_repos:
repo_info = repo_lookup.get(failed_repo.get("repo", ""))
error_msg = failed_repo.get("error", "")
if error_msg.startswith("Could not resolve to a Repository with the name"):
log.info("Skipping deleted branch")
continue
repo_info = repo_lookup.get(failed_repo.get("repo", ""), failed_repo)
repo_info["service"] = service
if not repo_info:
continue
repos_to_queue.append({"Id": str(index), "MessageBody": json.dumps(repo_info)})
index += 1
if index >= 10:
log.debug("Sending %d repos to dead-letter queue", index, version_control_service=service)
if not send_sqs_message(REPO_DLQ, repos_to_queue):
log.error("There was an error queueing the repos, aborting.", version_control_service=service)
log.error("There was an error queueing the repos, aborting.")
return
count += index
index = 0
repos_to_queue = []
if index > 0:
log.debug("Sending %d repos to dead-letter queue", index, version_control_service=service)
count += index
send_sqs_message(REPO_DLQ, repos_to_queue)

log.info(f"Sending {count} repos to the repo-deadletter Queue")


def get_repo_scan_items(service, response, date=None):
repos = {}
Expand Down

0 comments on commit bdb2bed

Please sign in to comment.