Handle Lambda timeouts in the Artemis Repo API #206

Merged (2 commits) on Jul 22, 2024
26 changes: 13 additions & 13 deletions backend/lambdas/api/repo/repo/bitbucket_util/bitbucket_utils.py
@@ -28,7 +28,6 @@
SERVER_BRANCH_PAGE_LIMIT = 150
RATE_LIMIT_STATUS_CODE = 429
RATE_LIMIT_RESPONSE = "Rate limit for this resource has been exceeded"
server_branches = {}


class RepoQueueFields:
@@ -283,23 +282,24 @@ def verify_branch_exists_server(branch_url, branch_name, key) -> dict:
if not branch_name:
return {"status": True, "response": None}

if not server_branches.get(branch_url):
branch_result = get_server_branches(branch_url, key)
if not branch_result["status"]:
return branch_result
branch_result = get_server_branches(branch_url, key, branch_name)
if not branch_result["status"] or not branch_result.get("branches"):
return branch_result

branch_exists = branch_name in server_branches[branch_url]
branch_exists = branch_name in branch_result["branches"]
return {"status": branch_exists, "response": "Branch not found"}


def get_server_branches(branch_url, key) -> dict:
# Query the BitBucket API for the repo branches
def get_server_branches(branch_url, key, branch_name) -> dict:
# Query the BitBucket API for the repo branches that match the branch_name
is_last_page = False
start = 0
server_branches[branch_url] = []
branch_result = []
while not is_last_page:
# construct url with cursor
branch_url_with_cursor = f"{branch_url}?cursor={start}&limit={SERVER_BRANCH_PAGE_LIMIT}"
# construct url with cursor & branch_name
branch_url_with_cursor = (
f"{branch_url}?cursor={start}&limit={SERVER_BRANCH_PAGE_LIMIT}&filterText={branch_name}"
)
branch_response = query_bitbucket_api(branch_url_with_cursor, key)
if branch_response.status_code != 200:
log.warning("Branch url returned status code %d", branch_response.status_code)
@@ -312,11 +312,11 @@ def get_server_branches(branch_url, key) -> dict:
return {"status": False, "response": branch_response.text}

for branch in branch_dict.get("values"):
server_branches[branch_url].append(branch.get("displayId"))
branch_result.append(branch.get("displayId"))

is_last_page = branch_dict.get("isLastPage")
start = branch_dict.get("nextPageStart")
return {"status": True, "response": None}
return {"status": True, "response": None, "branches": branch_result}


def _query(
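Because the rendered diff above interleaves the removed and added lines without +/- markers, the following is a hedged sketch of how the two branch-lookup functions likely read after this change. It is assembled from the added lines in the hunks above; the rate-limit and JSON-parsing code between the status check and the loop body is elided in the diff, so the branch_dict assignment below is an assumption, and log, query_bitbucket_api, and SERVER_BRANCH_PAGE_LIMIT come from the surrounding module.

def verify_branch_exists_server(branch_url, branch_name, key) -> dict:
    if not branch_name:
        return {"status": True, "response": None}

    # The module-level server_branches cache is gone; each check now asks the
    # server only for branches matching branch_name.
    branch_result = get_server_branches(branch_url, key, branch_name)
    if not branch_result["status"] or not branch_result.get("branches"):
        return branch_result

    branch_exists = branch_name in branch_result["branches"]
    return {"status": branch_exists, "response": "Branch not found"}


def get_server_branches(branch_url, key, branch_name) -> dict:
    # Query the Bitbucket API for the repo branches that match branch_name,
    # following the server's cursor-based pagination.
    is_last_page = False
    start = 0
    branch_result = []
    while not is_last_page:
        branch_url_with_cursor = (
            f"{branch_url}?cursor={start}&limit={SERVER_BRANCH_PAGE_LIMIT}&filterText={branch_name}"
        )
        branch_response = query_bitbucket_api(branch_url_with_cursor, key)
        if branch_response.status_code != 200:
            log.warning("Branch url returned status code %d", branch_response.status_code)
            return {"status": False, "response": branch_response.text}

        branch_dict = branch_response.json()  # assumption: parsing happens in the lines elided from the hunk
        for branch in branch_dict.get("values"):
            branch_result.append(branch.get("displayId"))

        is_last_page = branch_dict.get("isLastPage")
        start = branch_dict.get("nextPageStart")
    return {"status": True, "response": None, "branches": branch_result}

Passing branch_name through to Bitbucket's filterText parameter means the server only pages through matching branches rather than every branch in the repository, which is presumably what keeps large repositories from dragging the Lambda toward its timeout; filterText does partial matching, so the exact "branch_name in branches" check is still needed.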
28 changes: 19 additions & 9 deletions orchestrator/lambdas/repo_scan/repo_scan/repo_scan.py
@@ -31,7 +31,7 @@


@log.inject_lambda_context
def run(event: dict[str, Any] = None, context: LambdaContext = None, size: int = 100) -> Optional[list[dict[str, Any]]]:
def run(event: dict[str, Any] = None, context: LambdaContext = None, size: int = 20) -> Optional[list[dict[str, Any]]]:
# Get the size of the REPO_QUEUE
message_num = get_queue_size(REPO_QUEUE)
if message_num == 0:
@@ -40,7 +40,7 @@ def run(event: dict[str, Any] = None, context: LambdaContext = None, size: int =

api_key = get_analyzer_api_key(API_KEY_LOC)

# Pull no more than 100 repos off the queue
# Pull no more than 20 repos off the queue
repos = []

while len(repos) < size:
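Taken together, these two hunks cap the per-invocation batch at 20 repos by default. As a hedged sketch of the entry point after this change (the body of the pull loop is not shown in the diff, so the receive step below is a hypothetical placeholder, and ANALYZER_URL stands in for however the analyzer endpoint is actually configured):

@log.inject_lambda_context
def run(event: dict[str, Any] = None, context: LambdaContext = None, size: int = 20) -> Optional[list[dict[str, Any]]]:
    message_num = get_queue_size(REPO_QUEUE)
    if message_num == 0:
        return None  # assumption: nothing to do when the queue is empty

    api_key = get_analyzer_api_key(API_KEY_LOC)

    # Pull no more than 20 repos off the queue per invocation so that
    # submitting them all fits inside the Lambda's execution window.
    repos = []
    while len(repos) < size:
        batch = receive_repo_batch(REPO_QUEUE)  # hypothetical placeholder for the elided SQS receive logic
        if not batch:
            break
        repos.extend(batch)

    return submit_repos(repos, ANALYZER_URL, api_key)  # assumption: the submit step follows the pull loop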
@@ -85,6 +85,7 @@ def submit_repos(repos: list, analyzer_url: str, api_key: str) -> list:

requests_by_service = construct_repo_requests(repos)
for service, request_items in requests_by_service["reqs"].items():
log.append_keys(version_control_service=service)
log.info("Submitting %d repos for %s", len(request_items), service)

url = f"{analyzer_url}/{service}"
@@ -99,6 +100,9 @@ def submit_repos(repos: list, analyzer_url: str, api_key: str) -> list:
if success or response.status_code == 207:
repo_dict = get_repo_scan_items(service, response.text)
all_scan_items.extend(repo_dict["scan_items"])
if response.status_code == 504:
log.error("Artemis API Timed out")
requeue_failed_repos(service, requests_by_service["req_lookup"][service], request_items)
if "failed" in response_dict:
requeue_failed_repos(service, requests_by_service["req_lookup"][service], response_dict["failed"])
all_success.append({"service": service, "repos": repo_dict.get("repos"), "success": success})
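This hunk is the core of the timeout handling the PR title refers to: a 504 from the Artemis repo API now pushes the entire batch submitted for that service back through requeue_failed_repos instead of dropping it. A hedged fragment of the per-service handling after this change (response, success, and response_dict are produced by surrounding code that is not shown in the hunk):

# Inside the per-service loop of submit_repos(); `response`, `success`, and
# `response_dict` are set by code outside this hunk.
if success or response.status_code == 207:
    # Full or partial success: collect the scan items the API created.
    repo_dict = get_repo_scan_items(service, response.text)
    all_scan_items.extend(repo_dict["scan_items"])
if response.status_code == 504:
    # New in this PR: a gateway timeout requeues everything that was
    # submitted for this service, so the repos are retried rather than lost.
    log.error("Artemis API Timed out")
    requeue_failed_repos(service, requests_by_service["req_lookup"][service], request_items)
if "failed" in response_dict:
    # Individually failed repos (e.g. from a 207 multi-status response)
    # are requeued as before.
    requeue_failed_repos(service, requests_by_service["req_lookup"][service], response_dict["failed"])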
@@ -108,29 +112,35 @@ def submit_repos(repos: list, analyzer_url: str, api_key: str) -> list:

def requeue_failed_repos(service: str, repo_lookup: dict[str, Any], failed_repos: list):
"""
Send failed repos to the repo-deadletter SQS queue
Send failed repos to the repo-dead-letter SQS queue
"""
log.info(f"Sending {len(failed_repos)} repos to the repo-deadletter Queue", version_control_service=service)

repos_to_queue = []
index = 0
count = 0
for failed_repo in failed_repos:
repo_info = repo_lookup.get(failed_repo.get("repo", ""))
error_msg = failed_repo.get("error", "")
if error_msg.startswith("Could not resolve to a Repository with the name"):
log.info("Skipping deleted branch")
continue
repo_info = repo_lookup.get(failed_repo.get("repo", ""), failed_repo)
repo_info["service"] = service
if not repo_info:
continue
repos_to_queue.append({"Id": str(index), "MessageBody": json.dumps(repo_info)})
index += 1
if index >= 10:
log.debug("Sending %d repos to dead-letter queue", index, version_control_service=service)
if not send_sqs_message(REPO_DLQ, repos_to_queue):
log.error("There was an error queueing the repos, aborting.", version_control_service=service)
log.error("There was an error queueing the repos, aborting.")
return
count += index
index = 0
repos_to_queue = []
if index > 0:
log.debug("Sending %d repos to dead-letter queue", index, version_control_service=service)
count += index
send_sqs_message(REPO_DLQ, repos_to_queue)

log.info(f"Sending {count} repos to the repo-deadletter Queue")


def get_repo_scan_items(service, response, date=None):
repos = {}
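As with the first file, the removed and added lines above are interleaved, so here is a hedged reconstruction of requeue_failed_repos after this change, assembled from the lines shown in the final hunk; the block nesting follows the rendered diff, and the comments are editorial rather than from the source.

def requeue_failed_repos(service: str, repo_lookup: dict[str, Any], failed_repos: list):
    """
    Send failed repos to the repo-dead-letter SQS queue
    """
    repos_to_queue = []
    index = 0
    count = 0
    for failed_repo in failed_repos:
        error_msg = failed_repo.get("error", "")
        if error_msg.startswith("Could not resolve to a Repository with the name"):
            # The repository or branch no longer exists upstream, so retrying is pointless.
            log.info("Skipping deleted branch")
            continue
        # Fall back to the failed item itself when it is not in the lookup
        # (an assumption: items requeued wholesale after a 504 may not be present there).
        repo_info = repo_lookup.get(failed_repo.get("repo", ""), failed_repo)
        repo_info["service"] = service
        if not repo_info:
            continue
        repos_to_queue.append({"Id": str(index), "MessageBody": json.dumps(repo_info)})
        index += 1
        if index >= 10:
            log.debug("Sending %d repos to dead-letter queue", index, version_control_service=service)
            if not send_sqs_message(REPO_DLQ, repos_to_queue):
                log.error("There was an error queueing the repos, aborting.")
                return
            count += index
            index = 0
            repos_to_queue = []
    if index > 0:
        log.debug("Sending %d repos to dead-letter queue", index, version_control_service=service)
        count += index
        send_sqs_message(REPO_DLQ, repos_to_queue)

    log.info(f"Sending {count} repos to the repo-deadletter Queue")

The batching in groups of 10 presumably lines up with the SQS SendMessageBatch limit of 10 entries per call, and the total logged at the end now reflects only the repos actually queued, excluding the skipped deleted-branch entries.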