Skip to content

Commit

Permalink
Improve delete unused tags to be able to process in chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
zdong2 committed Sep 6, 2024
1 parent f4b464b commit 22788da
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ abstract class GenieClientIntegrationTestBase {
// leverage their layered jars to produce less changing images.
@Container
private static final GenericContainer GENIE = new GenericContainer("netflixoss/genie-app:latest.release")
.waitingFor(Wait.forHttp("/admin/health").forStatusCode(200).withStartupTimeout(Duration.ofMinutes(1L)))
.waitingFor(Wait.forHttp("/admin/health").forStatusCode(200).withStartupTimeout(Duration.ofMinutes(5L)))
.withExposedPorts(8080);

protected ApplicationClient applicationClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ void canDeleteUnusedTags() throws GenieCheckedException {
Assertions.assertThat(this.tagRepository.existsByTag(tag1)).isTrue();
Assertions.assertThat(this.tagRepository.existsByTag(tag2)).isTrue();

Assertions.assertThat(this.service.deleteUnusedTags(Instant.now(), 10)).isEqualTo(1L);
Assertions.assertThat(this.service.deleteUnusedTags(Instant.EPOCH, Instant.now(), 10)).isEqualTo(1L);

Assertions.assertThat(this.tagRepository.existsByTag(tag1)).isFalse();
Assertions.assertThat(this.tagRepository.existsByTag(tag2)).isTrue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1121,22 +1121,24 @@ <R extends CommonResource> void removeTagForResource(
//region Tag APIs

/**
* Delete all tags from the database that aren't referenced which were created before the supplied created
* threshold.
* Delete all tags from the database that aren't referenced which were created between the supplied thresholds.
*
* @param createdThreshold The instant in time where tags created before this time that aren't referenced
* will be deleted. Inclusive
* @param batchSize The maximum number of tags to delete in a single transaction
* @param createdThresholdLowerBound The instant in time when tags created after this time that aren't referenced
* will be selected. Inclusive.
* @param createdThresholdUpperBound The instant in time when tags created before this time that aren't referenced
* will be selected. Inclusive.
* @param batchSize The maximum number of tags to delete in a single transaction
* @return The number of tags deleted
*/
long deleteUnusedTags(@NotNull Instant createdThreshold, @Min(1) int batchSize);
long deleteUnusedTags(@NotNull Instant createdThresholdLowerBound,
@NotNull Instant createdThresholdUpperBound,
@Min(1) int batchSize);
//endregion

//region File APIs

/**
* Delete all files from the database that aren't referenced which were created before the supplied created
* threshold.
* Delete all files from the database that aren't referenced which were created between the supplied thresholds.
*
* @param createdThresholdLowerBound The instant in time when files created after this time that aren't referenced
* will be selected. Inclusive.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2301,11 +2301,17 @@ public <R extends CommonResource> void removeTagForResource(
*/
@Override
@Transactional(isolation = Isolation.READ_COMMITTED)
public long deleteUnusedTags(@NotNull final Instant createdThreshold, @Min(1) final int batchSize) {
log.info("[deleteUnusedTags] Called to delete unused tags created before {}", createdThreshold);
public long deleteUnusedTags(@NotNull final Instant createdThresholdLowerBound,
@NotNull final Instant createdThresholdUpperBound,
@Min(1) final int batchSize) {
log.debug(
"[deleteUnusedTags] Called to delete unused tags created between {} and {}",
createdThresholdLowerBound,
createdThresholdUpperBound
);
return this.tagRepository.deleteByIdIn(
this.tagRepository
.findUnusedTags(createdThreshold, batchSize)
.findUnusedTags(createdThresholdLowerBound, createdThresholdUpperBound, batchSize)
.stream()
.map(Number::longValue)
.collect(Collectors.toSet())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ public interface JpaFileRepository extends JpaIdRepository<FileEntity> {
Set<FileEntity> findByFileIn(Set<String> files);

/**
* Find the ids of all files from the database that aren't referenced which were created before the supplied created
* threshold.
* Find the ids of all files from the database that aren't referenced which were created between the supplied
* thresholds.
*
* @param createdThresholdLowerBound The instant in time when files created after this time that aren't referenced
* will be selected. Inclusive.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public interface JpaTagRepository extends JpaIdRepository<TagEntity> {
+ "AND id NOT IN (SELECT DISTINCT(tag_id) FROM commands_tags) "
+ "AND id NOT IN (SELECT DISTINCT(tag_id) FROM criteria_tags) "
+ "AND id NOT IN (SELECT DISTINCT(tag_id) FROM jobs_tags) "
+ "AND created <= :createdThreshold "
+ "AND created <= :createdThresholdUpperBound "
+ "AND created >= :createdThresholdLowerBound "
+ "LIMIT :limit "
+ "FOR UPDATE;";

Expand Down Expand Up @@ -74,17 +75,19 @@ public interface JpaTagRepository extends JpaIdRepository<TagEntity> {
Set<TagEntity> findByTagIn(Set<String> tags);

/**
* Find all tags from the database that aren't referenced which were created before the supplied created
* threshold.
* Find all tags from the database that aren't referenced which were created between the supplied thresholds.
*
* @param createdThreshold The instant in time where tags created before this time that aren't referenced
* will be returned. Inclusive
* @param limit Maximum number of IDs to return
* @param createdThresholdLowerBound The instant in time when tags created after this time that aren't referenced
* will be selected. Inclusive.
* @param createdThresholdUpperBound The instant in time when tags created before this time that aren't referenced
* will be selected. Inclusive.
* @param limit Maximum number of IDs to return
* @return The ids of the unused tags found, at most {@code limit} of them
*/
@Query(value = SELECT_FOR_UPDATE_UNUSED_TAGS_SQL, nativeQuery = true)
Set<Number> findUnusedTags(
@Param("createdThreshold") Instant createdThreshold,
@Param("createdThresholdLowerBound") Instant createdThresholdLowerBound,
@Param("createdThresholdUpperBound") Instant createdThresholdUpperBound,
@Param("limit") int limit
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,9 +369,31 @@ public static class TagDatabaseCleanupProperties {
*/
public static final String SKIP_PROPERTY = TAG_CLEANUP_PROPERTY_PREFIX + ".skip";

/**
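* The number of days, counting back from the creation threshold, over which unused tags are deleted one rolling
* window at a time. Unused tags older than that are deleted together as one final range.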
*/
public static final String BATCH_DAYS_WITHIN_PROPERTY = TAG_CLEANUP_PROPERTY_PREFIX + ".batchDaysWithin";

/**
* The size of the rolling window used to delete unused tags, units in hours.
*/
public static final String ROLLING_WINDOW_HOURS_PROPERTY = TAG_CLEANUP_PROPERTY_PREFIX + ".rollingWindowHours";

/**
* Skip the Tags table when performing database cleanup.
*/
private boolean skip;

/**
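* The number of days, counting back from the creation threshold, over which unused tags are deleted one rolling
* window at a time. Unused tags older than that are deleted together as one final range.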
*/
@Min(1)
private int batchDaysWithin = 30;

/**
* The size of the rolling window used to delete unused tags, units in hours.
*/
@Min(1)
private int rollingWindowHours = 12;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ private void deleteFiles(final Instant creationThreshold) {
upperBound = lowerBound;
lowerBound = lowerBound.minus(rollingWindowHours, ChronoUnit.HOURS);
}
// do a final deletion of everything < batchLowerBound
// do a final deletion of everything <= batchLowerBound
totalDeleted += deleteUnusedFilesBetween(Instant.EPOCH, upperBound, batchSize);
log.info(
"Deleted {} files that were unused by any resource and created before {}",
Expand Down Expand Up @@ -411,19 +411,33 @@ private void deleteTags(final Instant creationThreshold) {
Integer.class,
this.cleanupProperties.getBatchSize()
);

final long rollingWindowHours = this.environment.getProperty(
DatabaseCleanupProperties.TagDatabaseCleanupProperties.ROLLING_WINDOW_HOURS_PROPERTY,
Integer.class,
this.cleanupProperties.getTagCleanup().getRollingWindowHours()
);
final long batchDaysWithin = this.environment.getProperty(
DatabaseCleanupProperties.TagDatabaseCleanupProperties.BATCH_DAYS_WITHIN_PROPERTY,
Integer.class,
this.cleanupProperties.getTagCleanup().getBatchDaysWithin()
);
log.info(
"Attempting to delete unused tags from before {} in batches of {}",
creationThreshold,
batchSize
);

long deleted;
long totalDeleted = 0L;
do {
deleted = this.persistenceService.deleteUnusedTags(creationThreshold, batchSize);
totalDeleted += deleted;
} while (deleted > 0);
Instant upperBound = creationThreshold;
Instant lowerBound = creationThreshold.minus(rollingWindowHours, ChronoUnit.HOURS);
final Instant batchLowerBound = creationThreshold.minus(batchDaysWithin, ChronoUnit.DAYS);
while (upperBound.isAfter(batchLowerBound)) {
totalDeleted += deleteUnusedTagsBetween(lowerBound, upperBound, batchSize);
upperBound = lowerBound;
lowerBound = lowerBound.minus(rollingWindowHours, ChronoUnit.HOURS);
}
// do a final deletion of everything <= batchLowerBound
totalDeleted += deleteUnusedTagsBetween(Instant.EPOCH, upperBound, batchSize);
log.info(
"Deleted {} tags that were unused by any resource and created before {}",
totalDeleted,
Expand All @@ -441,6 +455,16 @@ private void deleteTags(final Instant creationThreshold) {
}
}

/**
 * Delete unused tags created within the given window, one batch after another, until a batch deletes nothing.
 *
 * @param lowerBound Start of the creation-time window passed to the persistence service
 * @param upperBound End of the creation-time window passed to the persistence service
 * @param batchSize  The batch size handed to each persistence service call
 * @return The sum of the counts reported by every persistence service call
 */
private long deleteUnusedTagsBetween(final Instant lowerBound, final Instant upperBound, final int batchSize) {
    long runningTotal = 0L;
    while (true) {
        final long removed = this.persistenceService.deleteUnusedTags(lowerBound, upperBound, batchSize);
        runningTotal += removed;
        // A batch that removed nothing means the window has been drained.
        if (removed <= 0) {
            return runningTotal;
        }
    }
}

private void deactivateCommands(final Instant runtime) {
final long startTime = System.nanoTime();
final Set<Tag> tags = Sets.newHashSet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ void canGetDefaultValues() {
Assertions.assertThat(this.properties.getJobCleanup().getPageSize()).isEqualTo(1000);
Assertions.assertThat(this.properties.getClusterCleanup().isSkip()).isFalse();
Assertions.assertThat(this.properties.getTagCleanup().isSkip()).isFalse();
Assertions.assertThat(this.properties.getTagCleanup().getBatchDaysWithin()).isEqualTo(30);
Assertions.assertThat(this.properties.getTagCleanup().getRollingWindowHours()).isEqualTo(12);
Assertions.assertThat(this.properties.getFileCleanup().isSkip()).isFalse();
Assertions.assertThat(this.properties.getFileCleanup().getBatchDaysWithin()).isEqualTo(30);
Assertions.assertThat(this.properties.getFileCleanup().getRollingWindowHours()).isEqualTo(12);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ void canRun() {
Mockito.when(this.cleanupProperties.getBatchSize()).thenReturn(batchSize);
Mockito.when(this.fileCleanupProperties.getBatchDaysWithin()).thenReturn(batchDaysWithin);
Mockito.when(this.fileCleanupProperties.getRollingWindowHours()).thenReturn(rollingWindow);
Mockito.when(this.tagCleanupProperties.getBatchDaysWithin()).thenReturn(batchDaysWithin);
Mockito.when(this.tagCleanupProperties.getRollingWindowHours()).thenReturn(rollingWindow);
Mockito.when(this.jobCleanupProperties.getRetention()).thenReturn(days).thenReturn(negativeDays);
Mockito.when(this.jobCleanupProperties.getPageSize()).thenReturn(pageSize);
Mockito.when(this.commandDeactivationProperties.getCommandCreationThreshold()).thenReturn(60);
Expand Down Expand Up @@ -169,7 +171,8 @@ void canRun() {
Mockito.any(Instant.class), Mockito.any(Instant.class), Mockito.eq(batchSize)))
.thenReturn(3L, 0L, 4L, 0L);
Mockito
.when(this.persistenceService.deleteUnusedTags(Mockito.any(Instant.class), Mockito.eq(batchSize)))
.when(this.persistenceService.deleteUnusedTags(
Mockito.any(Instant.class), Mockito.any(Instant.class), Mockito.eq(batchSize)))
.thenReturn(5L, 0L, 6L, 0L);
Mockito
.when(this.persistenceService.deleteUnusedApplications(Mockito.any(Instant.class), Mockito.eq(batchSize)))
Expand Down Expand Up @@ -225,8 +228,8 @@ void canRun() {
.verify(this.persistenceService, Mockito.times(16))
.deleteUnusedFiles(Mockito.any(Instant.class), Mockito.any(Instant.class), Mockito.eq(batchSize));
Mockito
.verify(this.persistenceService, Mockito.times(4))
.deleteUnusedTags(Mockito.any(Instant.class), Mockito.eq(batchSize));
.verify(this.persistenceService, Mockito.times(16))
.deleteUnusedTags(Mockito.any(Instant.class), Mockito.any(Instant.class), Mockito.eq(batchSize));
Mockito
.verify(this.persistenceService, Mockito.times(5))
.deleteUnusedApplications(Mockito.any(Instant.class), Mockito.eq(batchSize));
Expand Down Expand Up @@ -332,6 +335,6 @@ void skipAll() {
.deleteUnusedFiles(Mockito.any(Instant.class), Mockito.any(Instant.class), Mockito.anyInt());
Mockito
.verify(this.persistenceService, Mockito.never())
.deleteUnusedTags(Mockito.any(Instant.class), Mockito.anyInt());
.deleteUnusedTags(Mockito.any(Instant.class), Mockito.any(Instant.class), Mockito.anyInt());
}
}

0 comments on commit 22788da

Please sign in to comment.