Skip to content

Commit

Permalink
Improve delete unused tags to be able to process in chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
zdong2 committed Sep 17, 2024
1 parent f4b464b commit f68f166
Showing 10 changed files with 105 additions and 37 deletions.
Original file line number Diff line number Diff line change
@@ -65,7 +65,7 @@ abstract class GenieClientIntegrationTestBase {
// leverage their layered jars to produce less changing images.
@Container
private static final GenericContainer GENIE = new GenericContainer("netflixoss/genie-app:latest.release")
.waitingFor(Wait.forHttp("/admin/health").forStatusCode(200).withStartupTimeout(Duration.ofMinutes(1L)))
.waitingFor(Wait.forHttp("/admin/health").forStatusCode(200).withStartupTimeout(Duration.ofMinutes(5L)))
.withExposedPorts(8080);

protected ApplicationClient applicationClient;
Original file line number Diff line number Diff line change
@@ -61,7 +61,7 @@ void canDeleteUnusedTags() throws GenieCheckedException {
Assertions.assertThat(this.tagRepository.existsByTag(tag1)).isTrue();
Assertions.assertThat(this.tagRepository.existsByTag(tag2)).isTrue();

Assertions.assertThat(this.service.deleteUnusedTags(Instant.now(), 10)).isEqualTo(1L);
Assertions.assertThat(this.service.deleteUnusedTags(Instant.EPOCH, Instant.now(), 10)).isEqualTo(1L);

Assertions.assertThat(this.tagRepository.existsByTag(tag1)).isFalse();
Assertions.assertThat(this.tagRepository.existsByTag(tag2)).isTrue();
Original file line number Diff line number Diff line change
@@ -1121,22 +1121,24 @@ <R extends CommonResource> void removeTagForResource(
//region Tag APIs

/**
* Delete all tags from the database that aren't referenced which were created before the supplied created
* threshold.
* Delete all tags from the database that aren't referenced which were created between the supplied thresholds.
*
* @param createdThreshold The instant in time where tags created before this time that aren't referenced
* will be deleted. Inclusive
* @param batchSize The maximum number of tags to delete in a single transaction
* @param createdThresholdLowerBound The instant in time when tags created after this time that aren't referenced
* will be selected. Inclusive.
* @param createdThresholdUpperBound The instant in time when tags created before this time that aren't referenced
* will be selected. Inclusive.
* @param batchSize The maximum number of tags to delete in a single transaction
* @return The number of tags deleted
*/
long deleteUnusedTags(@NotNull Instant createdThreshold, @Min(1) int batchSize);
long deleteUnusedTags(@NotNull Instant createdThresholdLowerBound,
@NotNull Instant createdThresholdUpperBound,
@Min(1) int batchSize);
//endregion

//region File APIs

/**
* Delete all files from the database that aren't referenced which were created before the supplied created
* threshold.
* Delete all files from the database that aren't referenced which were created between the supplied thresholds.
*
* @param createdThresholdLowerBound The instant in time when files created after this time that aren't referenced
* will be selected. Inclusive.
Original file line number Diff line number Diff line change
@@ -2301,11 +2301,17 @@ public <R extends CommonResource> void removeTagForResource(
*/
@Override
@Transactional(isolation = Isolation.READ_COMMITTED)
public long deleteUnusedTags(@NotNull final Instant createdThreshold, @Min(1) final int batchSize) {
log.info("[deleteUnusedTags] Called to delete unused tags created before {}", createdThreshold);
public long deleteUnusedTags(@NotNull final Instant createdThresholdLowerBound,
@NotNull final Instant createdThresholdUpperBound,
@Min(1) final int batchSize) {
log.debug(
"[deleteUnusedTags] Called to delete unused tags created between {} and {}",
createdThresholdLowerBound,
createdThresholdUpperBound
);
return this.tagRepository.deleteByIdIn(
this.tagRepository
.findUnusedTags(createdThreshold, batchSize)
.findUnusedTags(createdThresholdLowerBound, createdThresholdUpperBound, batchSize)
.stream()
.map(Number::longValue)
.collect(Collectors.toSet())
Original file line number Diff line number Diff line change
@@ -82,8 +82,8 @@ public interface JpaFileRepository extends JpaIdRepository<FileEntity> {
Set<FileEntity> findByFileIn(Set<String> files);

/**
* Find the ids of all files from the database that aren't referenced which were created before the supplied created
* threshold.
* Find the ids of all files from the database that aren't referenced which were created between the supplied
* thresholds.
*
* @param createdThresholdLowerBound The instant in time when files created after this time that aren't referenced
* will be selected. Inclusive.
Original file line number Diff line number Diff line change
@@ -45,7 +45,8 @@ public interface JpaTagRepository extends JpaIdRepository<TagEntity> {
+ "AND id NOT IN (SELECT DISTINCT(tag_id) FROM commands_tags) "
+ "AND id NOT IN (SELECT DISTINCT(tag_id) FROM criteria_tags) "
+ "AND id NOT IN (SELECT DISTINCT(tag_id) FROM jobs_tags) "
+ "AND created <= :createdThreshold "
+ "AND created <= :createdThresholdUpperBound "
+ "AND created >= :createdThresholdLowerBound "
+ "LIMIT :limit "
+ "FOR UPDATE;";

@@ -74,17 +75,19 @@ public interface JpaTagRepository extends JpaIdRepository<TagEntity> {
Set<TagEntity> findByTagIn(Set<String> tags);

/**
* Find all tags from the database that aren't referenced which were created before the supplied created
* threshold.
* Find all tags from the database that aren't referenced which were created between the supplied thresholds.
*
* @param createdThreshold The instant in time where tags created before this time that aren't referenced
* will be returned. Inclusive
* @param limit Maximum number of IDs to return
* @param createdThresholdLowerBound The instant in time when tags created after this time that aren't referenced
* will be selected. Inclusive.
* @param createdThresholdUpperBound The instant in time when tags created before this time that aren't referenced
* will be selected. Inclusive.
* @param limit Maximum number of IDs to return
* @return The ids of the unused tags that were found, at most {@code limit} of them
*/
@Query(value = SELECT_FOR_UPDATE_UNUSED_TAGS_SQL, nativeQuery = true)
Set<Number> findUnusedTags(
@Param("createdThreshold") Instant createdThreshold,
@Param("createdThresholdLowerBound") Instant createdThresholdLowerBound,
@Param("createdThresholdUpperBound") Instant createdThresholdUpperBound,
@Param("limit") int limit
);

Original file line number Diff line number Diff line change
@@ -369,9 +369,31 @@ public static class TagDatabaseCleanupProperties {
*/
public static final String SKIP_PROPERTY = TAG_CLEANUP_PROPERTY_PREFIX + ".skip";

/**
* The number of days before the creation threshold over which unused tags are deleted one rolling window at a
* time. Unused tags older than that are deleted in a single final pass.
*/
public static final String BATCH_DAYS_WITHIN_PROPERTY = TAG_CLEANUP_PROPERTY_PREFIX + ".batchDaysWithin";

/**
* The size of the rolling window used to delete unused tags, units in hours.
*/
public static final String ROLLING_WINDOW_HOURS_PROPERTY = TAG_CLEANUP_PROPERTY_PREFIX + ".rollingWindowHours";

/**
* Skip the Tags table when performing database cleanup.
*/
private boolean skip;

/**
* The number of days before the creation threshold over which unused tags are deleted one rolling window at a
* time. Unused tags older than that are deleted in a single final pass.
*/
@Min(1)
private int batchDaysWithin = 30;

/**
* The size of the rolling window used to delete unused tags, units in hours.
*/
@Min(1)
private int rollingWindowHours = 12;
}
}
Original file line number Diff line number Diff line change
@@ -350,9 +350,12 @@ private void deleteFiles(final Instant creationThreshold) {
this.cleanupProperties.getFileCleanup().getBatchDaysWithin()
);
log.info(
"Attempting to delete unused files from before {} in batches of {}",
"Attempting to delete unused files older than {} in batches of {}. Using a rolling window"
+ " of {} hours within last {} days",
creationThreshold,
batchSize
batchSize,
rollingWindowHours,
batchDaysWithin
);

long totalDeleted = 0L;
@@ -364,7 +367,7 @@ private void deleteFiles(final Instant creationThreshold) {
upperBound = lowerBound;
lowerBound = lowerBound.minus(rollingWindowHours, ChronoUnit.HOURS);
}
// do a final deletion of everything < batchLowerBound
// do a final deletion of everything <= batchLowerBound
totalDeleted += deleteUnusedFilesBetween(Instant.EPOCH, upperBound, batchSize);
log.info(
"Deleted {} files that were unused by any resource and created before {}",
@@ -411,19 +414,36 @@ private void deleteTags(final Instant creationThreshold) {
Integer.class,
this.cleanupProperties.getBatchSize()
);

final long rollingWindowHours = this.environment.getProperty(
DatabaseCleanupProperties.TagDatabaseCleanupProperties.ROLLING_WINDOW_HOURS_PROPERTY,
Integer.class,
this.cleanupProperties.getTagCleanup().getRollingWindowHours()
);
final long batchDaysWithin = this.environment.getProperty(
DatabaseCleanupProperties.TagDatabaseCleanupProperties.BATCH_DAYS_WITHIN_PROPERTY,
Integer.class,
this.cleanupProperties.getTagCleanup().getBatchDaysWithin()
);
log.info(
"Attempting to delete unused tags from before {} in batches of {}",
"Attempting to delete unused tags older than {} in batches of {}. Using a rolling window"
+ " of {} hours within last {} days",
creationThreshold,
batchSize
batchSize,
rollingWindowHours,
batchDaysWithin
);

long deleted;
long totalDeleted = 0L;
do {
deleted = this.persistenceService.deleteUnusedTags(creationThreshold, batchSize);
totalDeleted += deleted;
} while (deleted > 0);
Instant upperBound = creationThreshold;
Instant lowerBound = creationThreshold.minus(rollingWindowHours, ChronoUnit.HOURS);
final Instant batchLowerBound = creationThreshold.minus(batchDaysWithin, ChronoUnit.DAYS);
while (upperBound.isAfter(batchLowerBound)) {
totalDeleted += deleteUnusedTagsBetween(lowerBound, upperBound, batchSize);
upperBound = lowerBound;
lowerBound = lowerBound.minus(rollingWindowHours, ChronoUnit.HOURS);
}
// do a final deletion of everything <= batchLowerBound
totalDeleted += deleteUnusedTagsBetween(Instant.EPOCH, upperBound, batchSize);
log.info(
"Deleted {} tags that were unused by any resource and created before {}",
totalDeleted,
@@ -441,6 +461,16 @@ private void deleteTags(final Instant creationThreshold) {
}
}

/**
 * Drain the unused tags of one creation-time window by calling the persistence service repeatedly, one batch
 * per call, until a call reports that nothing more was deleted.
 *
 * @param lowerBound The lower creation-time bound handed to the persistence service
 * @param upperBound The upper creation-time bound handed to the persistence service
 * @param batchSize  The batch size handed to the persistence service on every call
 * @return The sum of the counts reported by all calls made for this window
 */
private long deleteUnusedTagsBetween(final Instant lowerBound, final Instant upperBound, final int batchSize) {
    long runningTotal = 0L;
    for (;;) {
        final long removed = this.persistenceService.deleteUnusedTags(lowerBound, upperBound, batchSize);
        // Accumulate before testing so the count from the terminating call is included as well.
        runningTotal += removed;
        if (removed <= 0) {
            return runningTotal;
        }
    }
}

private void deactivateCommands(final Instant runtime) {
final long startTime = System.nanoTime();
final Set<Tag> tags = Sets.newHashSet();
Original file line number Diff line number Diff line change
@@ -53,6 +53,8 @@ void canGetDefaultValues() {
Assertions.assertThat(this.properties.getJobCleanup().getPageSize()).isEqualTo(1000);
Assertions.assertThat(this.properties.getClusterCleanup().isSkip()).isFalse();
Assertions.assertThat(this.properties.getTagCleanup().isSkip()).isFalse();
Assertions.assertThat(this.properties.getTagCleanup().getBatchDaysWithin()).isEqualTo(30);
Assertions.assertThat(this.properties.getTagCleanup().getRollingWindowHours()).isEqualTo(12);
Assertions.assertThat(this.properties.getFileCleanup().isSkip()).isFalse();
Assertions.assertThat(this.properties.getFileCleanup().getBatchDaysWithin()).isEqualTo(30);
Assertions.assertThat(this.properties.getFileCleanup().getRollingWindowHours()).isEqualTo(12);
Original file line number Diff line number Diff line change
@@ -133,6 +133,8 @@ void canRun() {
Mockito.when(this.cleanupProperties.getBatchSize()).thenReturn(batchSize);
Mockito.when(this.fileCleanupProperties.getBatchDaysWithin()).thenReturn(batchDaysWithin);
Mockito.when(this.fileCleanupProperties.getRollingWindowHours()).thenReturn(rollingWindow);
Mockito.when(this.tagCleanupProperties.getBatchDaysWithin()).thenReturn(batchDaysWithin);
Mockito.when(this.tagCleanupProperties.getRollingWindowHours()).thenReturn(rollingWindow);
Mockito.when(this.jobCleanupProperties.getRetention()).thenReturn(days).thenReturn(negativeDays);
Mockito.when(this.jobCleanupProperties.getPageSize()).thenReturn(pageSize);
Mockito.when(this.commandDeactivationProperties.getCommandCreationThreshold()).thenReturn(60);
@@ -169,7 +171,8 @@ void canRun() {
Mockito.any(Instant.class), Mockito.any(Instant.class), Mockito.eq(batchSize)))
.thenReturn(3L, 0L, 4L, 0L);
Mockito
.when(this.persistenceService.deleteUnusedTags(Mockito.any(Instant.class), Mockito.eq(batchSize)))
.when(this.persistenceService.deleteUnusedTags(
Mockito.any(Instant.class), Mockito.any(Instant.class), Mockito.eq(batchSize)))
.thenReturn(5L, 0L, 6L, 0L);
Mockito
.when(this.persistenceService.deleteUnusedApplications(Mockito.any(Instant.class), Mockito.eq(batchSize)))
@@ -225,8 +228,8 @@ void canRun() {
.verify(this.persistenceService, Mockito.times(16))
.deleteUnusedFiles(Mockito.any(Instant.class), Mockito.any(Instant.class), Mockito.eq(batchSize));
Mockito
.verify(this.persistenceService, Mockito.times(4))
.deleteUnusedTags(Mockito.any(Instant.class), Mockito.eq(batchSize));
.verify(this.persistenceService, Mockito.times(16))
.deleteUnusedTags(Mockito.any(Instant.class), Mockito.any(Instant.class), Mockito.eq(batchSize));
Mockito
.verify(this.persistenceService, Mockito.times(5))
.deleteUnusedApplications(Mockito.any(Instant.class), Mockito.eq(batchSize));
@@ -332,6 +335,6 @@ void skipAll() {
.deleteUnusedFiles(Mockito.any(Instant.class), Mockito.any(Instant.class), Mockito.anyInt());
Mockito
.verify(this.persistenceService, Mockito.never())
.deleteUnusedTags(Mockito.any(Instant.class), Mockito.anyInt());
.deleteUnusedTags(Mockito.any(Instant.class), Mockito.any(Instant.class), Mockito.anyInt());
}
}

0 comments on commit f68f166

Please sign in to comment.