Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always cleanup cloud indexes from tests #251

Merged
merged 20 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
9356676
add always clause to index cleanup
vicilliar Sep 10, 2024
5c4bc71
install py-marqo locally in cleanup_indexes
vicilliar Sep 16, 2024
2bbf44f
use github run id as test run identifier
vicilliar Sep 17, 2024
daf77f3
change index identifier to last 4 digits of github run id
vicilliar Sep 17, 2024
efeb7e9
fix self call in delete indexes, stop cleanup from running with tests
vicilliar Sep 17, 2024
d4be16e
add concurrency check, timeout, extra index deletion logic
vicilliar Sep 18, 2024
df7c63f
add time import
vicilliar Sep 18, 2024
2b1be93
Merge branch 'mainline' into joshua/run-cleanup-always
vicilliar Sep 18, 2024
2fdb2e1
remove languagebind tests
vicilliar Sep 19, 2024
21f4abf
increase timeout hard cap
vicilliar Sep 19, 2024
3fab2d6
remove test create index with languagebind
vicilliar Sep 19, 2024
93f2599
lower workflow timeout
vicilliar Sep 19, 2024
59fa16d
[temp] comment out languagebind parameters
vicilliar Sep 20, 2024
f7a6910
change integ test run to cloud flag
vicilliar Sep 20, 2024
1fe86ec
remove readiness check in cloud tests, swap assertion in "in" statement
vicilliar Sep 20, 2024
299e9bf
capitalize the expected value for test
vicilliar Sep 20, 2024
3c37e54
increase timeout, make sure cleanup continues on failure
vicilliar Sep 20, 2024
3619646
Revert "[temp] comment out languagebind parameters"
vicilliar Sep 20, 2024
bef222c
Revert "remove test create index with languagebind"
vicilliar Sep 20, 2024
db9f7f4
Revert "remove languagebind tests"
vicilliar Sep 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion .github/workflows/cloud-integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@ on:
# allows other workflows to reuse these unit tests:
workflow_call:

concurrency:
group: cloud-integration-tests
cancel-in-progress: false

permissions:
contents: read

jobs:
integration_tests:
name: Cloud Integration Tests
timeout-minutes: 80 # Hard cap of 1 hour 20 mins for workflow
runs-on: ubuntu-latest
if: ${{ github.event.inputs.job_to_run == 'run_integration_tests' || github.event_name != 'workflow_dispatch' }}
environment: cloud-tests
Expand Down Expand Up @@ -53,12 +58,15 @@ jobs:
MARQO_URL: ${{ secrets.STAGING_CLOUD_MARQO_URL }}
MARQO_CLOUD_URL: ${{ secrets.STAGING_CLOUD_MARQO_URL }}
MARQO_API_KEY: ${{ secrets.STAGING_CLOUD_MARQO_API_KEY }}
MARQO_GITHUB_RUN_ID: ${{ github.run_id }}
run: tox -e cloud_tests -- create-indexes=True use-unique-identifier=True delete-indexes=True

cleanup_indexes:
name: Cleanup cloud indexes
runs-on : ubuntu-latest
if: ${{ github.event.inputs.job_to_run == 'delete_all_indexes' }}
needs: integration_tests
if: always() # Run this job regardless of success or failure of the integration_tests job
continue-on-error: true # Do not fail the workflow if this job fails
environment: cloud-tests

steps:
Expand All @@ -73,6 +81,9 @@ jobs:
python-version: "3.8"
cache: "pip"

- name: Install py-marqo locally
run: pip install -e . # Install the local py-marqo repo as an editable package

- name: Install dependencies
wanliAlex marked this conversation as resolved.
Show resolved Hide resolved
run: pip install -r requirements.txt

Expand All @@ -82,4 +93,5 @@ jobs:
MARQO_URL: ${{ secrets.STAGING_CLOUD_MARQO_URL }}
MARQO_CLOUD_URL: ${{ secrets.STAGING_CLOUD_MARQO_URL }}
MARQO_API_KEY: ${{ secrets.STAGING_CLOUD_MARQO_API_KEY }}
MARQO_GITHUB_RUN_ID: ${{ github.run_id }}
run: python tests/cloud_test_logic/delete_all_cloud_test_indexes.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,21 @@


