From 1e75d95aa1e3f9f4ae3ae16bbfc6eebddca57743 Mon Sep 17 00:00:00 2001 From: Andrew Prudhomme Date: Mon, 30 Sep 2024 11:40:44 -0700 Subject: [PATCH] Add cleanupData command to nrt_utils (#733) --- build.gradle | 2 + .../nrtsearch/server/remote/s3/S3Backend.java | 2 +- .../tools/nrt_utils/NrtUtilsCommand.java | 2 + .../nrt_utils/backup/BackupCommandUtils.java | 4 +- .../nrt_utils/cleanup/CleanupDataCommand.java | 436 +++++++++++++++ .../cleanup/CleanupDataCommandTest.java | 516 ++++++++++++++++++ 6 files changed, 959 insertions(+), 3 deletions(-) create mode 100644 src/main/java/com/yelp/nrtsearch/tools/nrt_utils/cleanup/CleanupDataCommand.java create mode 100644 src/test/java/com/yelp/nrtsearch/tools/nrt_utils/cleanup/CleanupDataCommandTest.java diff --git a/build.gradle b/build.gradle index 4deb5d7ad..92a69167d 100644 --- a/build.gradle +++ b/build.gradle @@ -169,11 +169,13 @@ test { environment(Map.of('CUSTOM_HOST', 'my_custom_host', 'VAR1', 'v1', 'VAR2', 'v2', 'VAR3', 'v3')) if (project.hasProperty('longRunningTestsOnly')) { include '**/IncrementalDataCleanupCommandTest.class' + include '**/CleanupDataCommandTest.class' } else { if (!project.hasProperty('includePerfTests')) { exclude '**/YelpReviewsTest.class' exclude '**/MergeBehaviorTests.class' exclude '**/IncrementalDataCleanupCommandTest.class' + exclude '**/CleanupDataCommandTest.class' filter { excludeTestsMatching '*.NodeNameResolverAndLoadBalancingTests.testSimpleLoadBalancingAsync' } diff --git a/src/main/java/com/yelp/nrtsearch/server/remote/s3/S3Backend.java b/src/main/java/com/yelp/nrtsearch/server/remote/s3/S3Backend.java index 1f7d1f9b0..f2b22c86e 100644 --- a/src/main/java/com/yelp/nrtsearch/server/remote/s3/S3Backend.java +++ b/src/main/java/com/yelp/nrtsearch/server/remote/s3/S3Backend.java @@ -68,7 +68,7 @@ public class S3Backend implements RemoteBackend { static final String POINT_STATE = "point_state"; static final String DATA = "data"; static final String WARMING = "warming"; - static final String CURRENT_VERSION = "_current"; + public static final String CURRENT_VERSION = "_current"; private static final Logger logger = LoggerFactory.getLogger(S3Backend.class); private static final String ZIP_EXTENSION = ".zip"; diff --git a/src/main/java/com/yelp/nrtsearch/tools/nrt_utils/NrtUtilsCommand.java b/src/main/java/com/yelp/nrtsearch/tools/nrt_utils/NrtUtilsCommand.java index 1dd5f0caa..ddee5c3ed 100644 --- a/src/main/java/com/yelp/nrtsearch/tools/nrt_utils/NrtUtilsCommand.java +++ b/src/main/java/com/yelp/nrtsearch/tools/nrt_utils/NrtUtilsCommand.java @@ -19,6 +19,7 @@ import com.yelp.nrtsearch.tools.nrt_utils.backup.ListSnapshotsCommand; import com.yelp.nrtsearch.tools.nrt_utils.backup.RestoreCommand; import com.yelp.nrtsearch.tools.nrt_utils.backup.SnapshotCommand; +import com.yelp.nrtsearch.tools.nrt_utils.cleanup.CleanupDataCommand; import com.yelp.nrtsearch.tools.nrt_utils.legacy.incremental.DeleteIncrementalSnapshotsCommand; import com.yelp.nrtsearch.tools.nrt_utils.legacy.incremental.IncrementalDataCleanupCommand; import com.yelp.nrtsearch.tools.nrt_utils.legacy.incremental.ListIncrementalSnapshotsCommand; @@ -36,6 +37,7 @@ name = "nrt_utils", synopsisSubcommandLabel = "COMMAND", subcommands = { + CleanupDataCommand.class, CleanupSnapshotsCommand.class, DeleteIncrementalSnapshotsCommand.class, GetRemoteStateCommand.class, diff --git a/src/main/java/com/yelp/nrtsearch/tools/nrt_utils/backup/BackupCommandUtils.java b/src/main/java/com/yelp/nrtsearch/tools/nrt_utils/backup/BackupCommandUtils.java index 8a3c60c0c..cc00a674b 100644 --- a/src/main/java/com/yelp/nrtsearch/tools/nrt_utils/backup/BackupCommandUtils.java +++ b/src/main/java/com/yelp/nrtsearch/tools/nrt_utils/backup/BackupCommandUtils.java @@ -92,7 +92,7 @@ public static String getSnapshotIndexMetadataPrefix(String snapshotRoot, String * @param bucketName s3 bucket * @param keys keys to delete */ - static void deleteObjects(AmazonS3 s3Client, String bucketName, List keys) { + public static void deleteObjects(AmazonS3 s3Client, String bucketName, List keys) { System.out.println("Batch deleting objects, size: " + keys.size()); DeleteObjectsRequest multiObjectDeleteRequest = new DeleteObjectsRequest(bucketName).withKeys(keys.toArray(new String[0])).withQuiet(true); @@ -107,7 +107,7 @@ static void deleteObjects(AmazonS3 s3Client, String bucketName, List key * @param interval interval string * @return interval in ms */ - static long getTimeIntervalMs(String interval) { + public static long getTimeIntervalMs(String interval) { String trimmed = interval.trim(); if (trimmed.length() < 2) { throw new IllegalArgumentException("Invalid time interval: " + trimmed); diff --git a/src/main/java/com/yelp/nrtsearch/tools/nrt_utils/cleanup/CleanupDataCommand.java b/src/main/java/com/yelp/nrtsearch/tools/nrt_utils/cleanup/CleanupDataCommand.java new file mode 100644 index 000000000..ffcf26d68 --- /dev/null +++ b/src/main/java/com/yelp/nrtsearch/tools/nrt_utils/cleanup/CleanupDataCommand.java @@ -0,0 +1,436 @@ +/* + * Copyright 2024 Yelp Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yelp.nrtsearch.tools.nrt_utils.cleanup; + +import static com.yelp.nrtsearch.tools.nrt_utils.backup.BackupCommandUtils.deleteObjects; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Sets; +import com.yelp.nrtsearch.server.luceneserver.nrt.state.NrtPointState; +import com.yelp.nrtsearch.server.remote.RemoteBackend; +import com.yelp.nrtsearch.server.remote.RemoteUtils; +import com.yelp.nrtsearch.server.remote.s3.S3Backend; +import com.yelp.nrtsearch.server.utils.TimeStringUtil; +import com.yelp.nrtsearch.tools.nrt_utils.backup.BackupCommandUtils; +import com.yelp.nrtsearch.tools.nrt_utils.state.StateCommandUtils; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.stream.Collectors; +import picocli.CommandLine; + +@CommandLine.Command( + name = CleanupDataCommand.CLEANUP_DATA, + description = "Cleanup unneeded index data in S3") +public class CleanupDataCommand implements Callable { + public static final String CLEANUP_DATA = "cleanupData"; + private static final int DELETE_BATCH_SIZE = 1000; + + @CommandLine.Option( + names = {"-s", "--serviceName"}, + description = "Name of nrtsearch cluster", + required = true) + private String serviceName; + + @CommandLine.Option( + names = {"-i", "--indexName"}, + description = "Name of cluster index", + required = true) + private String indexName; + + @CommandLine.Option( + names = {"--exactResourceName"}, + description = "If index resource name already has unique identifier") + private boolean exactResourceName; + + @CommandLine.Option( + names = {"-b", "--bucketName"}, + description = "Name of bucket containing state files", + required = true) + private String bucketName; + + @CommandLine.Option( + names = {"--region"}, + description = "AWS region name, such as us-west-1, us-west-2, us-east-1") + private String region; + + @CommandLine.Option( + names = {"-c", "--credsFile"}, + description = + "File holding AWS credentials; Will use DefaultCredentialProvider if this is unset.") + private String credsFile; + + @CommandLine.Option( + names = {"-p", "--credsProfile"}, + description = "Profile to use from creds file; Neglected when credsFile is unset.", + defaultValue = "default") + private String credsProfile; + + @CommandLine.Option( + names = {"-d", "--deleteAfter"}, + description = + "Delete unneeded files older than this, in the form <#> " + + "with valid units (s)econds, (m)inutes, (h)ours, (d)ays. (60m, 7h, 3d, etc.)", + required = true) + private String deleteAfter; + + @CommandLine.Option( + names = {"--gracePeriod"}, + description = + "Keep files within this grace period from the oldest index version creation, in the form <#> " + + "with valid units (s)econds, (m)inutes, (h)ours, (d)ays. (60m, 7h, 3d, etc.) default: ${DEFAULT-VALUE}", + defaultValue = "3h") + private String gracePeriod; + + @CommandLine.Option( + names = {"--dryRun"}, + description = "Print file deletions, instead of applying to S3") + private boolean dryRun; + + @CommandLine.Option( + names = {"--maxRetry"}, + description = "Maximum number of retry attempts for S3 failed requests", + defaultValue = "20") + private int maxRetry; + + private AmazonS3 s3Client; + + @VisibleForTesting + void setS3Client(AmazonS3 s3Client) { + this.s3Client = s3Client; + } + + @Override + public Integer call() throws Exception { + long deleteAfterMs = BackupCommandUtils.getTimeIntervalMs(deleteAfter); + long gracePeriodMs = BackupCommandUtils.getTimeIntervalMs(gracePeriod); + + if (s3Client == null) { + s3Client = + StateCommandUtils.createS3Client(bucketName, region, credsFile, credsProfile, maxRetry); + } + S3Backend s3Backend = new S3Backend(bucketName, false, s3Client); + + String resolvedIndexResource = + StateCommandUtils.getResourceName(s3Backend, serviceName, indexName, exactResourceName); + + // Cleanup point state files + String pointStatePrefix = + S3Backend.getIndexResourcePrefix( + serviceName, resolvedIndexResource, RemoteBackend.IndexResourceType.POINT_STATE); + if (!s3Backend.exists( + serviceName, resolvedIndexResource, RemoteBackend.IndexResourceType.POINT_STATE)) { + System.out.println("No data found for index: " + indexName); + return 1; + } + String currentPointStateVersion = s3Backend.getCurrentResourceName(pointStatePrefix); + long currentPointStateTimestampMs = validatePSNameAndGetTimestampMs(currentPointStateVersion); + System.out.println( + "Current index point state version: " + + currentPointStateVersion + + ", timestamp: " + + currentPointStateTimestampMs); + + long currentTimeMs = System.currentTimeMillis(); + long minPointStateTimestampMs = currentTimeMs - deleteAfterMs; + minPointStateTimestampMs = Math.min(minPointStateTimestampMs, currentPointStateTimestampMs); + System.out.println( + "Cleaning up version files, minPointStateTimestampMs: " + minPointStateTimestampMs); + + PointStateDeletionDecider pointStateDeletionDecider = + new PointStateDeletionDecider(minPointStateTimestampMs); + cleanupS3Files(s3Client, bucketName, pointStatePrefix, pointStateDeletionDecider, dryRun); + + String oldestRetainedPointFile = pointStateDeletionDecider.getOldestRetainedFile(); + System.out.println("Oldest point state version retained: " + oldestRetainedPointFile); + if (oldestRetainedPointFile == null) { + System.out.println("Could not determine oldest retained point state file"); + return 1; + } + + // find the min of current time, current point state time, and lowest point state + // time. This conservatively determines the lower bounds, in case there is an issue with + // one of the timestamps + long lowestVersionTimestampMs = validatePSNameAndGetTimestampMs(oldestRetainedPointFile); + // subtract grace period for safety, this will be more important when there is pre copied + // merge data in S3 + long dataMinTimestampMs = + Math.min(Math.min(currentPointStateTimestampMs, currentTimeMs), lowestVersionTimestampMs) + - gracePeriodMs; + + // get all the S3 files referenced by the first and last retained point state versions + byte[] currentPointStateData = + s3Client + .getObject(bucketName, pointStatePrefix + currentPointStateVersion) + .getObjectContent() + .readAllBytes(); + NrtPointState currentPointState = RemoteUtils.pointStateFromUtf8(currentPointStateData); + byte[] oldestPointStateData = + s3Client + .getObject(bucketName, pointStatePrefix + oldestRetainedPointFile) + .getObjectContent() + .readAllBytes(); + NrtPointState oldestPointState = RemoteUtils.pointStateFromUtf8(oldestPointStateData); + + Set currentPointStateFiles = getPointStateFiles(currentPointState); + Set oldestPointStateFiles = getPointStateFiles(oldestPointState); + + // Cleanup index data files + String dataPrefix = S3Backend.getIndexDataPrefix(serviceName, resolvedIndexResource); + // uses union of current version and lowest version, this is done to conservatively + // protect the lowest version index files to ensure it can be restored + Set activeIndexFiles = Sets.union(currentPointStateFiles, oldestPointStateFiles); + System.out.println( + "Cleaning up index data files, minTimestampMs: " + + dataMinTimestampMs + + ", activeIndexFiles: " + + activeIndexFiles); + + // clean up all data files that are not needed by the retained index versions + cleanupS3Files( + s3Client, + bucketName, + dataPrefix, + new IndexDataDeletionDecider(dataMinTimestampMs, activeIndexFiles), + dryRun); + return 0; + } + + /** + * Validate that the point state file name conforms to the expected format and return the + * timestamp component in milliseconds. + * + * @param pointStateFileName point state file name + * @return timestamp in milliseconds + */ + @VisibleForTesting + static long validatePSNameAndGetTimestampMs(String pointStateFileName) { + String[] parts = pointStateFileName.split("-"); + if (parts.length != 7) { + throw new IllegalArgumentException("Invalid point state name: " + pointStateFileName); + } + try { + // parse file name components to validate + String uuidString = String.join("-", parts[1], parts[2], parts[3], parts[4], parts[5]); + UUID.fromString(uuidString); + Long.valueOf(parts[6]); + String timeString = pointStateFileName.split("-")[0]; + return TimeStringUtil.parseTimeStringSec(timeString).toEpochMilli(); + } catch (Exception e) { + throw new IllegalArgumentException("Invalid point state name: " + pointStateFileName, e); + } + } + + /** + * Validate that the data file name conforms to the expected format and return the timestamp + * component in milliseconds. + * + * @param dataFileName data file name + * @return timestamp in milliseconds + */ + @VisibleForTesting + static long validateDataNameAndGetTimestampMs(String dataFileName) { + String[] parts = dataFileName.split("-"); + // lucene index files start with '_' + if (parts.length != 7 || !parts[6].startsWith("_")) { + throw new IllegalArgumentException("Invalid data name: " + dataFileName); + } + try { + // parse file name components to validate + String uuidString = String.join("-", parts[1], parts[2], parts[3], parts[4], parts[5]); + UUID.fromString(uuidString); + String timeString = dataFileName.split("-")[0]; + return TimeStringUtil.parseTimeStringSec(timeString).toEpochMilli(); + } catch (Exception e) { + throw new IllegalArgumentException("Invalid data name: " + dataFileName, e); + } + } + + /** + * Get the set of S3 index file names referenced by the point state. + * + * @param pointState point state + * @return set of S3 index file names + */ + private static Set getPointStateFiles(NrtPointState pointState) { + return pointState.files.entrySet().stream() + .map(e -> S3Backend.getIndexBackendFileName(e.getKey(), e.getValue())) + .collect(Collectors.toSet()); + } + + /** Decider for deleting index data files based on the min timestamp and active index files. */ + static class IndexDataDeletionDecider implements FileDeletionDecider { + private final long minTimestampMs; + private final Set activeIndexFiles; + private boolean done = false; + + IndexDataDeletionDecider(long minTimestampMs, Set activeIndexFiles) { + this.minTimestampMs = minTimestampMs; + this.activeIndexFiles = activeIndexFiles; + } + + @Override + public boolean shouldDelete(String fileBaseName, long timestampMs) { + if (activeIndexFiles.contains(fileBaseName)) { + return false; + } + long fileNameTimestampMs = validateDataNameAndGetTimestampMs(fileBaseName); + // once we hit the first time string that is newer than the min timestamp, we are done + // since the files are sorted by time string + if (fileNameTimestampMs >= minTimestampMs) { + done = true; + return false; + } + // extra sanity check to only delete files if both the file time string and the s3 timestamp + // are older than the min timestamp + return timestampMs < minTimestampMs; + } + + @Override + public boolean isDone() { + return done; + } + } + + /** + * Decider for deleting point state files based on the min timestamp. This decider will retain the + * first file that is newer than the min timestamp. + */ + static class PointStateDeletionDecider implements FileDeletionDecider { + private final long minPointStateTimestampMs; + private String oldestRetainedFile = null; + private boolean done = false; + + PointStateDeletionDecider(long minPointStateTimestampMs) { + this.minPointStateTimestampMs = minPointStateTimestampMs; + } + + @Override + public boolean shouldDelete(String fileBaseName, long timestampMs) { + if (S3Backend.CURRENT_VERSION.equals(fileBaseName)) { + return false; + } + long fileNameTimestampMs = validatePSNameAndGetTimestampMs(fileBaseName); + // once we hit the first time string that is newer than the min timestamp, we are done + // since the files are sorted by time string + if (fileNameTimestampMs >= minPointStateTimestampMs) { + oldestRetainedFile = fileBaseName; + done = true; + return false; + } + // extra sanity check to only delete files if both the file time string and the s3 timestamp + // are older than the min timestamp + return timestampMs < minPointStateTimestampMs; + } + + @Override + public boolean isDone() { + return done; + } + + public String getOldestRetainedFile() { + return oldestRetainedFile; + } + } + + /** + * Interface for deciding if an object should be deleted from s3 given its base name and + * modification time. + */ + interface FileDeletionDecider { + + /** + * Determine if an object should be deleted + * + * @param fileBaseName file name after the key prefix + * @param timestampMs modification timestamp + * @return if object should be deleted + */ + boolean shouldDelete(String fileBaseName, long timestampMs); + + /** + * Determine if the deletion process is done. This can be used to short circuit the cleanup + * process when all subsequent files will be retained. + * + * @return if the deletion process is done + */ + boolean isDone(); + } + + /** + * Clean up files in s3. Checks all keys matching the given prefix, and uses the given {@link + * FileDeletionDecider} to determine if they should be deleted. + * + * @param s3Client s3 client + * @param bucketName s3 bucket name + * @param keyPrefix key prefix to clean up + * @param deletionDecider deletion decider + * @param dryRun skip sending actual deletion requests to s3 + */ + static void cleanupS3Files( + AmazonS3 s3Client, + String bucketName, + String keyPrefix, + FileDeletionDecider deletionDecider, + boolean dryRun) { + ListObjectsV2Request req = + new ListObjectsV2Request().withBucketName(bucketName).withPrefix(keyPrefix); + ListObjectsV2Result result; + + List deleteList = new ArrayList<>(DELETE_BATCH_SIZE); + + do { + result = s3Client.listObjectsV2(req); + + for (S3ObjectSummary objectSummary : result.getObjectSummaries()) { + String objFileName = objectSummary.getKey().split(keyPrefix)[1]; + long versionTimestampMs = objectSummary.getLastModified().getTime(); + if (deletionDecider.shouldDelete(objFileName, versionTimestampMs)) { + System.out.println( + "Deleting object - key: " + + objectSummary.getKey() + + ", timestampMs: " + + versionTimestampMs); + deleteList.add(objectSummary.getKey()); + if (deleteList.size() == DELETE_BATCH_SIZE) { + if (!dryRun) { + deleteObjects(s3Client, bucketName, deleteList); + } + deleteList.clear(); + } + } + if (deletionDecider.isDone()) { + break; + } + } + if (deletionDecider.isDone()) { + break; + } + String token = result.getNextContinuationToken(); + req.setContinuationToken(token); + } while (result.isTruncated()); + + if (!deleteList.isEmpty() && !dryRun) { + deleteObjects(s3Client, bucketName, deleteList); + } + } +} diff --git a/src/test/java/com/yelp/nrtsearch/tools/nrt_utils/cleanup/CleanupDataCommandTest.java b/src/test/java/com/yelp/nrtsearch/tools/nrt_utils/cleanup/CleanupDataCommandTest.java new file mode 100644 index 000000000..0bb4d6e5d --- /dev/null +++ b/src/test/java/com/yelp/nrtsearch/tools/nrt_utils/cleanup/CleanupDataCommandTest.java @@ -0,0 +1,516 @@ +/* + * Copyright 2024 Yelp Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yelp.nrtsearch.tools.nrt_utils.cleanup; + +import static com.yelp.nrtsearch.server.grpc.TestServer.S3_ENDPOINT; +import static com.yelp.nrtsearch.server.grpc.TestServer.SERVICE_NAME; +import static com.yelp.nrtsearch.server.grpc.TestServer.TEST_BUCKET; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import com.amazonaws.auth.AnonymousAWSCredentials; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.ListObjectsV2Result; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.yelp.nrtsearch.server.config.IndexStartConfig.IndexDataLocationType; +import com.yelp.nrtsearch.server.grpc.Mode; +import com.yelp.nrtsearch.server.grpc.TestServer; +import com.yelp.nrtsearch.server.luceneserver.nrt.state.NrtPointState; +import com.yelp.nrtsearch.server.remote.RemoteBackend; +import com.yelp.nrtsearch.server.remote.RemoteUtils; +import com.yelp.nrtsearch.server.remote.s3.S3Backend; +import com.yelp.nrtsearch.server.utils.TimeStringUtil; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import org.junit.After; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import picocli.CommandLine; + +public class CleanupDataCommandTest { + @Rule public final TemporaryFolder folder = new TemporaryFolder(); + + @After + public void cleanup() { + TestServer.cleanupAll(); + } + + private AmazonS3 getS3() { + AmazonS3 s3 = new AmazonS3Client(new AnonymousAWSCredentials()); + s3.setEndpoint(S3_ENDPOINT); + s3.createBucket(TEST_BUCKET); + return s3; + } + + private CommandLine getInjectedCommand() { + CleanupDataCommand command = new CleanupDataCommand(); + command.setS3Client(getS3()); + return new CommandLine(command); + } + + private TestServer getTestServer() throws IOException { + return TestServer.builder(folder) + .withAutoStartConfig(true, Mode.PRIMARY, 0, IndexDataLocationType.REMOTE) + .withRemoteStateBackend(false) + .build(); + } + + private List initIndex(TestServer server, S3Backend s3Backend) throws IOException { + List versions = new ArrayList<>(); + server.createSimpleIndex("test_index"); + server.startPrimaryIndex("test_index", -1, null); + String prefix = + S3Backend.getIndexResourcePrefix( + SERVICE_NAME, + server.getGlobalState().getDataResourceForIndex("test_index"), + RemoteBackend.IndexResourceType.POINT_STATE); + versions.add(s3Backend.getCurrentResourceName(prefix)); + for (int i = 0; i < 5; ++i) { + server.addSimpleDocs("test_index", i * 2, i * 2 + 1); + server.refresh("test_index"); + server.commit("test_index"); + versions.add(s3Backend.getCurrentResourceName(prefix)); + try { + Thread.sleep(5000); + } catch (InterruptedException ignore) { + } + } + return versions; + } + + private void setCurrentVersion(S3Backend s3Backend, TestServer server, String currentVersion) { + String indexResource = server.getGlobalState().getDataResourceForIndex("test_index"); + String versionPrefix = + S3Backend.getIndexResourcePrefix( + SERVICE_NAME, indexResource, RemoteBackend.IndexResourceType.POINT_STATE); + s3Backend.setCurrentResource(versionPrefix, currentVersion); + } + + @Test + public void testKeepsRecentVersions() throws IOException { + TestServer server = getTestServer(); + S3Backend s3Backend = new S3Backend(TEST_BUCKET, false, getS3()); + List versions = initIndex(server, s3Backend); + CommandLine cmd = getInjectedCommand(); + + int exitCode = + cmd.execute( + "--serviceName=" + SERVICE_NAME, + "--indexName=test_index", + "--bucketName=" + TEST_BUCKET, + "--deleteAfter=1h", + "--gracePeriod=1s"); + assertEquals(0, exitCode); + + assertVersions( + server.getGlobalState().getDataResourceForIndex("test_index"), versions, 0, 1, 2, 3, 4, 5); + } + + @Test + public void testDeletesUnneededVersions() throws IOException { + TestServer server = getTestServer(); + S3Backend s3Backend = new S3Backend(TEST_BUCKET, false, getS3()); + List versions = initIndex(server, s3Backend); + CommandLine cmd = getInjectedCommand(); + + int exitCode = + cmd.execute( + "--serviceName=" + SERVICE_NAME, + "--indexName=test_index", + "--bucketName=" + TEST_BUCKET, + "--deleteAfter=1s", + "--gracePeriod=1s"); + assertEquals(0, exitCode); + + assertVersions(server.getGlobalState().getDataResourceForIndex("test_index"), versions, 5); + } + + @Test + public void testKeepsFutureVersions() throws IOException { + TestServer server = getTestServer(); + S3Backend s3Backend = new S3Backend(TEST_BUCKET, false, getS3()); + List versions = initIndex(server, s3Backend); + CommandLine cmd = getInjectedCommand(); + + setCurrentVersion(s3Backend, server, versions.get(3)); + + int exitCode = + cmd.execute( + "--serviceName=" + SERVICE_NAME, + "--indexName=test_index", + "--bucketName=" + TEST_BUCKET, + "--deleteAfter=1s", + "--gracePeriod=1s"); + assertEquals(0, exitCode); + + assertVersions( + server.getGlobalState().getDataResourceForIndex("test_index"), versions, 3, 4, 5); + } + + @Test + public void testGracePeriod() throws IOException { + TestServer server = getTestServer(); + S3Backend s3Backend = new S3Backend(TEST_BUCKET, false, getS3()); + List versions = initIndex(server, s3Backend); + CommandLine cmd = getInjectedCommand(); + AmazonS3 s3Client = getS3(); + String indexResource = server.getGlobalState().getDataResourceForIndex("test_index"); + + Set initialIndexFiles = getExistingDataFiles(s3Client, indexResource); + int exitCode = + cmd.execute( + "--serviceName=" + SERVICE_NAME, + "--indexName=test_index", + "--bucketName=" + TEST_BUCKET, + "--deleteAfter=1s", + "--gracePeriod=1h"); + assertEquals(0, exitCode); + + Set cleanedUpFiles = getExistingDataFiles(s3Client, indexResource); + assertEquals(initialIndexFiles, cleanedUpFiles); + assertEquals(Set.of(versions.get(5)), getExistingVersions(s3Client, indexResource)); + + exitCode = + cmd.execute( + "--serviceName=" + SERVICE_NAME, + "--indexName=test_index", + "--bucketName=" + TEST_BUCKET, + "--deleteAfter=1s", + "--gracePeriod=1s"); + assertEquals(0, exitCode); + assertVersions(server.getGlobalState().getDataResourceForIndex("test_index"), versions, 5); + } + + private void assertVersions(String indexResource, List versionNames, int... versions) + throws IOException { + AmazonS3 s3Client = getS3(); + Set expectedVersions = new HashSet<>(); + for (int i : versions) { + expectedVersions.add(versionNames.get(i)); + } + Set presentVersions = getExistingVersions(s3Client, indexResource); + assertEquals(expectedVersions, presentVersions); + + String versionPrefix = + S3Backend.getIndexResourcePrefix( + SERVICE_NAME, indexResource, RemoteBackend.IndexResourceType.POINT_STATE); + Set requiredIndexFiles = new HashSet<>(); + for (int version : versions) { + String versionId = versionNames.get(version); + byte[] versionData = + s3Client + .getObject(TEST_BUCKET, versionPrefix + versionId) + .getObjectContent() + .readAllBytes(); + NrtPointState pointState = RemoteUtils.pointStateFromUtf8(versionData); + Set versionFiles = + pointState.files.entrySet().stream() + .map(e -> S3Backend.getIndexBackendFileName(e.getKey(), e.getValue())) + .collect(Collectors.toSet()); + requiredIndexFiles.addAll(versionFiles); + } + + Set presentIndexFiles = getExistingDataFiles(s3Client, indexResource); + assertEquals(requiredIndexFiles, presentIndexFiles); + } + + private Set getExistingVersions(AmazonS3 s3Client, String indexResource) { + Set versions = new HashSet<>(); + boolean currentVersionFileSeen = false; + String versionPrefix = + S3Backend.getIndexResourcePrefix( + SERVICE_NAME, indexResource, RemoteBackend.IndexResourceType.POINT_STATE); + ListObjectsV2Result result = s3Client.listObjectsV2(TEST_BUCKET, versionPrefix); + for (S3ObjectSummary summary : result.getObjectSummaries()) { + String baseName = summary.getKey().split(versionPrefix)[1]; + if (S3Backend.CURRENT_VERSION.equals(baseName)) { + currentVersionFileSeen = true; + } else { + versions.add(baseName); + } + } + assertTrue(currentVersionFileSeen); + return versions; + } + + private Set getExistingDataFiles(AmazonS3 s3Client, String indexResource) { + Set indexFiles = new HashSet<>(); + String indexDataPrefix = S3Backend.getIndexDataPrefix(SERVICE_NAME, indexResource); + ListObjectsV2Result result = s3Client.listObjectsV2(TEST_BUCKET, indexDataPrefix); + for (S3ObjectSummary summary : result.getObjectSummaries()) { + String baseName = summary.getKey().split(indexDataPrefix)[1]; + indexFiles.add(baseName); + } + return indexFiles; + } + + @Test + public void testValidatePSNameAndGetTimestampMs() { + String timeString = TimeStringUtil.generateTimeStringSec(); + long expectedTimeMs = TimeStringUtil.parseTimeStringSec(timeString).toEpochMilli(); + + String testPSName = timeString + "-" + UUID.randomUUID() + "-1"; + long actualTimeMs = CleanupDataCommand.validatePSNameAndGetTimestampMs(testPSName); + assertEquals(expectedTimeMs, actualTimeMs); + } + + @Test + public void testValidatePSNameAndGetTimestampMs_invalidStructure() { + String timeString = TimeStringUtil.generateTimeStringSec(); + String testPSName = timeString + "-2-1"; + try { + CleanupDataCommand.validatePSNameAndGetTimestampMs(testPSName); + fail(); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("Invalid point state name: " + testPSName)); + } + } + + @Test + public void testValidatePSNameAndGetTimestampMs_invalidTimeString() { + String timeString = TimeStringUtil.generateTimeStringSec(); + String testPSName = timeString + "000-" + UUID.randomUUID() + "-1"; + try { + CleanupDataCommand.validatePSNameAndGetTimestampMs(testPSName); + fail(); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("Invalid point state name: " + testPSName)); + } + } + + @Test + public void testValidatePSNameAndGetTimestampMs_invalidUUID() { + String timeString = TimeStringUtil.generateTimeStringSec(); + String testPSName = timeString + "-" + "5d65b454-fa30-49-invalid-8e0e5" + "-1"; + try { + CleanupDataCommand.validatePSNameAndGetTimestampMs(testPSName); + fail(); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("Invalid point state name: " + testPSName)); + } + } + + @Test + public void testValidatePSNameAndGetTimestampMs_invalidVersionNumber() { + String timeString = TimeStringUtil.generateTimeStringSec(); + String testPSName = timeString + "-" + UUID.randomUUID() + "-invalid"; + try { + CleanupDataCommand.validatePSNameAndGetTimestampMs(testPSName); + fail(); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("Invalid point state name: " + testPSName)); + } + } + + @Test + public void testValidateDataNameAndGetTimestampMs() { + String timeString = TimeStringUtil.generateTimeStringSec(); + long expectedTimeMs = TimeStringUtil.parseTimeStringSec(timeString).toEpochMilli(); + + String testPSName = timeString + "-" + UUID.randomUUID() + "-_indexFile"; + long actualTimeMs = CleanupDataCommand.validateDataNameAndGetTimestampMs(testPSName); + assertEquals(expectedTimeMs, actualTimeMs); + } + + @Test + public void testValidateDataNameAndGetTimestampMs_invalidStructure() { + String timeString = TimeStringUtil.generateTimeStringSec(); + String testPSName = timeString + "-2-1"; + try { + CleanupDataCommand.validateDataNameAndGetTimestampMs(testPSName); + fail(); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("Invalid data name: " + testPSName)); + } + } + + @Test + public void testValidateDataNameAndGetTimestampMs_invalidTimeString() { + String timeString = TimeStringUtil.generateTimeStringSec(); + String testPSName = timeString + "000-" + UUID.randomUUID() + "-_indexFile"; + try { + CleanupDataCommand.validateDataNameAndGetTimestampMs(testPSName); + fail(); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("Invalid data name: " + testPSName)); + } + } + + @Test + public void testValidateDataNameAndGetTimestampMs_invalidUUID() { + String timeString = TimeStringUtil.generateTimeStringSec(); + String testPSName = timeString + "-" + "5d65b454-fa30-49-invalid-8e0e5" + "-_indexFile"; + try { + CleanupDataCommand.validateDataNameAndGetTimestampMs(testPSName); + fail(); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("Invalid data name: " + testPSName)); + } + } + + @Test + public void testValidateDataNameAndGetTimestampMs_invalidIndexFile() { + String timeString = TimeStringUtil.generateTimeStringSec(); + String testPSName = timeString + "-" + UUID.randomUUID() + "-invalid"; + try { + CleanupDataCommand.validateDataNameAndGetTimestampMs(testPSName); + fail(); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("Invalid data name: " + testPSName)); + } + } + + @Test + public void testPointStateDeletionDecider_delete() { + String timeString = TimeStringUtil.generateTimeStringSec(); + long timestampMs = TimeStringUtil.parseTimeStringSec(timeString).toEpochMilli(); + + CleanupDataCommand.PointStateDeletionDecider decider = + new CleanupDataCommand.PointStateDeletionDecider(timestampMs + 1000); + String fileName = timeString + "-" + UUID.randomUUID() + "-1"; + assertTrue(decider.shouldDelete(fileName, timestampMs)); + assertFalse(decider.isDone()); + assertNull(decider.getOldestRetainedFile()); + } + + @Test + public void testPointStateDeletionDecider_retainGreaterTimeString() { + String timeString = TimeStringUtil.generateTimeStringSec(); + long timestampMs = TimeStringUtil.parseTimeStringSec(timeString).toEpochMilli(); + + CleanupDataCommand.PointStateDeletionDecider decider = + new CleanupDataCommand.PointStateDeletionDecider(timestampMs - 1000); + String fileName = timeString + "-" + UUID.randomUUID() + "-1"; + assertFalse(decider.shouldDelete(fileName, timestampMs - 2000)); + assertTrue(decider.isDone()); + assertEquals(fileName, decider.getOldestRetainedFile()); + } + + @Test + public void testPointStateDeletionDecider_retainGreaterTimestamp() { + String timeString = TimeStringUtil.generateTimeStringSec(); + long timestampMs = TimeStringUtil.parseTimeStringSec(timeString).toEpochMilli(); + + CleanupDataCommand.PointStateDeletionDecider decider = + new CleanupDataCommand.PointStateDeletionDecider(timestampMs + 1000); + String fileName = timeString + "-" + UUID.randomUUID() + "-1"; + assertFalse(decider.shouldDelete(fileName, timestampMs + 2000)); + assertFalse(decider.isDone()); + assertNull(decider.getOldestRetainedFile()); + } + + @Test + public void testPointStateDeletionDecider_retainCurrentVersion() { + String timeString = TimeStringUtil.generateTimeStringSec(); + long timestampMs = TimeStringUtil.parseTimeStringSec(timeString).toEpochMilli(); + + CleanupDataCommand.PointStateDeletionDecider decider = + new CleanupDataCommand.PointStateDeletionDecider(timestampMs); + assertFalse(decider.shouldDelete(S3Backend.CURRENT_VERSION, 0)); + assertFalse(decider.isDone()); + assertNull(decider.getOldestRetainedFile()); + } + + @Test + public void testPointStateDeletionDecider_invalidName() { + String timeString = TimeStringUtil.generateTimeStringSec(); + long timestampMs = TimeStringUtil.parseTimeStringSec(timeString).toEpochMilli(); + + CleanupDataCommand.PointStateDeletionDecider decider = + new CleanupDataCommand.PointStateDeletionDecider(timestampMs + 1000); + String fileName = timeString + "-" + UUID.randomUUID() + "-invalid"; + try { + decider.shouldDelete(fileName, timestampMs); + fail(); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("Invalid point state name: " + fileName)); + } + } + + @Test + public void testIndexDataDeletionDecider_delete() { + String timeString = TimeStringUtil.generateTimeStringSec(); + long timestampMs = TimeStringUtil.parseTimeStringSec(timeString).toEpochMilli(); + + CleanupDataCommand.IndexDataDeletionDecider decider = + new CleanupDataCommand.IndexDataDeletionDecider(timestampMs + 1000, Set.of()); + String fileName = timeString + "-" + UUID.randomUUID() + "-_indexFile"; + assertTrue(decider.shouldDelete(fileName, timestampMs)); + assertFalse(decider.isDone()); + } + + @Test + public void testIndexDataDeletionDecider_retainActiveFiles() { + String timeString = TimeStringUtil.generateTimeStringSec(); + long timestampMs = TimeStringUtil.parseTimeStringSec(timeString).toEpochMilli(); + + String fileName = timeString + "-" + UUID.randomUUID() + "-_indexFile"; + CleanupDataCommand.IndexDataDeletionDecider decider = + new CleanupDataCommand.IndexDataDeletionDecider(timestampMs + 1000, Set.of(fileName)); + assertFalse(decider.shouldDelete(fileName, timestampMs)); + assertFalse(decider.isDone()); + } + + @Test + public void testIndexDataDeletionDecider_retainGreaterTimeString() { + String timeString = TimeStringUtil.generateTimeStringSec(); + long timestampMs = TimeStringUtil.parseTimeStringSec(timeString).toEpochMilli(); + + CleanupDataCommand.IndexDataDeletionDecider decider = + new CleanupDataCommand.IndexDataDeletionDecider(timestampMs - 1000, Set.of()); + String fileName = timeString + "-" + UUID.randomUUID() + "-_indexFile"; + assertFalse(decider.shouldDelete(fileName, timestampMs - 2000)); + assertTrue(decider.isDone()); + } + + @Test + public void testIndexDataDeletionDecider_retainGreaterTimestamp() { + String timeString = TimeStringUtil.generateTimeStringSec(); + long timestampMs = TimeStringUtil.parseTimeStringSec(timeString).toEpochMilli(); + + CleanupDataCommand.IndexDataDeletionDecider decider = + new CleanupDataCommand.IndexDataDeletionDecider(timestampMs + 1000, Set.of()); + String fileName = timeString + "-" + UUID.randomUUID() + "-_indexFile"; + assertFalse(decider.shouldDelete(fileName, timestampMs + 2000)); + assertFalse(decider.isDone()); + } + + @Test + public void testIndexDataDeletionDecider_invalidName() { + String timeString = TimeStringUtil.generateTimeStringSec(); + long timestampMs = TimeStringUtil.parseTimeStringSec(timeString).toEpochMilli(); + + CleanupDataCommand.IndexDataDeletionDecider decider = + new CleanupDataCommand.IndexDataDeletionDecider(timestampMs + 1000, Set.of()); + String fileName = timeString + "-" + UUID.randomUUID() + "-invalid"; + try { + decider.shouldDelete(fileName, timestampMs); + fail(); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("Invalid data name: " + fileName)); + } + } +}