def set_unique_run_identifier():
"""
Set the unique run identifier for this test.
Use run ID from GitHub workflow, but only if environment variable MQ_TEST_RUN_IDENTIFIER is not set. Priority:
1. Manually set environment variable MQ_TEST_RUN_IDENTIFIER
2. GitHub run ID
3. Random 4-character identifier
"""
index_suffix = os.environ.get("MQ_TEST_RUN_IDENTIFIER", "")
if not index_suffix:
os.environ["MQ_TEST_RUN_IDENTIFIER"] = str(uuid.uuid4())[:4]
github_run_id = os.environ.get("MARQO_GITHUB_RUN_ID", None)
if github_run_id:
print(f"Found GitHub run ID: {github_run_id}. "
f"Using the last 4 characters: {github_run_id[-4:]} as the unique identifier.", flush=True)
os.environ["MQ_TEST_RUN_IDENTIFIER"] = github_run_id[-4:]
else:
random_identifier = str(uuid.uuid4())[:4]
print(f"No unique identifier found. Generating a random one: {random_identifier}.", flush=True)
os.environ["MQ_TEST_RUN_IDENTIFIER"] = random_identifier
63 changes: 58 additions & 5 deletions tests/cloud_test_logic/delete_all_cloud_test_indexes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import requests
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

import time
import marqo
from marqo.enums import IndexStatus
from marqo.errors import MarqoWebError
Expand All @@ -26,6 +27,27 @@ def fetch_marqo_index(client: marqo.Client, index_name: str):
"""A function to fetch a Marqo index by name with retries to handle transient network errors and Marqo API errors"""
return client.index(index_name)

def get_unique_run_identifier():
"""
Get the unique run identifier for this test.
Prioritize environment variable MQ_TEST_RUN_IDENTIFIER, then use run ID from GitHub workflow.
"""

index_suffix = os.environ.get("MQ_TEST_RUN_IDENTIFIER", None)
if index_suffix:
print(f"Using the environment variable MQ_TEST_RUN_IDENTIFIER: {index_suffix} as the unique identifier",
flush=True)
return index_suffix

github_run_id = os.environ.get("MARQO_GITHUB_RUN_ID", None)
if github_run_id:
print(f"Found GitHub run ID: {github_run_id}. "
f"Using the last 4 characters: {github_run_id[-4:]} as the unique identifier.", flush=True)
return github_run_id[-4:]

print("No unique identifier found. Please set the environment variable MQ_TEST_RUN_IDENTIFIER."
"Deleting all indexes with the correct prefixes.", flush=True)
return None

def delete_all_test_indices(wait_for_readiness=False):
""" Delete all test indices from Marqo Cloud Account that match the following criteria:
Expand All @@ -36,14 +58,15 @@ def delete_all_test_indices(wait_for_readiness=False):
local_marqo_settings = {
"url": os.environ.get("MARQO_URL", 'http://localhost:8882'),
}
suffix = os.environ.get("MQ_TEST_RUN_IDENTIFIER", None)
suffix = get_unique_run_identifier()
prefix = "pymarqo"
api_key = os.environ.get("MARQO_API_KEY", None)
if api_key:
local_marqo_settings["api_key"] = api_key
print(f"Deleting all test indices from Marqo Cloud Account that match the following criteria:")
print(f"- index name starts with '{prefix}'")
print(f"- index name contains the value of the environment variable MQ_TEST_RUN_IDENTIFIER: {suffix}\n")
print(f"- index name starts with '{prefix}' AND")
print(f"- index name ends with the suffix: {suffix}\n")

client = marqo.Client(**local_marqo_settings)
indexes = client.get_indexes()
indices_to_delete = []
Expand All @@ -60,6 +83,13 @@ def delete_all_test_indices(wait_for_readiness=False):

print("Indices to delete: ", indices_to_delete)
print("Marqo Cloud deletion responses:")

# First pass will either
# 1. If the index is READY, send it into DELETING
# 2. If the index is DELETED, do nothing
# 3. If the index is FAILED, send it into DELETING
# 4. If the index is CREATING, MODIFYING, DELETING, do nothing

for index_name in indices_to_delete:
index = fetch_marqo_index(client, index_name)
if index.get_status()["indexStatus"] == IndexStatus.READY:
Expand All @@ -70,22 +100,45 @@ def delete_all_test_indices(wait_for_readiness=False):
print(f"Index {index_name} has failed status, deleting anyway")
index.delete(wait_for_readiness=False)
else:
# Either CREATING, MODIFYING, DELETING.
print(f"Index {index_name} is not ready for deletion, status: {index.get_status()['indexStatus']}")

# All indexes now are either DELETING, CREATING, MODIFYING (might need future deletion)
if wait_for_readiness:
max_retries = 100
attempt = 0

while indices_to_delete:
print(f"Attempt #{attempt} at trying to delete indices: {indices_to_delete}", flush=True)
resp = fetch_marqo_indexes(client)
resp_json = resp.json()
all_index_names = [index["indexName"] for index in resp_json['results']]
for index_for_deletion_name in indices_to_delete:
# Index has successfully been DELETED
if index_for_deletion_name not in all_index_names:
print(f"Index {index_for_deletion_name} has been successfully deleted.")
indices_to_delete.remove(index_for_deletion_name)
else:
# Check if index has finally become READY or FAILED
# Kick off deletion again if so
index = fetch_marqo_index(client, index_for_deletion_name)
if index.get_status()["indexStatus"] == IndexStatus.READY or \
index.get_status()["indexStatus"] == IndexStatus.FAILED:
print(f"Index {index_for_deletion_name} has {index.get_status()['indexStatus']} status, "
f"sending a delete request.")
index.delete(wait_for_readiness=False)
else:
print(f"Index {index_for_deletion_name} still has status: {index.get_status()['indexStatus']}. "
f"Waiting for it to be READY, FAILED, or disappear from list.")

if attempt > max_retries:
raise RuntimeError("Timed out waiting for indices to be deleted, still remaining: "
f"{indices_to_delete}. Please delete manually")
print("All test indices deleted successfully")
attempt += 1
time.sleep(30)

print("All test indices deleted successfully", flush=True)


if __name__ == '__main__':
delete_all_test_indices()
delete_all_test_indices(wait_for_readiness=True)
40 changes: 32 additions & 8 deletions tests/cloud_test_logic/run_cloud_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
import signal
import sys

import concurrent.futures
import threading
import time

import pytest

from create_and_set_cloud_unique_run_identifier import set_unique_run_identifier
Expand All @@ -34,6 +38,27 @@ def convert_string_to_boolean(string_value):
if string_value.lower() in valid_representations_of_true:
return True

def run_pytest(pytest_args):
"""Function to run pytest suite"""
print("running pytest integration tests with args:", pytest_args)
return pytest.main(pytest_args)

def run_pytest_with_timeout():
TIMEOUT_SECONDS = 45 * 60 # 45 minute timeout (Full suite takes ~10 mins now 9/20/24)
pytest_args = ['tests/', '--cloud'] + sys.argv[1:]

# Use ThreadPoolExecutor to run pytest in a separate thread
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(run_pytest, pytest_args)

try:
# Wait for the pytest to complete or timeout
pytest_exit_code = future.result(timeout=TIMEOUT_SECONDS)
except concurrent.futures.TimeoutError:
print(f"Tests exceeded the {TIMEOUT_SECONDS // 60} minute timeout and were terminated.")
pytest_exit_code = 1 # Set an exit code indicating failure due to timeout

return pytest_exit_code

if __name__ == '__main__':
# Set up the signal handler for KeyboardInterrupt (Cmd+C)
Expand All @@ -55,20 +80,19 @@ def convert_string_to_boolean(string_value):
populate_indices()
except MarqoWebError as e:
print("Detected an error while creating indices, deleting all indices and exiting the workflow.")
delete_all_test_indices(wait_for_readiness=True)
delete_all_test_indices(wait_for_readiness=False)
sys.exit(1)
print(f"All indices has been created, proceeding to run tests with pytest. Arguments: {sys.argv[1:]}")
print(f"All indices have been created, proceeding to run tests with pytest. Arguments: {sys.argv[1:]}")

pytest_exit_code = run_pytest_with_timeout()

pytest_args = ['tests/', '--cloud'] + sys.argv[1:]
print("running integration tests with args:", pytest_args)
pytest_exit_code = pytest.main(pytest_args)
if pytest_exit_code != 0:
raise RuntimeError(f"Pytest failed with exit code: {pytest_exit_code}")
print("All tests has been executed successfully")
print("All tests have been executed successfully")
if tests_specific_kwargs['delete-indexes']:
delete_all_test_indices(wait_for_readiness=True)
delete_all_test_indices(wait_for_readiness=False)
except Exception as e:
print(f"Error: {e}")
if tests_specific_kwargs['delete-indexes']:
delete_all_test_indices(wait_for_readiness=True)
delete_all_test_indices(wait_for_readiness=False)
sys.exit(1)
4 changes: 2 additions & 2 deletions tests/v2_tests/test_create_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,8 +541,8 @@ def test_cloud_index_attributes(self):
self.TEST_ATTRIBUTES_WITH_DEFAULTS[test_attribute]
)
if isinstance(expected_value, int):
self.assertEqual(expected_value, int(index_meta_data[test_attribute]))
self.assertEqual(int(index_meta_data[test_attribute]), expected_value)
elif isinstance(expected_value, str):
self.assertIn(expected_value, index_meta_data[test_attribute].upper())
self.assertIn(index_meta_data[test_attribute].upper(), expected_value.upper())
else:
raise ValueError(f"Unexpected type for {test_attribute}: {type(expected_value)}")
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ passenv =
MARQO_CLOUD_URL # URL that config uses to resolve whether cluster is cloud v2
MARQO_API_KEY # This is the API key used to authenticate with the cloud
MARQO_URL # URL that is used as marqo instance url
MARQO_GITHUB_RUN_ID # this is used to identify indexes between test runs.
MQ_TEST_RUN_IDENTIFIER # this is used to identify indexes between test runs. If blank it will be randomly generated
whitelist_externals =
python
Expand Down
Loading