diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java index 24b6e1f6af40..5c1b2ff16fcf 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java @@ -28,7 +28,6 @@ import org.apache.druid.server.compaction.CompactionCandidate; import org.apache.druid.server.compaction.CompactionSlotManager; import org.apache.druid.server.compaction.DataSourceCompactibleSegmentIterator; -import org.apache.druid.server.compaction.NewestSegmentFirstPolicy; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.duty.CompactSegments; import org.apache.druid.timeline.CompactionState; @@ -133,9 +132,7 @@ DataSourceCompactibleSegmentIterator getCompactibleCandidates( config, timeline, Intervals.complementOf(searchInterval), - // This policy is used only while creating jobs - // The actual order of jobs is determined by the policy used in CompactionJobQueue - new NewestSegmentFirstPolicy(null), + params.getClusterCompactionConfig().getCompactionPolicy(), params.getFingerprintMapper() ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java index 9446ac664f29..53697b0ae9c1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java @@ -25,6 +25,7 @@ import org.apache.druid.client.indexing.ClientCompactionTaskQuery; import org.apache.druid.client.indexing.ClientTaskQuery; import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.error.DruidException; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.actions.TaskActionClientFactory; @@ -43,7 +44,6 @@ import org.apache.druid.server.compaction.CompactionCandidateSearchPolicy; import org.apache.druid.server.compaction.CompactionSlotManager; import org.apache.druid.server.compaction.CompactionSnapshotBuilder; -import org.apache.druid.server.compaction.CompactionStatus; import org.apache.druid.server.compaction.CompactionStatusTracker; import org.apache.druid.server.coordinator.AutoCompactionSnapshot; import org.apache.druid.server.coordinator.ClusterCompactionConfig; @@ -217,7 +217,7 @@ public void runReadyJobs() final List pendingJobs = new ArrayList<>(); while (!queue.isEmpty()) { final CompactionJob job = queue.poll(); - if (startJobIfPendingAndReady(job, searchPolicy, pendingJobs, slotManager)) { + if (startJobIfPendingAndReady(job, pendingJobs, slotManager)) { runStats.add(Stats.Compaction.SUBMITTED_TASKS, RowKey.of(Dimension.DATASOURCE, job.getDataSource()), 1); } } @@ -267,7 +267,6 @@ public Map getSnapshots() */ private boolean startJobIfPendingAndReady( CompactionJob job, - CompactionCandidateSearchPolicy policy, List pendingJobs, CompactionSlotManager slotManager ) @@ -282,18 +281,17 @@ private boolean startJobIfPendingAndReady( } // Check if the job is already running, completed or skipped - final CompactionStatus compactionStatus = getCurrentStatusForJob(job, policy); - switch (compactionStatus.getState()) { - 
case RUNNING: + final CompactionCandidate.TaskState candidateState = getCurrentTaskStateForJob(job); + switch (candidateState) { + case TASK_IN_PROGRESS: return false; - case COMPLETE: + case RECENTLY_COMPLETED: snapshotBuilder.moveFromPendingToCompleted(candidate); return false; - case SKIPPED: - snapshotBuilder.moveFromPendingToSkipped(candidate); - return false; - default: + case READY: break; + default: + throw DruidException.defensive("unknown compaction candidate state[%s]", candidateState); } // Check if enough compaction task slots are available @@ -378,12 +376,10 @@ private void persistPendingIndexingState(CompactionJob job) } } - public CompactionStatus getCurrentStatusForJob(CompactionJob job, CompactionCandidateSearchPolicy policy) + public CompactionCandidate.TaskState getCurrentTaskStateForJob(CompactionJob job) { - final CompactionStatus compactionStatus = statusTracker.computeCompactionStatus(job.getCandidate(), policy); - final CompactionCandidate candidatesWithStatus = job.getCandidate().withCurrentStatus(null); - statusTracker.onCompactionStatusComputed(candidatesWithStatus, null); - return compactionStatus; + statusTracker.onCompactionCandidates(job.getCandidate(), null); + return statusTracker.computeCompactionTaskState(job.getCandidate()); } public static CompactionConfigValidationResult validateCompactionJob(BatchIndexingJob job) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java index 11709e616c71..6e06c962046a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java @@ -499,7 +499,7 @@ public CompactionSimulateResult simulateRunWithConfigUpdate(ClusterCompactionCon updateRequest.getEngine() ); } else { - return new CompactionSimulateResult(Collections.emptyMap()); + return new CompactionSimulateResult(Collections.emptyMap(), null); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java index 4c98b52f48c1..78389263b5fd 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java @@ -66,9 +66,9 @@ import org.apache.druid.segment.TestIndex; import org.apache.druid.segment.metadata.HeapMemoryIndexingStateStorage; import org.apache.druid.segment.metadata.IndexingStateCache; +import org.apache.druid.server.compaction.CompactionCandidate; import org.apache.druid.server.compaction.CompactionSimulateResult; import org.apache.druid.server.compaction.CompactionStatistics; -import org.apache.druid.server.compaction.CompactionStatus; import org.apache.druid.server.compaction.CompactionStatusTracker; import org.apache.druid.server.compaction.Table; import org.apache.druid.server.coordinator.AutoCompactionSnapshot; @@ -451,7 +451,7 @@ public void test_simulateRunWithConfigUpdate() new ClusterCompactionConfig(null, null, null, null, null, null) ); Assert.assertEquals(1, simulateResult.getCompactionStates().size()); - final Table pendingCompactionTable = simulateResult.getCompactionStates().get(CompactionStatus.State.PENDING); + final Table 
pendingCompactionTable = simulateResult.getCompactionStates().get(CompactionCandidate.TaskState.READY); Assert.assertEquals( Arrays.asList("dataSource", "interval", "numSegments", "bytes", "maxTaskSlots", "reasonToCompact"), pendingCompactionTable.getColumnNames() diff --git a/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java b/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java index 2a9107132623..951f00ba788f 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java +++ b/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java @@ -68,12 +68,12 @@ public final int compareCandidates(CompactionCandidate o1, CompactionCandidate o } @Override - public Eligibility checkEligibilityForCompaction( - CompactionCandidate candidate, - CompactionTaskStatus latestTaskStatus + public CompactionCandidate createCandidate( + CompactionCandidate.ProposedCompaction proposedCompaction, + CompactionStatus eligibility ) { - return Eligibility.OK; + return CompactionMode.FULL_COMPACTION.createCandidate(proposedCompaction, eligibility); } /** diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java index af8b32ebe6db..c2845fa46f7b 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java @@ -19,6 +19,7 @@ package org.apache.druid.server.compaction; +import com.google.common.base.Preconditions; import org.apache.druid.error.InvalidInput; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.granularity.Granularity; @@ -28,6 +29,7 @@ import javax.annotation.Nullable; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -37,58 +39,182 @@ */ public class CompactionCandidate { - private final List segments; - private final Interval umbrellaInterval; - private final Interval compactionInterval; - private final String dataSource; - private final long totalBytes; - private final int numIntervals; - - private final CompactionStatus currentStatus; - - public static CompactionCandidate from( - List segments, - @Nullable Granularity targetSegmentGranularity - ) + /** + * Non-empty list of segments of a datasource being proposed for compaction. + * A proposed compaction typically contains all the segments of a single time chunk. + */ + public static class ProposedCompaction { - if (segments == null || segments.isEmpty()) { - throw InvalidInput.exception("Segments to compact must be non-empty"); + private final List segments; + private final Interval umbrellaInterval; + private final Interval compactionInterval; + private final String dataSource; + private final long totalBytes; + private final int numIntervals; + + public static ProposedCompaction from( + List segments, + @Nullable Granularity targetSegmentGranularity + ) + { + if (segments == null || segments.isEmpty()) { + throw InvalidInput.exception("Segments to compact must be non-empty"); + } + + final Set segmentIntervals = + segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()); + final Interval umbrellaInterval = JodaUtils.umbrellaInterval(segmentIntervals); + final Interval compactionInterval = + targetSegmentGranularity == null + ? 
umbrellaInterval + : JodaUtils.umbrellaInterval(targetSegmentGranularity.getIterable(umbrellaInterval)); + + return new ProposedCompaction( + segments, + umbrellaInterval, + compactionInterval, + segmentIntervals.size() + ); } - final Set segmentIntervals = - segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()); - final Interval umbrellaInterval = JodaUtils.umbrellaInterval(segmentIntervals); - final Interval compactionInterval = - targetSegmentGranularity == null - ? umbrellaInterval - : JodaUtils.umbrellaInterval(targetSegmentGranularity.getIterable(umbrellaInterval)); - - return new CompactionCandidate( - segments, - umbrellaInterval, - compactionInterval, - segmentIntervals.size(), - null - ); + ProposedCompaction( + List segments, + Interval umbrellaInterval, + Interval compactionInterval, + int numDistinctSegmentIntervals + ) + { + this.segments = segments; + this.totalBytes = segments.stream().mapToLong(DataSegment::getSize).sum(); + + this.umbrellaInterval = umbrellaInterval; + this.compactionInterval = compactionInterval; + + this.numIntervals = numDistinctSegmentIntervals; + this.dataSource = segments.get(0).getDataSource(); + } + + /** + * @return Non-empty list of segments that make up this proposed compaction. + */ + public List getSegments() + { + return segments; + } + + public long getTotalBytes() + { + return totalBytes; + } + + public int numSegments() + { + return segments.size(); + } + + /** + * Umbrella interval of all the segments in this proposed compaction. This typically + * corresponds to a single time chunk in the segment timeline. + */ + public Interval getUmbrellaInterval() + { + return umbrellaInterval; + } + + /** + * Interval aligned to the target segment granularity used for the compaction + * task. This interval completely contains the {@link #umbrellaInterval}. + */ + public Interval getCompactionInterval() + { + return compactionInterval; + } + + public String getDataSource() + { + return dataSource; + } + + public CompactionStatistics getStats() + { + return CompactionStatistics.create(totalBytes, numSegments(), numIntervals); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ProposedCompaction that = (ProposedCompaction) o; + return totalBytes == that.totalBytes + && numIntervals == that.numIntervals + && segments.equals(that.segments) + && umbrellaInterval.equals(that.umbrellaInterval) + && compactionInterval.equals(that.compactionInterval) + && dataSource.equals(that.dataSource); + } + + @Override + public int hashCode() + { + return Objects.hash(segments, umbrellaInterval, compactionInterval, dataSource, totalBytes, numIntervals); + } + + @Override + public String toString() + { + return "ProposedCompaction{" + + "datasource=" + dataSource + + ", umbrellaInterval=" + umbrellaInterval + + ", compactionInterval=" + compactionInterval + + ", numIntervals=" + numIntervals + + ", segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) + + ", totalSize=" + totalBytes + + '}'; + } } - private CompactionCandidate( - List segments, - Interval umbrellaInterval, - Interval compactionInterval, - int numDistinctSegmentIntervals, - @Nullable CompactionStatus currentStatus - ) + /** + * Used by {@link CompactionStatusTracker#computeCompactionTaskState(CompactionCandidate)}. + * The callsite then determines whether to launch compaction task or not. 
+ */ + public enum TaskState { - this.segments = segments; - this.totalBytes = segments.stream().mapToLong(DataSegment::getSize).sum(); + // no other compaction candidate is running, we can start a new task + READY, + // compaction candidate is already running under a task + TASK_IN_PROGRESS, + // compaction candidate has recently been completed, and the segment timeline has not yet updated after that + RECENTLY_COMPLETED + } + + private final ProposedCompaction proposedCompaction; - this.umbrellaInterval = umbrellaInterval; - this.compactionInterval = compactionInterval; + private final CompactionStatus eligibility; + @Nullable + private final String policyNote; + private final CompactionMode mode; + + CompactionCandidate( + ProposedCompaction proposedCompaction, + CompactionStatus eligibility, + @Nullable String policyNote, + CompactionMode mode + ) + { + this.proposedCompaction = Preconditions.checkNotNull(proposedCompaction, "proposedCompaction"); + this.eligibility = Preconditions.checkNotNull(eligibility, "eligibility"); + this.policyNote = policyNote; + this.mode = Preconditions.checkNotNull(mode, "mode"); + } - this.numIntervals = numDistinctSegmentIntervals; - this.dataSource = segments.get(0).getDataSource(); - this.currentStatus = currentStatus; + public ProposedCompaction getProposedCompaction() + { + return proposedCompaction; } /** @@ -96,17 +222,17 @@ private CompactionCandidate( */ public List getSegments() { - return segments; + return proposedCompaction.getSegments(); } public long getTotalBytes() { - return totalBytes; + return proposedCompaction.getTotalBytes(); } public int numSegments() { - return segments.size(); + return proposedCompaction.numSegments(); } /** @@ -115,67 +241,52 @@ public int numSegments() */ public Interval getUmbrellaInterval() { - return umbrellaInterval; + return proposedCompaction.getUmbrellaInterval(); } /** * Interval aligned to the target segment granularity used for the compaction - * task. This interval completely contains the {@link #umbrellaInterval}. + * task. This interval completely contains the {@link #getUmbrellaInterval()}. */ public Interval getCompactionInterval() { - return compactionInterval; + return proposedCompaction.getCompactionInterval(); } public String getDataSource() { - return dataSource; + return proposedCompaction.getDataSource(); } public CompactionStatistics getStats() { - return CompactionStatistics.create(totalBytes, numSegments(), numIntervals); - } - - @Nullable - public CompactionStatistics getCompactedStats() - { - return (currentStatus == null || currentStatus.getCompactedStats() == null) - ? null : currentStatus.getCompactedStats(); + return proposedCompaction.getStats(); } @Nullable - public CompactionStatistics getUncompactedStats() + public String getPolicyNote() { - return (currentStatus == null || currentStatus.getUncompactedStats() == null) - ? null : currentStatus.getUncompactedStats(); + return policyNote; } - /** - * Current compaction status of the time chunk corresponding to this candidate. - */ - @Nullable - public CompactionStatus getCurrentStatus() + public CompactionMode getMode() { - return currentStatus; + return mode; } - /** - * Creates a copy of this CompactionCandidate object with the given status. 
- */ - public CompactionCandidate withCurrentStatus(CompactionStatus status) + public CompactionStatus getEligibility() { - return new CompactionCandidate(segments, umbrellaInterval, compactionInterval, numIntervals, status); + return eligibility; } @Override public String toString() { return "SegmentsToCompact{" + - "datasource=" + dataSource + - ", segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) + - ", totalSize=" + totalBytes + - ", currentStatus=" + currentStatus + + ", proposedCompaction=" + proposedCompaction + + ", eligibility=" + eligibility + + ", policyNote=" + policyNote + + ", mode=" + mode + '}'; } } diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java index bfb69787dd84..3afe296297b1 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java @@ -21,11 +21,8 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.server.coordinator.duty.CompactSegments; -import java.util.Objects; - /** * Policy used by {@link CompactSegments} duty to pick segments for compaction. */ @@ -48,74 +45,11 @@ public interface CompactionCandidateSearchPolicy int compareCandidates(CompactionCandidate candidateA, CompactionCandidate candidateB); /** - * Checks if the given {@link CompactionCandidate} is eligible for compaction - * in the current iteration. A policy may implement this method to skip - * compacting intervals or segments that do not fulfil some required criteria. + * Creates a {@link CompactionCandidate} after applying policy-specific checks to the proposed compaction candidate. * - * @return {@link Eligibility#OK} only if eligible. - */ - Eligibility checkEligibilityForCompaction( - CompactionCandidate candidate, - CompactionTaskStatus latestTaskStatus - ); - - /** - * Describes the eligibility of an interval for compaction. + * @param candidate the proposed compaction + * @param eligibility initial eligibility from compaction config checks + * @return final compaction candidate */ - class Eligibility - { - public static final Eligibility OK = new Eligibility(true, null); - - private final boolean eligible; - private final String reason; - - private Eligibility(boolean eligible, String reason) - { - this.eligible = eligible; - this.reason = reason; - } - - public boolean isEligible() - { - return eligible; - } - - public String getReason() - { - return reason; - } - - public static Eligibility fail(String messageFormat, Object... 
args) - { - return new Eligibility(false, StringUtils.format(messageFormat, args)); - } - - @Override - public boolean equals(Object object) - { - if (this == object) { - return true; - } - if (object == null || getClass() != object.getClass()) { - return false; - } - Eligibility that = (Eligibility) object; - return eligible == that.eligible && Objects.equals(reason, that.reason); - } - - @Override - public int hashCode() - { - return Objects.hash(eligible, reason); - } - - @Override - public String toString() - { - return "Eligibility{" + - "eligible=" + eligible + - ", reason='" + reason + '\'' + - '}'; - } - } + CompactionCandidate createCandidate(CompactionCandidate.ProposedCompaction candidate, CompactionStatus eligibility); } diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionMode.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionMode.java new file mode 100644 index 000000000000..16782a3c28e0 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionMode.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.compaction; + +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.StringUtils; + +import javax.annotation.Nullable; + +public enum CompactionMode +{ + FULL_COMPACTION { + @Override + public CompactionCandidate createCandidate( + CompactionCandidate.ProposedCompaction proposedCompaction, + CompactionStatus eligibility, + @Nullable String policyNote + ) + { + return new CompactionCandidate(proposedCompaction, eligibility, policyNote, this); + } + }, + NOT_APPLICABLE; + + public CompactionCandidate createCandidate( + CompactionCandidate.ProposedCompaction proposedCompaction, + CompactionStatus eligibility + ) + { + return createCandidate(proposedCompaction, eligibility, null); + } + + public CompactionCandidate createCandidate( + CompactionCandidate.ProposedCompaction proposedCompaction, + CompactionStatus eligibility, + @Nullable String policyNote + ) + { + throw DruidException.defensive("Cannot create compaction candidate with mode[%s]", this); + } + + public static CompactionCandidate failWithPolicyCheck( + CompactionCandidate.ProposedCompaction proposedCompaction, + CompactionStatus eligibility, + String reasonFormat, + Object... 
args + ) + { + return new CompactionCandidate( + proposedCompaction, + eligibility, + StringUtils.format(reasonFormat, args), + CompactionMode.NOT_APPLICABLE + ); + } + + public static CompactionCandidate notEligible( + CompactionCandidate.ProposedCompaction proposedCompaction, + String reason + ) + { + // CompactionStatus returns an ineligible reason, have not even got to policy check yet + return new CompactionCandidate( + proposedCompaction, + CompactionStatus.notEligible(reason), + null, + CompactionMode.NOT_APPLICABLE + ); + } + + public static CompactionCandidate complete(CompactionCandidate.ProposedCompaction proposedCompaction) + { + return new CompactionCandidate(proposedCompaction, CompactionStatus.COMPLETE, null, CompactionMode.NOT_APPLICABLE); + } +} diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java index bab65f90bf94..1f12b0c2f327 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java @@ -25,6 +25,8 @@ import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig; import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo; import org.apache.druid.client.indexing.TaskPayloadResponse; +import org.apache.druid.common.guava.GuavaUtils; +import org.apache.druid.error.DruidException; import org.apache.druid.indexer.CompactionEngine; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; @@ -87,36 +89,80 @@ public CompactionSimulateResult simulateRunWithConfig( // account for the active tasks final CompactionStatusTracker simulationStatusTracker = new CompactionStatusTracker() { + + @Override + public void onSkippedCandidate( + CompactionCandidate candidateSegments, + DataSourceCompactionConfig config + ) + { + skippedIntervals.addRow(createRow( + candidateSegments, + null, + GuavaUtils.firstNonNull( + candidateSegments.getPolicyNote(), + candidateSegments.getEligibility().getReason() + ) + )); + } + @Override - public CompactionStatus computeCompactionStatus( - CompactionCandidate candidate, - CompactionCandidateSearchPolicy searchPolicy + public void onCompactionCandidates( + CompactionCandidate candidateSegments, + DataSourceCompactionConfig config ) { - return statusTracker.computeCompactionStatus(candidate, searchPolicy); + switch (candidateSegments.getMode()) { + case NOT_APPLICABLE: + skippedIntervals.addRow(createRow( + candidateSegments, + null, + GuavaUtils.firstNonNull( + candidateSegments.getPolicyNote(), + candidateSegments.getEligibility().getReason() + ) + )); + break; + case FULL_COMPACTION: + queuedIntervals.addRow(createRow( + candidateSegments, + ClientCompactionTaskQueryTuningConfig.from(config), + GuavaUtils.firstNonNull( + candidateSegments.getPolicyNote(), + candidateSegments.getEligibility().getReason() + ) + )); + break; + default: + throw DruidException.defensive("unexpected compaction mode[%s]", candidateSegments.getMode()); + } } @Override - public void onCompactionStatusComputed( + public void onCompactionTaskStateComputed( CompactionCandidate candidateSegments, + CompactionCandidate.TaskState taskState, DataSourceCompactionConfig config ) { - final CompactionStatus status = candidateSegments.getCurrentStatus(); - if (status == null) { - // do nothing - } else if (status.getState() == CompactionStatus.State.COMPLETE) { - 
compactedIntervals.addRow( - createRow(candidateSegments, null, null) - ); - } else if (status.getState() == CompactionStatus.State.RUNNING) { - runningIntervals.addRow( - createRow(candidateSegments, ClientCompactionTaskQueryTuningConfig.from(config), status.getReason()) - ); - } else if (status.getState() == CompactionStatus.State.SKIPPED) { - skippedIntervals.addRow( - createRow(candidateSegments, null, status.getReason()) - ); + switch (taskState) { + case RECENTLY_COMPLETED: + compactedIntervals.addRow(createRow(candidateSegments, null, null)); + break; + case TASK_IN_PROGRESS: + runningIntervals.addRow(createRow( + candidateSegments, + ClientCompactionTaskQueryTuningConfig.from(config), + GuavaUtils.firstNonNull( + candidateSegments.getPolicyNote(), + candidateSegments.getEligibility().getReason() + ) + )); + break; + case READY: + break; + default: + throw DruidException.defensive("unknown compaction task state[%s]", taskState); } } @@ -124,10 +170,11 @@ public void onCompactionStatusComputed( public void onTaskSubmitted(String taskId, CompactionCandidate candidateSegments) { // Add a row for each task in order of submission - final CompactionStatus status = candidateSegments.getCurrentStatus(); - queuedIntervals.addRow( - createRow(candidateSegments, null, status == null ? "" : status.getReason()) + final String reason = GuavaUtils.firstNonNull( + candidateSegments.getPolicyNote(), + candidateSegments.getEligibility().getReason() ); + queuedIntervals.addRow(createRow(candidateSegments, null, reason)); } }; @@ -150,21 +197,18 @@ public void onTaskSubmitted(String taskId, CompactionCandidate candidateSegments stats ); - final Map compactionStates = new HashMap<>(); + final Map compactionStates = new HashMap<>(); if (!compactedIntervals.isEmpty()) { - compactionStates.put(CompactionStatus.State.COMPLETE, compactedIntervals); + compactionStates.put(CompactionCandidate.TaskState.RECENTLY_COMPLETED, compactedIntervals); } if (!runningIntervals.isEmpty()) { - compactionStates.put(CompactionStatus.State.RUNNING, runningIntervals); + compactionStates.put(CompactionCandidate.TaskState.TASK_IN_PROGRESS, runningIntervals); } if (!queuedIntervals.isEmpty()) { - compactionStates.put(CompactionStatus.State.PENDING, queuedIntervals); - } - if (!skippedIntervals.isEmpty()) { - compactionStates.put(CompactionStatus.State.SKIPPED, skippedIntervals); + compactionStates.put(CompactionCandidate.TaskState.READY, queuedIntervals); } - return new CompactionSimulateResult(compactionStates); + return new CompactionSimulateResult(compactionStates, skippedIntervals); } private Object[] createRow( diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionSimulateResult.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionSimulateResult.java index 7a48ccf0e5ba..6b91c03ff40d 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionSimulateResult.java +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionSimulateResult.java @@ -26,19 +26,28 @@ public class CompactionSimulateResult { - private final Map compactionStates; + private final Map compactionStates; + private final Table skippedIntervals; @JsonCreator public CompactionSimulateResult( - @JsonProperty("compactionStates") Map compactionStates + @JsonProperty("compactionStates") Map compactionStates, + @JsonProperty("skippedIntervals") Table skippedIntervals ) { this.compactionStates = compactionStates; + this.skippedIntervals = skippedIntervals; } @JsonProperty - public Map 
getCompactionStates() + public Map getCompactionStates() { return compactionStates; } + + @JsonProperty + public Table getSkippedIntervals() + { + return skippedIntervals; + } } diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java index 99e1eef21465..917ec0c85994 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java @@ -19,10 +19,13 @@ package org.apache.druid.server.compaction; +import com.google.common.base.Strings; import org.apache.commons.lang3.ArrayUtils; import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig; import org.apache.druid.common.config.Configs; import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.InvalidInput; import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; @@ -48,28 +51,27 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; /** - * Represents the status of compaction for a given {@link CompactionCandidate}. + * Describes the eligibility of an interval for compaction. */ public class CompactionStatus { - private static final Logger log = new Logger(CompactionStatus.class); - - private static final CompactionStatus COMPLETE = new CompactionStatus(State.COMPLETE, null, null, null); + public static final CompactionStatus COMPLETE = new CompactionStatus(State.COMPLETE, "", null, null); public enum State { - COMPLETE, PENDING, RUNNING, SKIPPED + COMPLETE, ELIGIBLE, NOT_ELIGIBLE } /** * List of checks performed to determine if compaction is already complete based on indexing state fingerprints. */ - private static final List> FINGERPRINT_CHECKS = List.of( + static final List> FINGERPRINT_CHECKS = List.of( Evaluator::allFingerprintedCandidatesHaveExpectedFingerprint ); @@ -78,7 +80,7 @@ public enum State *

* The order of the checks must be honored while evaluating them. */ - private static final List> CHECKS = Arrays.asList( + static final List> CHECKS = Arrays.asList( Evaluator::partitionsSpecIsUpToDate, Evaluator::indexSpecIsUpToDate, Evaluator::segmentGranularityIsUpToDate, @@ -90,32 +92,56 @@ public enum State Evaluator::projectionsAreUpToDate ); + public static CompactionStatus notEligible(String messageFormat, Object... args) + { + return new CompactionStatus(State.NOT_ELIGIBLE, StringUtils.format(messageFormat, args), null, null); + } + private final State state; private final String reason; - private final CompactionStatistics compactedStats; - private final CompactionStatistics uncompactedStats; + + @Nullable + private final CompactionStatistics compacted; + @Nullable + private final CompactionStatistics uncompacted; private CompactionStatus( State state, String reason, - CompactionStatistics compactedStats, - CompactionStatistics uncompactedStats + @Nullable CompactionStatistics compacted, + @Nullable CompactionStatistics uncompacted ) { this.state = state; this.reason = reason; - this.compactedStats = compactedStats; - this.uncompactedStats = uncompactedStats; + switch (state) { + case COMPLETE: + break; + case NOT_ELIGIBLE: + InvalidInput.conditionalException( + !Strings.isNullOrEmpty(reason), + "must provide a reason why compaction not eligible" + ); + break; + case ELIGIBLE: + InvalidInput.conditionalException(compacted != null, "must provide compacted stats for compaction"); + InvalidInput.conditionalException(uncompacted != null, "must provide uncompacted stats for compaction"); + break; + default: + throw DruidException.defensive("unexpected compaction status state[%s]", state); + } + this.compacted = compacted; + this.uncompacted = uncompacted; } - public boolean isComplete() + static CompactionStatusBuilder builder(State state, String reason) { - return state == State.COMPLETE; + return new CompactionStatusBuilder(state, reason); } - public boolean isSkipped() + public State getState() { - return state == State.SKIPPED; + return state; } public String getReason() @@ -123,58 +149,82 @@ public String getReason() return reason; } - public State getState() + @Nullable + public CompactionStatistics getUncompactedStats() { - return state; + return uncompacted; } + @Nullable public CompactionStatistics getCompactedStats() { - return compactedStats; + return compacted; } - public CompactionStatistics getUncompactedStats() + /** + * Evaluates a compaction candidate to determine its eligibility and compaction status. + *

+   * This method performs a two-stage evaluation:
+   * <ol>
+   * <li>First, uses {@link Evaluator} to check if the candidate needs compaction
+   * based on the compaction config (e.g., checking segment granularity, partitions spec, etc.)</li>
+   * <li>Then, applies the search policy to determine if this candidate should be compacted in the
+   * current run (e.g., checking minimum segment count, bytes, or other policy criteria)</li>
+   * </ol>
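+   * <p>
+   * Illustrative call-site sketch (variable names such as {@code proposal} and
+   * {@code searchPolicy} are hypothetical, not part of this patch):
+   * <pre>{@code
+   * CompactionStatus status = CompactionStatus.compute(proposal, config, fingerprintMapper);
+   * if (status.getState() == CompactionStatus.State.ELIGIBLE) {
+   *   CompactionCandidate candidate = searchPolicy.createCandidate(proposal, status);
+   * }
+   * }</pre>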
+   *
+   * @param proposedCompaction the compaction candidate to evaluate
+   * @param config the compaction configuration for the datasource
+   * @param fingerprintMapper mapper for indexing state fingerprints
+   * @return the computed {@link CompactionStatus}. If the candidate is eligible, the status
+   * carries the reason for the first failed check along with the compacted/uncompacted stats.
+   */
+  public static CompactionStatus compute(
+      CompactionCandidate.ProposedCompaction proposedCompaction,
+      DataSourceCompactionConfig config,
+      IndexingStateFingerprintMapper fingerprintMapper
+  )
   {
-    return uncompactedStats;
+    return new Evaluator(proposedCompaction, config, fingerprintMapper).evaluate();
   }
 
   @Override
-  public String toString()
+  public boolean equals(Object object)
   {
-    return "CompactionStatus{" +
-           "state=" + state +
-           ", reason=" + reason +
-           ", compactedStats=" + compactedStats +
-           ", uncompactedStats=" + uncompactedStats +
-           '}';
+    if (this == object) {
+      return true;
+    }
+    if (object == null || getClass() != object.getClass()) {
+      return false;
+    }
+    CompactionStatus that = (CompactionStatus) object;
+    return state == that.state
+        && Objects.equals(reason, that.reason)
+        && Objects.equals(compacted, that.compacted)
+        && Objects.equals(uncompacted, that.uncompacted);
   }
 
-  public static CompactionStatus pending(String reasonFormat, Object... args)
+  @Override
+  public int hashCode()
   {
-    return new CompactionStatus(State.PENDING, StringUtils.format(reasonFormat, args), null, null);
+    return Objects.hash(state, reason, compacted, uncompacted);
   }
 
-  public static CompactionStatus pending(
-      CompactionStatistics compactedStats,
-      CompactionStatistics uncompactedStats,
-      String reasonFormat,
-      Object... args
-  )
+  @Override
+  public String toString()
   {
-    return new CompactionStatus(
-        State.PENDING,
-        StringUtils.format(reasonFormat, args),
-        compactedStats,
-        uncompactedStats
-    );
+    return "CompactionStatus{" +
+           "state=" + state +
+           ", reason='" + reason + '\'' +
+           ", compacted=" + compacted +
+           ", uncompacted=" + uncompacted +
+           '}';
   }
 
   /**
-   * Computes compaction status for the given field. The status is assumed to be
-   * COMPLETE (i.e. no further compaction is required) if the configured value
-   * of the field is null or equal to the current value.
+   * Returns a 'mismatch' reason (which makes the segments eligible for compaction) if the
+   * configured value of the field differs from the current value, or null if they match.
    */
-  private static <T> CompactionStatus completeIfNullOrEqual(
+  @Nullable
+  private static <T> String getConfigMismatchReason(
       String field,
       T configured,
       T current,
@@ -182,20 +232,20 @@ private static <T> CompactionStatus completeIfNullOrEqual(
   )
   {
     if (configured == null || configured.equals(current)) {
-      return COMPLETE;
+      return null;
     } else {
       return configChanged(field, configured, current, stringFunction);
     }
   }
 
-  private static <T> CompactionStatus configChanged(
+  private static <T> String configChanged(
       String field,
       T target,
       T current,
       Function<T, String> stringFunction
   )
   {
-    return CompactionStatus.pending(
+    return StringUtils.format(
         "'%s' mismatch: required[%s], current[%s]",
         field,
         target == null ? null : stringFunction.apply(target),
@@ -241,41 +291,6 @@ private static String asString(PartitionsSpec partitionsSpec)
     }
   }
 
-  public static CompactionStatus skipped(String reasonFormat, Object...
args) - { - return new CompactionStatus(State.SKIPPED, StringUtils.format(reasonFormat, args), null, null); - } - - public static CompactionStatus running(String message) - { - return new CompactionStatus(State.RUNNING, message, null, null); - } - - /** - * Determines the CompactionStatus of the given candidate segments by evaluating - * the {@link #CHECKS} one by one. If any check returns an incomplete status, - * further checks are still performed to determine the number of uncompacted - * segments but only the first incomplete status is returned. - */ - static CompactionStatus compute( - CompactionCandidate candidateSegments, - DataSourceCompactionConfig config, - @Nullable IndexingStateFingerprintMapper fingerprintMapper - ) - { - final CompactionState expectedState = config.toCompactionState(); - String expectedFingerprint; - if (fingerprintMapper == null) { - expectedFingerprint = null; - } else { - expectedFingerprint = fingerprintMapper.generateFingerprint( - config.getDataSource(), - expectedState - ); - } - return new Evaluator(candidateSegments, config, expectedFingerprint, fingerprintMapper).evaluate(); - } - @Nullable public static PartitionsSpec findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig tuningConfig) { @@ -346,8 +361,10 @@ static DimensionRangePartitionsSpec getEffectiveRangePartitionsSpec(DimensionRan */ private static class Evaluator { + private static final Logger log = new Logger(Evaluator.class); + private final DataSourceCompactionConfig compactionConfig; - private final CompactionCandidate candidateSegments; + private final CompactionCandidate.ProposedCompaction proposedCompaction; private final ClientCompactionTaskQueryTuningConfig tuningConfig; private final UserCompactionTaskGranularityConfig configuredGranularitySpec; @@ -357,47 +374,63 @@ private static class Evaluator private final Map> unknownStateToSegments = new HashMap<>(); @Nullable - private final String targetFingerprint; private final IndexingStateFingerprintMapper fingerprintMapper; + @Nullable + private final String targetFingerprint; private Evaluator( - CompactionCandidate candidateSegments, + CompactionCandidate.ProposedCompaction proposedCompaction, DataSourceCompactionConfig compactionConfig, - @Nullable String targetFingerprint, @Nullable IndexingStateFingerprintMapper fingerprintMapper ) { - this.candidateSegments = candidateSegments; + this.proposedCompaction = proposedCompaction; this.compactionConfig = compactionConfig; this.tuningConfig = ClientCompactionTaskQueryTuningConfig.from(compactionConfig); this.configuredGranularitySpec = compactionConfig.getGranularitySpec(); - this.targetFingerprint = targetFingerprint; this.fingerprintMapper = fingerprintMapper; + if (fingerprintMapper == null) { + targetFingerprint = null; + } else { + targetFingerprint = fingerprintMapper.generateFingerprint( + compactionConfig.getDataSource(), + compactionConfig.toCompactionState() + ); + } } + /** + * Evaluates the compaction status of candidate segments through a multi-step process: + *
+     * <ol>
+     * <li>Validates input bytes are within limits</li>
+     * <li>Categorizes segments by compaction state (fingerprinted, uncompacted, or unknown)</li>
+     * <li>Performs fingerprint-based validation if available (fast path)</li>
+     * <li>Runs detailed checks against unknown states via {@link CompactionStatus#CHECKS}</li>
+     * </ol>
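+     * <p>
+     * For instance (hypothetical values, not taken from this patch), a single check
+     * yields a non-null reason only when the config differs:
+     * <pre>{@code
+     * String reason = getConfigMismatchReason("rollup", true, false, String::valueOf);
+     * // reason is "'rollup' mismatch: required[true], current[false]"
+     * }</pre>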
+     *
+     * @return the {@link CompactionStatus} of the candidate segments, carrying the reason for
+     * the first failed check when the segments are eligible for compaction
+     */
    private CompactionStatus evaluate()
    {
-     final CompactionStatus inputBytesCheck = inputBytesAreWithinLimit();
-     if (inputBytesCheck.isSkipped()) {
-       return inputBytesCheck;
+     final String inputBytesCheck = inputBytesAreWithinLimit();
+     if (inputBytesCheck != null) {
+       return CompactionStatus.notEligible(inputBytesCheck);
      }
 
      List<String> reasonsForCompaction = new ArrayList<>();
-     CompactionStatus compactedOnceCheck = segmentsHaveBeenCompactedAtLeastOnce();
-     if (!compactedOnceCheck.isComplete()) {
-       reasonsForCompaction.add(compactedOnceCheck.getReason());
+     String compactedOnceCheck = segmentsHaveBeenCompactedAtLeastOnce();
+     if (compactedOnceCheck != null) {
+       reasonsForCompaction.add(compactedOnceCheck);
      }
 
      if (fingerprintMapper != null && targetFingerprint != null) {
        // First try fingerprint-based evaluation (fast path)
-       CompactionStatus fingerprintStatus = FINGERPRINT_CHECKS.stream()
-                                                              .map(f -> f.apply(this))
-                                                              .filter(status -> !status.isComplete())
-                                                              .findFirst().orElse(COMPLETE);
+       FINGERPRINT_CHECKS.stream()
+                         .map(f -> f.apply(this))
+                         .filter(Objects::nonNull)
+                         .findFirst()
+                         .ifPresent(reasonsForCompaction::add);
 
-       if (!fingerprintStatus.isComplete()) {
-         reasonsForCompaction.add(fingerprintStatus.getReason());
-       }
      }
 
      if (!unknownStateToSegments.isEmpty()) {
@@ -405,13 +438,12 @@ private CompactionStatus evaluate()
        reasonsForCompaction.addAll(
            CHECKS.stream()
                  .map(f -> f.apply(this))
-                 .filter(status -> !status.isComplete())
-                 .map(CompactionStatus::getReason)
+                 .filter(Objects::nonNull)
                  .collect(Collectors.toList())
        );
 
        // Any segments left in unknownStateToSegments passed all checks and are considered compacted
-       this.compactedSegments.addAll(
+       compactedSegments.addAll(
            unknownStateToSegments
                .values()
                .stream()
@@ -421,13 +453,11 @@ private CompactionStatus evaluate()
      }
 
      if (reasonsForCompaction.isEmpty()) {
-       return COMPLETE;
+       return CompactionStatus.COMPLETE;
      } else {
-       return CompactionStatus.pending(
-           createStats(this.compactedSegments),
-           createStats(uncompactedSegments),
-           reasonsForCompaction.get(0)
-       );
+       return builder(State.ELIGIBLE, reasonsForCompaction.get(0)).compacted(createStats(compactedSegments))
+                                                                  .uncompacted(createStats(uncompactedSegments))
+                                                                  .build();
      }
    }
 
@@ -439,7 +469,7 @@ private CompactionStatus evaluate()
     * {@link #unknownStateToSegments} where their indexing states will be analyzed.
     *

*/ - private CompactionStatus allFingerprintedCandidatesHaveExpectedFingerprint() + private String allFingerprintedCandidatesHaveExpectedFingerprint() { Map> mismatchedFingerprintToSegmentMap = new HashMap<>(); for (DataSegment segment : fingerprintedSegments) { @@ -457,18 +487,18 @@ private CompactionStatus allFingerprintedCandidatesHaveExpectedFingerprint() if (mismatchedFingerprintToSegmentMap.isEmpty()) { // All fingerprinted segments have the expected fingerprint - compaction is complete - return COMPLETE; + return null; } if (fingerprintMapper == null) { // Cannot evaluate further without a fingerprint mapper uncompactedSegments.addAll( mismatchedFingerprintToSegmentMap.values() - .stream() - .flatMap(List::stream) - .collect(Collectors.toList()) + .stream() + .flatMap(List::stream) + .collect(Collectors.toList()) ); - return CompactionStatus.pending("Segments have a mismatched fingerprint and no fingerprint mapper is available"); + return "Segments have a mismatched fingerprint and no fingerprint mapper is available"; } boolean fingerprintedSegmentWithoutCachedStateFound = false; @@ -490,14 +520,15 @@ private CompactionStatus allFingerprintedCandidatesHaveExpectedFingerprint() } segments.addAll(e.getValue()); return segments; - }); + } + ); } } if (fingerprintedSegmentWithoutCachedStateFound) { - return CompactionStatus.pending("One or more fingerprinted segments do not have a cached indexing state"); + return "One or more fingerprinted segments do not have a cached indexing state"; } else { - return COMPLETE; + return null; } } @@ -505,9 +536,9 @@ private CompactionStatus allFingerprintedCandidatesHaveExpectedFingerprint() * Checks if all the segments have been compacted at least once and groups them into uncompacted, fingerprinted, or * non-fingerprinted. 
*/ - private CompactionStatus segmentsHaveBeenCompactedAtLeastOnce() + private String segmentsHaveBeenCompactedAtLeastOnce() { - for (DataSegment segment : candidateSegments.getSegments()) { + for (DataSegment segment : proposedCompaction.getSegments()) { final String fingerprint = segment.getIndexingStateFingerprint(); final CompactionState segmentState = segment.getLastCompactionState(); if (fingerprint != null) { @@ -520,58 +551,58 @@ private CompactionStatus segmentsHaveBeenCompactedAtLeastOnce() } if (uncompactedSegments.isEmpty()) { - return COMPLETE; + return null; } else { - return CompactionStatus.pending("not compacted yet"); + return "not compacted yet"; } } - private CompactionStatus partitionsSpecIsUpToDate() + private String partitionsSpecIsUpToDate() { return evaluateForAllCompactionStates(this::partitionsSpecIsUpToDate); } - private CompactionStatus indexSpecIsUpToDate() + private String indexSpecIsUpToDate() { return evaluateForAllCompactionStates(this::indexSpecIsUpToDate); } - private CompactionStatus projectionsAreUpToDate() + private String projectionsAreUpToDate() { return evaluateForAllCompactionStates(this::projectionsAreUpToDate); } - private CompactionStatus segmentGranularityIsUpToDate() + private String segmentGranularityIsUpToDate() { return evaluateForAllCompactionStates(this::segmentGranularityIsUpToDate); } - private CompactionStatus rollupIsUpToDate() + private String rollupIsUpToDate() { return evaluateForAllCompactionStates(this::rollupIsUpToDate); } - private CompactionStatus queryGranularityIsUpToDate() + private String queryGranularityIsUpToDate() { return evaluateForAllCompactionStates(this::queryGranularityIsUpToDate); } - private CompactionStatus dimensionsSpecIsUpToDate() + private String dimensionsSpecIsUpToDate() { return evaluateForAllCompactionStates(this::dimensionsSpecIsUpToDate); } - private CompactionStatus metricsSpecIsUpToDate() + private String metricsSpecIsUpToDate() { return evaluateForAllCompactionStates(this::metricsSpecIsUpToDate); } - private CompactionStatus transformSpecFilterIsUpToDate() + private String transformSpecFilterIsUpToDate() { return evaluateForAllCompactionStates(this::transformSpecFilterIsUpToDate); } - private CompactionStatus partitionsSpecIsUpToDate(CompactionState lastCompactionState) + private String partitionsSpecIsUpToDate(CompactionState lastCompactionState) { PartitionsSpec existingPartionsSpec = lastCompactionState.getPartitionsSpec(); if (existingPartionsSpec instanceof DimensionRangePartitionsSpec) { @@ -579,9 +610,10 @@ private CompactionStatus partitionsSpecIsUpToDate(CompactionState lastCompaction } else if (existingPartionsSpec instanceof DynamicPartitionsSpec) { existingPartionsSpec = new DynamicPartitionsSpec( existingPartionsSpec.getMaxRowsPerSegment(), - ((DynamicPartitionsSpec) existingPartionsSpec).getMaxTotalRowsOr(Long.MAX_VALUE)); + ((DynamicPartitionsSpec) existingPartionsSpec).getMaxTotalRowsOr(Long.MAX_VALUE) + ); } - return CompactionStatus.completeIfNullOrEqual( + return getConfigMismatchReason( "partitionsSpec", findPartitionsSpecFromConfig(tuningConfig), existingPartionsSpec, @@ -589,9 +621,9 @@ private CompactionStatus partitionsSpecIsUpToDate(CompactionState lastCompaction ); } - private CompactionStatus indexSpecIsUpToDate(CompactionState lastCompactionState) + private String indexSpecIsUpToDate(CompactionState lastCompactionState) { - return CompactionStatus.completeIfNullOrEqual( + return getConfigMismatchReason( "indexSpec", Configs.valueOrDefault(tuningConfig.getIndexSpec(), 
IndexSpec.getDefault()).getEffectiveSpec(), lastCompactionState.getIndexSpec().getEffectiveSpec(), @@ -599,9 +631,9 @@ private CompactionStatus indexSpecIsUpToDate(CompactionState lastCompactionState ); } - private CompactionStatus projectionsAreUpToDate(CompactionState lastCompactionState) + private String projectionsAreUpToDate(CompactionState lastCompactionState) { - return CompactionStatus.completeIfNullOrEqual( + return getConfigMismatchReason( "projections", compactionConfig.getProjections(), lastCompactionState.getProjections(), @@ -609,24 +641,24 @@ private CompactionStatus projectionsAreUpToDate(CompactionState lastCompactionSt ); } - private CompactionStatus inputBytesAreWithinLimit() + @Nullable + private String inputBytesAreWithinLimit() { final long inputSegmentSize = compactionConfig.getInputSegmentSizeBytes(); - if (candidateSegments.getTotalBytes() > inputSegmentSize) { - return CompactionStatus.skipped( + if (proposedCompaction.getTotalBytes() > inputSegmentSize) { + return StringUtils.format( "'inputSegmentSize' exceeded: Total segment size[%d] is larger than allowed inputSegmentSize[%d]", - candidateSegments.getTotalBytes(), inputSegmentSize + proposedCompaction.getTotalBytes(), inputSegmentSize ); - } else { - return COMPLETE; } + return null; } - private CompactionStatus segmentGranularityIsUpToDate(CompactionState lastCompactionState) + private String segmentGranularityIsUpToDate(CompactionState lastCompactionState) { if (configuredGranularitySpec == null || configuredGranularitySpec.getSegmentGranularity() == null) { - return COMPLETE; + return null; } final Granularity configuredSegmentGranularity = configuredGranularitySpec.getSegmentGranularity(); @@ -635,7 +667,7 @@ private CompactionStatus segmentGranularityIsUpToDate(CompactionState lastCompac = existingGranularitySpec == null ? null : existingGranularitySpec.getSegmentGranularity(); if (configuredSegmentGranularity.equals(existingSegmentGranularity)) { - return COMPLETE; + return null; } else if (existingSegmentGranularity == null) { // Candidate segments were compacted without segment granularity specified // Check if the segments already have the desired segment granularity @@ -644,13 +676,13 @@ private CompactionStatus segmentGranularityIsUpToDate(CompactionState lastCompac segment -> !configuredSegmentGranularity.isAligned(segment.getInterval()) ); if (needsCompaction) { - return CompactionStatus.pending( + return StringUtils.format( "segmentGranularity: segments do not align with target[%s]", - asString(configuredSegmentGranularity) + CompactionStatus.asString(configuredSegmentGranularity) ); } } else { - return CompactionStatus.configChanged( + return configChanged( "segmentGranularity", configuredSegmentGranularity, existingSegmentGranularity, @@ -658,17 +690,17 @@ private CompactionStatus segmentGranularityIsUpToDate(CompactionState lastCompac ); } - return COMPLETE; + return null; } - private CompactionStatus rollupIsUpToDate(CompactionState lastCompactionState) + private String rollupIsUpToDate(CompactionState lastCompactionState) { if (configuredGranularitySpec == null) { - return COMPLETE; + return null; } else { final UserCompactionTaskGranularityConfig existingGranularitySpec = getGranularitySpec(lastCompactionState); - return CompactionStatus.completeIfNullOrEqual( + return getConfigMismatchReason( "rollup", configuredGranularitySpec.isRollup(), existingGranularitySpec == null ? 
null : existingGranularitySpec.isRollup(), @@ -677,14 +709,14 @@ private CompactionStatus rollupIsUpToDate(CompactionState lastCompactionState) } } - private CompactionStatus queryGranularityIsUpToDate(CompactionState lastCompactionState) + private String queryGranularityIsUpToDate(CompactionState lastCompactionState) { if (configuredGranularitySpec == null) { - return COMPLETE; + return null; } else { final UserCompactionTaskGranularityConfig existingGranularitySpec = getGranularitySpec(lastCompactionState); - return CompactionStatus.completeIfNullOrEqual( + return getConfigMismatchReason( "queryGranularity", configuredGranularitySpec.getQueryGranularity(), existingGranularitySpec == null ? null : existingGranularitySpec.getQueryGranularity(), @@ -698,10 +730,10 @@ private CompactionStatus queryGranularityIsUpToDate(CompactionState lastCompacti * which can create a mismatch between expected and actual order of dimensions. Partition dimensions are separately * covered in {@link Evaluator#partitionsSpecIsUpToDate()} check. */ - private CompactionStatus dimensionsSpecIsUpToDate(CompactionState lastCompactionState) + private String dimensionsSpecIsUpToDate(CompactionState lastCompactionState) { if (compactionConfig.getDimensionsSpec() == null) { - return COMPLETE; + return null; } else { List existingDimensions = getNonPartitioningDimensions( lastCompactionState.getDimensionsSpec() == null @@ -717,7 +749,7 @@ private CompactionStatus dimensionsSpecIsUpToDate(CompactionState lastCompaction ? IndexSpec.getDefault() : compactionConfig.getTuningConfig().getIndexSpec() ); - return CompactionStatus.completeIfNullOrEqual( + return getConfigMismatchReason( "dimensionsSpec", configuredDimensions, existingDimensions, @@ -726,11 +758,11 @@ private CompactionStatus dimensionsSpecIsUpToDate(CompactionState lastCompaction } } - private CompactionStatus metricsSpecIsUpToDate(CompactionState lastCompactionState) + private String metricsSpecIsUpToDate(CompactionState lastCompactionState) { final AggregatorFactory[] configuredMetricsSpec = compactionConfig.getMetricsSpec(); if (ArrayUtils.isEmpty(configuredMetricsSpec)) { - return COMPLETE; + return null; } final List metricSpecList = lastCompactionState.getMetricsSpec(); @@ -739,25 +771,25 @@ private CompactionStatus metricsSpecIsUpToDate(CompactionState lastCompactionSta ? null : metricSpecList.toArray(new AggregatorFactory[0]); if (existingMetricsSpec == null || !Arrays.deepEquals(configuredMetricsSpec, existingMetricsSpec)) { - return CompactionStatus.configChanged( + return configChanged( "metricsSpec", configuredMetricsSpec, existingMetricsSpec, Arrays::toString ); } else { - return COMPLETE; + return null; } } - private CompactionStatus transformSpecFilterIsUpToDate(CompactionState lastCompactionState) + private String transformSpecFilterIsUpToDate(CompactionState lastCompactionState) { if (compactionConfig.getTransformSpec() == null) { - return COMPLETE; + return null; } CompactionTransformSpec existingTransformSpec = lastCompactionState.getTransformSpec(); - return CompactionStatus.completeIfNullOrEqual( + return getConfigMismatchReason( "transformSpec filter", compactionConfig.getTransformSpec().getFilter(), existingTransformSpec == null ? null : existingTransformSpec.getFilter(), @@ -772,22 +804,20 @@ private CompactionStatus transformSpecFilterIsUpToDate(CompactionState lastCompa * * @return The first status which is not COMPLETE. 
*/ - private CompactionStatus evaluateForAllCompactionStates( - Function check - ) + private String evaluateForAllCompactionStates(Function check) { - CompactionStatus firstIncompleteStatus = null; + String firstIncomplete = null; for (CompactionState state : List.copyOf(unknownStateToSegments.keySet())) { - final CompactionStatus status = check.apply(state); - if (!status.isComplete()) { + final String eligibleReason = check.apply(state); + if (eligibleReason != null) { uncompactedSegments.addAll(unknownStateToSegments.remove(state)); - if (firstIncompleteStatus == null) { - firstIncompleteStatus = status; + if (firstIncomplete == null) { + firstIncomplete = eligibleReason; } } } - return firstIncompleteStatus == null ? COMPLETE : firstIncompleteStatus; + return firstIncomplete; } private static UserCompactionTaskGranularityConfig getGranularitySpec( @@ -805,4 +835,35 @@ private static CompactionStatistics createStats(List segments) return CompactionStatistics.create(totalBytes, segments.size(), segmentIntervals.size()); } } + + static class CompactionStatusBuilder + { + private State state; + private CompactionStatistics compacted; + private CompactionStatistics uncompacted; + private String reason; + + CompactionStatusBuilder(State state, String reason) + { + this.state = state; + this.reason = reason; + } + + CompactionStatusBuilder compacted(CompactionStatistics compacted) + { + this.compacted = compacted; + return this; + } + + CompactionStatusBuilder uncompacted(CompactionStatistics uncompacted) + { + this.uncompacted = uncompacted; + return this; + } + + CompactionStatus build() + { + return new CompactionStatus(state, reason, compacted, uncompacted); + } + } } diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java index 1dc409e7361e..fb5dd40056e7 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java @@ -80,15 +80,12 @@ public Set getSubmittedTaskIds() * This method assumes that the given candidate is eligible for compaction * based on the current compaction config/supervisor of the datasource. 
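+ * It now reports only the task-level state (READY, TASK_IN_PROGRESS or RECENTLY_COMPLETED); + * config-based eligibility is evaluated separately via CompactionStatus.compute().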
*/ - public CompactionStatus computeCompactionStatus( - CompactionCandidate candidate, - CompactionCandidateSearchPolicy searchPolicy - ) + public CompactionCandidate.TaskState computeCompactionTaskState(CompactionCandidate candidate) { // Skip intervals that already have a running task final CompactionTaskStatus lastTaskStatus = getLatestTaskStatus(candidate); if (lastTaskStatus != null && lastTaskStatus.getState() == TaskState.RUNNING) { - return CompactionStatus.running("Task for interval is already running"); + return CompactionCandidate.TaskState.TASK_IN_PROGRESS; } // Skip intervals that have been recently compacted if segment timeline is not updated yet @@ -96,27 +93,35 @@ public CompactionStatus computeCompactionStatus( if (lastTaskStatus != null && lastTaskStatus.getState() == TaskState.SUCCESS && snapshotTime != null && snapshotTime.isBefore(lastTaskStatus.getUpdatedTime())) { - return CompactionStatus.skipped( - "Segment timeline not updated since last compaction task succeeded" - ); + return CompactionCandidate.TaskState.RECENTLY_COMPLETED; } - // Skip intervals that have been filtered out by the policy - final CompactionCandidateSearchPolicy.Eligibility eligibility - = searchPolicy.checkEligibilityForCompaction(candidate, lastTaskStatus); - if (eligibility.isEligible()) { - return CompactionStatus.pending("Not compacted yet"); - } else { - return CompactionStatus.skipped("Rejected by search policy: %s", eligibility.getReason()); - } + return CompactionCandidate.TaskState.READY; } /** * Tracks the latest compaction status of the given compaction candidates. * Used only by the {@link CompactionRunSimulator}. */ - public void onCompactionStatusComputed( + public void onSkippedCandidate( + CompactionCandidate candidateSegments, + DataSourceCompactionConfig config + ) + { + // Nothing to do, used by simulator + } + + public void onCompactionCandidates( + CompactionCandidate candidateSegments, + DataSourceCompactionConfig config + ) + { + // Nothing to do, used by simulator + } + + public void onCompactionTaskStateComputed( CompactionCandidate candidateSegments, + CompactionCandidate.TaskState taskState, DataSourceCompactionConfig config ) { diff --git a/server/src/main/java/org/apache/druid/server/compaction/DataSourceCompactibleSegmentIterator.java b/server/src/main/java/org/apache/druid/server/compaction/DataSourceCompactibleSegmentIterator.java index 1994e87a6388..12b53a363545 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/DataSourceCompactibleSegmentIterator.java +++ b/server/src/main/java/org/apache/druid/server/compaction/DataSourceCompactibleSegmentIterator.java @@ -24,9 +24,11 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; +import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.common.logger.Logger; @@ -80,6 +82,7 @@ public class DataSourceCompactibleSegmentIterator implements CompactionSegmentIt // run of the compaction job and skip any interval that was already previously compacted. 
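+ // The iterator now owns the search policy as well: for each group of segments it first computes + // config-based eligibility and then lets the policy build the final candidate, roughly: + //   ProposedCompaction proposed = ProposedCompaction.from(segments, config.getSegmentGranularity()); + //   CompactionStatus eligibility = CompactionStatus.compute(proposed, config, fingerprintMapper); + //   CompactionCandidate candidate = searchPolicy.createCandidate(proposed, eligibility); + // Candidates in FULL_COMPACTION mode are queued; NOT_APPLICABLE candidates are recorded as + // already compacted or skipped depending on their eligibility state.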
private final Set queuedIntervals = new HashSet<>(); + private final CompactionCandidateSearchPolicy searchPolicy; private final PriorityQueue queue; public DataSourceCompactibleSegmentIterator( @@ -92,6 +95,7 @@ public DataSourceCompactibleSegmentIterator( { this.config = config; this.dataSource = config.getDataSource(); + this.searchPolicy = searchPolicy; this.queue = new PriorityQueue<>(searchPolicy::compareCandidates); this.fingerprintMapper = indexingStateFingerprintMapper; @@ -121,9 +125,11 @@ private void populateQueue(SegmentTimeline timeline, List skipInterval if (!partialEternitySegments.isEmpty()) { // Do not use the target segment granularity in the CompactionCandidate // as Granularities.getIterable() will cause OOM due to the above issue - CompactionCandidate candidatesWithStatus = CompactionCandidate - .from(partialEternitySegments, null) - .withCurrentStatus(CompactionStatus.skipped("Segments have partial-eternity intervals")); + CompactionCandidate candidatesWithStatus = + CompactionMode.notEligible( + CompactionCandidate.ProposedCompaction.from(partialEternitySegments, null), + "Segments have partial-eternity intervals" + ); skippedSegments.add(candidatesWithStatus); return; } @@ -329,17 +335,40 @@ private void findAndEnqueueSegmentsToCompact(CompactibleSegmentIterator compacti continue; } - final CompactionCandidate candidates = CompactionCandidate.from(segments, config.getSegmentGranularity()); - final CompactionStatus compactionStatus = CompactionStatus.compute(candidates, config, fingerprintMapper); - final CompactionCandidate candidatesWithStatus = candidates.withCurrentStatus(compactionStatus); + CompactionCandidate.ProposedCompaction proposed = + CompactionCandidate.ProposedCompaction.from(segments, config.getSegmentGranularity()); + final CompactionStatus eligibility = CompactionStatus.compute(proposed, config, fingerprintMapper); + final CompactionCandidate candidate; + switch (eligibility.getState()) { + case COMPLETE: + candidate = CompactionMode.complete(proposed); + break; + case NOT_ELIGIBLE: + candidate = CompactionMode.notEligible(proposed, eligibility.getReason()); + break; + case ELIGIBLE: + candidate = searchPolicy.createCandidate(proposed, eligibility); + break; + default: + throw DruidException.defensive("unknown compaction state[%s]", eligibility.getState()); + } - if (compactionStatus.isComplete()) { - compactedSegments.add(candidatesWithStatus); - } else if (compactionStatus.isSkipped()) { - skippedSegments.add(candidatesWithStatus); - } else if (!queuedIntervals.contains(candidates.getUmbrellaInterval())) { - queue.add(candidatesWithStatus); - queuedIntervals.add(candidates.getUmbrellaInterval()); + switch (candidate.getMode()) { + case FULL_COMPACTION: + if (!queuedIntervals.contains(candidate.getProposedCompaction().getUmbrellaInterval())) { + queue.add(candidate); + queuedIntervals.add(candidate.getProposedCompaction().getUmbrellaInterval()); + } + break; + case NOT_APPLICABLE: + if (CompactionStatus.State.COMPLETE.equals(candidate.getEligibility().getState())) { + compactedSegments.add(candidate); + } else { + skippedSegments.add(candidate); + } + break; + default: + throw DruidException.defensive("Unexpected compaction mode[%s]", candidate.getMode()); } } } @@ -372,16 +401,17 @@ private List findInitialSearchInterval( timeline.findNonOvershadowedObjectsInInterval(skipInterval, Partitions.ONLY_COMPLETE) ); if (!CollectionUtils.isNullOrEmpty(segments)) { - final CompactionCandidate candidates = CompactionCandidate.from(segments, 
config.getSegmentGranularity()); + final CompactionCandidate.ProposedCompaction candidates = + CompactionCandidate.ProposedCompaction.from(segments, config.getSegmentGranularity()); - final CompactionStatus reason; + final String skipReason; if (candidates.getCompactionInterval().overlaps(latestSkipInterval)) { - reason = CompactionStatus.skipped("skip offset from latest[%s]", skipOffset); + skipReason = StringUtils.format("skip offset from latest[%s]", skipOffset); } else { - reason = CompactionStatus.skipped("interval locked by another task"); + skipReason = "interval locked by another task"; } - final CompactionCandidate candidatesWithStatus = candidates.withCurrentStatus(reason); + final CompactionCandidate candidatesWithStatus = CompactionMode.notEligible(candidates, skipReason); skippedSegments.add(candidatesWithStatus); } } @@ -436,7 +466,8 @@ static Interval computeLatestSkipInterval( if (configuredSegmentGranularity == null) { return new Interval(skipOffsetFromLatest, latestDataTimestamp); } else { - DateTime skipFromLastest = new DateTime(latestDataTimestamp, latestDataTimestamp.getZone()).minus(skipOffsetFromLatest); + DateTime skipFromLastest = + new DateTime(latestDataTimestamp, latestDataTimestamp.getZone()).minus(skipOffsetFromLatest); DateTime skipOffsetBucketToSegmentGranularity = configuredSegmentGranularity.bucketStart(skipFromLastest); return new Interval(skipOffsetBucketToSegmentGranularity, latestDataTimestamp); } diff --git a/server/src/main/java/org/apache/druid/server/compaction/FixedIntervalOrderPolicy.java b/server/src/main/java/org/apache/druid/server/compaction/FixedIntervalOrderPolicy.java index 24a2f001afe3..279beca7a654 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/FixedIntervalOrderPolicy.java +++ b/server/src/main/java/org/apache/druid/server/compaction/FixedIntervalOrderPolicy.java @@ -52,21 +52,25 @@ public List getEligibleCandidates() @Override public int compareCandidates(CompactionCandidate candidateA, CompactionCandidate candidateB) { - return findIndex(candidateA) - findIndex(candidateB); + return findIndex(candidateA.getProposedCompaction()) - findIndex(candidateB.getProposedCompaction()); } @Override - public Eligibility checkEligibilityForCompaction( - CompactionCandidate candidate, - CompactionTaskStatus latestTaskStatus + public CompactionCandidate createCandidate( + CompactionCandidate.ProposedCompaction candidate, + CompactionStatus eligibility ) { return findIndex(candidate) < Integer.MAX_VALUE - ? Eligibility.OK - : Eligibility.fail("Datasource/Interval is not in the list of 'eligibleCandidates'"); + ? 
CompactionMode.FULL_COMPACTION.createCandidate(candidate, eligibility) + : CompactionMode.failWithPolicyCheck( + candidate, + eligibility, + "Datasource/Interval is not in the list of 'eligibleCandidates'" + ); } - private int findIndex(CompactionCandidate candidate) + private int findIndex(CompactionCandidate.ProposedCompaction candidate) { int index = 0; for (Candidate eligibleCandidate : eligibleCandidates) { diff --git a/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java b/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java index 345988ee7fc2..e07b21238b89 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java +++ b/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java @@ -109,7 +109,7 @@ public HumanReadableBytes getMaxAverageUncompactedBytesPerSegment() @Override protected Comparator getSegmentComparator() { - return this::compare; + return Comparator.comparing(o -> Objects.requireNonNull(o.getEligibility()), this::compare); } @Override @@ -141,15 +141,16 @@ public int hashCode() @Override public String toString() { - return "MostFragmentedIntervalFirstPolicy{" + - "minUncompactedCount=" + minUncompactedCount + - ", minUncompactedBytes=" + minUncompactedBytes + - ", maxAverageUncompactedBytesPerSegment=" + maxAverageUncompactedBytesPerSegment + - ", priorityDataSource='" + getPriorityDatasource() + '\'' + - '}'; + return + "MostFragmentedIntervalFirstPolicy{" + + "minUncompactedCount=" + minUncompactedCount + + ", minUncompactedBytes=" + minUncompactedBytes + + ", maxAverageUncompactedBytesPerSegment=" + maxAverageUncompactedBytesPerSegment + + ", priorityDataSource='" + getPriorityDatasource() + '\'' + + '}'; } - private int compare(CompactionCandidate candidateA, CompactionCandidate candidateB) + private int compare(CompactionStatus candidateA, CompactionStatus candidateB) { final double fragmentationDiff = computeFragmentationIndex(candidateB) - computeFragmentationIndex(candidateA); @@ -157,37 +158,44 @@ private int compare(CompactionCandidate candidateA, CompactionCandidate candidat } @Override - public Eligibility checkEligibilityForCompaction( - CompactionCandidate candidate, - CompactionTaskStatus latestTaskStatus + public CompactionCandidate createCandidate( + CompactionCandidate.ProposedCompaction candidate, + CompactionStatus eligibility ) { - final CompactionStatistics uncompacted = candidate.getUncompactedStats(); - if (uncompacted == null) { - return Eligibility.OK; - } else if (uncompacted.getNumSegments() < 1) { - return Eligibility.fail("No uncompacted segments in interval"); + final CompactionStatistics uncompacted = Objects.requireNonNull(eligibility.getUncompactedStats()); + + if (uncompacted.getNumSegments() < 1) { + return CompactionMode.failWithPolicyCheck(candidate, eligibility, "No uncompacted segments in interval"); } else if (uncompacted.getNumSegments() < minUncompactedCount) { - return Eligibility.fail( + return CompactionMode.failWithPolicyCheck( + candidate, + eligibility, "Uncompacted segments[%,d] in interval must be at least [%,d]", - uncompacted.getNumSegments(), minUncompactedCount + uncompacted.getNumSegments(), + minUncompactedCount ); } else if (uncompacted.getTotalBytes() < minUncompactedBytes.getBytes()) { - return Eligibility.fail( + return CompactionMode.failWithPolicyCheck( + candidate, + eligibility, "Uncompacted bytes[%,d] in interval must be at least [%,d]", - 
uncompacted.getTotalBytes(), minUncompactedBytes.getBytes() + uncompacted.getTotalBytes(), + minUncompactedBytes.getBytes() ); } final long avgSegmentSize = (uncompacted.getTotalBytes() / uncompacted.getNumSegments()); if (avgSegmentSize > maxAverageUncompactedBytesPerSegment.getBytes()) { - return Eligibility.fail( + return CompactionMode.failWithPolicyCheck( + candidate, + eligibility, "Average size[%,d] of uncompacted segments in interval must be at most [%,d]", - avgSegmentSize, maxAverageUncompactedBytesPerSegment.getBytes() + avgSegmentSize, + maxAverageUncompactedBytesPerSegment.getBytes() ); - } else { - return Eligibility.OK; } + return CompactionMode.FULL_COMPACTION.createCandidate(candidate, eligibility); } /** @@ -197,9 +205,9 @@ public Eligibility checkEligibilityForCompaction( * A higher fragmentation index causes the candidate to be higher in priority * for compaction. */ - private double computeFragmentationIndex(CompactionCandidate candidate) + private double computeFragmentationIndex(CompactionStatus eligibility) { - final CompactionStatistics uncompacted = candidate.getUncompactedStats(); + final CompactionStatistics uncompacted = eligibility.getUncompactedStats(); if (uncompacted == null || uncompacted.getNumSegments() < 1 || uncompacted.getTotalBytes() < 1) { return 0; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index c376aac34406..b18b69f51b91 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -32,8 +32,10 @@ import org.apache.druid.client.indexing.ClientCompactionTaskQuery; import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig; import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.common.guava.GuavaUtils; import org.apache.druid.common.utils.IdUtils; import org.apache.druid.data.input.impl.AggregateProjectionSpec; +import org.apache.druid.error.DruidException; import org.apache.druid.indexer.CompactionEngine; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.GranularityType; @@ -43,10 +45,10 @@ import org.apache.druid.segment.transform.CompactionTransformSpec; import org.apache.druid.server.compaction.CompactionCandidate; import org.apache.druid.server.compaction.CompactionCandidateSearchPolicy; +import org.apache.druid.server.compaction.CompactionMode; import org.apache.druid.server.compaction.CompactionSegmentIterator; import org.apache.druid.server.compaction.CompactionSlotManager; import org.apache.druid.server.compaction.CompactionSnapshotBuilder; -import org.apache.druid.server.compaction.CompactionStatus; import org.apache.druid.server.compaction.CompactionStatusTracker; import org.apache.druid.server.compaction.PriorityBasedCompactionSegmentIterator; import org.apache.druid.server.coordinator.AutoCompactionSnapshot; @@ -192,7 +194,6 @@ public void run( compactionSnapshotBuilder, slotManager, iterator, - policy, defaultEngine ); @@ -228,7 +229,6 @@ private int submitCompactionTasks( CompactionSnapshotBuilder snapshotBuilder, CompactionSlotManager slotManager, CompactionSegmentIterator iterator, - CompactionCandidateSearchPolicy policy, CompactionEngine defaultEngine ) { @@ -244,20 +244,18 @@ private int submitCompactionTasks( final String dataSourceName = entry.getDataSource(); 
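+ // The search policy has already been applied inside the segment iterator, so only the + // task-level state is checked below before a new compaction task is submitted.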
final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName); - final CompactionStatus compactionStatus = - statusTracker.computeCompactionStatus(entry, policy); - final CompactionCandidate candidatesWithStatus = entry.withCurrentStatus(compactionStatus); - statusTracker.onCompactionStatusComputed(candidatesWithStatus, config); - - if (compactionStatus.isComplete()) { - snapshotBuilder.addToComplete(candidatesWithStatus); - continue; - } else if (compactionStatus.isSkipped()) { - snapshotBuilder.addToSkipped(candidatesWithStatus); - continue; - } else { - // As these segments will be compacted, we will aggregate the statistic to the Compacted statistics - snapshotBuilder.addToComplete(entry); + final CompactionCandidate.TaskState compactionTaskState = statusTracker.computeCompactionTaskState(entry); + statusTracker.onCompactionTaskStateComputed(entry, compactionTaskState, config); + + switch (compactionTaskState) { + case TASK_IN_PROGRESS: + // A task is already running on this interval, do not submit another one + continue; + case RECENTLY_COMPLETED: + snapshotBuilder.addToComplete(entry); + continue; + case READY: + // As these segments will be compacted, aggregate their stats into the compacted statistics + snapshotBuilder.addToComplete(entry); + break; + default: + throw DruidException.defensive("unexpected task state[%s]", compactionTaskState); } final ClientCompactionTaskQuery taskPayload = createCompactionTask( @@ -371,8 +369,11 @@ public static ClientCompactionTaskQuery createCompactionTask( } final Map<String, Object> autoCompactionContext = newAutoCompactionContext(config.getTaskContext()); - if (candidate.getCurrentStatus() != null) { - autoCompactionContext.put(COMPACTION_REASON_KEY, candidate.getCurrentStatus().getReason()); + if (CompactionMode.NOT_APPLICABLE.equals(candidate.getMode())) { + autoCompactionContext.put( + COMPACTION_REASON_KEY, + GuavaUtils.firstNonNull(candidate.getPolicyNote(), candidate.getEligibility().getReason()) + ); } autoCompactionContext.put(STORE_COMPACTION_STATE_KEY, storeCompactionStatePerSegment); @@ -418,7 +419,7 @@ private void updateCompactionSnapshotStats( } iterator.getCompactedSegments().forEach(snapshotBuilder::addToComplete); iterator.getSkippedSegments().forEach(entry -> { - statusTracker.onCompactionStatusComputed(entry, datasourceToConfig.get(entry.getDataSource())); + statusTracker.onSkippedCandidate(entry, datasourceToConfig.get(entry.getDataSource())); snapshotBuilder.addToSkipped(entry); }); @@ -463,14 +464,19 @@ private static ClientCompactionTaskQuery compactSegments( context.put("priority", compactionTaskPriority); final String taskId = IdUtils.newTaskId(TASK_ID_PREFIX, ClientCompactionTaskQuery.TYPE, dataSource, null); + final ClientCompactionIntervalSpec clientCompactionIntervalSpec; + switch (entry.getMode()) { + case FULL_COMPACTION: + clientCompactionIntervalSpec = new ClientCompactionIntervalSpec(entry.getCompactionInterval(), null); + break; + default: + throw DruidException.defensive("Unexpected compaction mode[%s]", entry.getMode()); + } return new ClientCompactionTaskQuery( taskId, dataSource, - new ClientCompactionIOConfig( - new ClientCompactionIntervalSpec(entry.getCompactionInterval(), null), - dropExisting - ), + new ClientCompactionIOConfig(clientCompactionIntervalSpec, dropExisting), tuningConfig, granularitySpec, dimensionsSpec, diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpecTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpecTest.java index 46ecc64d72d1..6aa1f976bad4 100644 ---
a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpecTest.java +++ b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpecTest.java @@ -19,12 +19,11 @@ package org.apache.druid.client.indexing; -import com.google.common.collect.ImmutableList; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.segment.IndexIO; -import org.apache.druid.server.compaction.CompactionCandidate; +import org.apache.druid.server.compaction.CompactionCandidate.ProposedCompaction; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; import org.junit.Assert; @@ -32,6 +31,7 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.List; public class ClientCompactionIntervalSpecTest { @@ -73,7 +73,7 @@ public class ClientCompactionIntervalSpecTest public void testFromSegmentWithNoSegmentGranularity() { // The umbrella interval of segments is 2015-02-12/2015-04-14 - CompactionCandidate actual = CompactionCandidate.from(ImmutableList.of(dataSegment1, dataSegment2, dataSegment3), null); + ProposedCompaction actual = ProposedCompaction.from(List.of(dataSegment1, dataSegment2, dataSegment3), null); Assert.assertEquals(Intervals.of("2015-02-12/2015-04-14"), actual.getCompactionInterval()); } @@ -81,7 +81,7 @@ public void testFromSegmentWithNoSegmentGranularity() public void testFromSegmentWitSegmentGranularitySameAsSegment() { // The umbrella interval of segments is 2015-04-11/2015-04-12 - CompactionCandidate actual = CompactionCandidate.from(ImmutableList.of(dataSegment1), Granularities.DAY); + ProposedCompaction actual = ProposedCompaction.from(List.of(dataSegment1), Granularities.DAY); Assert.assertEquals(Intervals.of("2015-04-11/2015-04-12"), actual.getCompactionInterval()); } @@ -89,7 +89,10 @@ public void testFromSegmentWitSegmentGranularitySameAsSegment() public void testFromSegmentWithCoarserSegmentGranularity() { // The umbrella interval of segments is 2015-02-12/2015-04-14 - CompactionCandidate actual = CompactionCandidate.from(ImmutableList.of(dataSegment1, dataSegment2, dataSegment3), Granularities.YEAR); + ProposedCompaction actual = ProposedCompaction.from( + List.of(dataSegment1, dataSegment2, dataSegment3), + Granularities.YEAR + ); // The compaction interval should be expanded to the start and the end of the year to cover the segmentGranularity Assert.assertEquals(Intervals.of("2015-01-01/2016-01-01"), actual.getCompactionInterval()); } @@ -98,7 +101,10 @@ public void testFromSegmentWithCoarserSegmentGranularity() public void testFromSegmentWithFinerSegmentGranularityAndUmbrellaIntervalAlign() { // The umbrella interval of segments is 2015-02-12/2015-04-14 - CompactionCandidate actual = CompactionCandidate.from(ImmutableList.of(dataSegment1, dataSegment2, dataSegment3), Granularities.DAY); + ProposedCompaction actual = ProposedCompaction.from( + List.of(dataSegment1, dataSegment2, dataSegment3), + Granularities.DAY + ); // The segmentGranularity of DAY aligns with the umbrella interval (the umbrella interval can be evenly divided into the segmentGranularity) Assert.assertEquals(Intervals.of("2015-02-12/2015-04-14"), actual.getCompactionInterval()); } @@ -107,7 +113,10 @@ public void testFromSegmentWithFinerSegmentGranularityAndUmbrellaIntervalAlign() public void testFromSegmentWithFinerSegmentGranularityAndUmbrellaIntervalNotAlign() { // The umbrella interval of segments is 2015-02-12/2015-04-14 - CompactionCandidate actual = CompactionCandidate.from(ImmutableList.of(dataSegment1, dataSegment2, dataSegment3), Granularities.WEEK); + ProposedCompaction actual = ProposedCompaction.from( + List.of(dataSegment1, dataSegment2, dataSegment3), + Granularities.WEEK + ); // The segmentGranularity of WEEK does not align with the umbrella interval (the umbrella interval cannot be evenly divided into the segmentGranularity) // Hence the compaction interval is modified to align with the segmentGranularity Assert.assertEquals(Intervals.of("2015-02-09/2015-04-20"), actual.getCompactionInterval()); diff --git a/server/src/test/java/org/apache/druid/server/compaction/CompactionCandidateTest.java b/server/src/test/java/org/apache/druid/server/compaction/CompactionCandidateTest.java new file mode 100644 index 000000000000..d17c7ec3a415 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/compaction/CompactionCandidateTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.druid.server.compaction; + +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.server.coordinator.CreateDataSegments; +import org.apache.druid.timeline.DataSegment; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; + +public class CompactionCandidateTest +{ + private static final String DATASOURCE = "test_datasource"; + + @Test + public void testConstructorAndGetters() + { + List segments = createTestSegments(3); + CompactionCandidate.ProposedCompaction proposed = CompactionCandidate.ProposedCompaction.from(segments, null); + CompactionStatus eligibility = CompactionStatus.notEligible("test reason"); + + CompactionCandidate candidate = new CompactionCandidate(proposed, eligibility, null, CompactionMode.FULL_COMPACTION); + + Assert.assertEquals(proposed, candidate.getProposedCompaction()); + Assert.assertEquals(eligibility, candidate.getEligibility()); + Assert.assertEquals(segments, candidate.getSegments()); + Assert.assertEquals(DATASOURCE, candidate.getDataSource()); + Assert.assertEquals(3, candidate.numSegments()); + } + + @Test + public void testProposedCompactionFrom() + { + List segments = createTestSegments(3); + + CompactionCandidate.ProposedCompaction proposed = + CompactionCandidate.ProposedCompaction.from(segments, null); + + Assert.assertEquals(segments, proposed.getSegments()); + Assert.assertEquals(DATASOURCE, proposed.getDataSource()); + Assert.assertEquals(3, proposed.numSegments()); + Assert.assertNotNull(proposed.getUmbrellaInterval()); + Assert.assertNotNull(proposed.getCompactionInterval()); + Assert.assertNotNull(proposed.getStats()); + } + + @Test + public void testProposedCompactionWithTargetGranularity() + { + List segments = createTestSegments(5); + + CompactionCandidate.ProposedCompaction proposed = + CompactionCandidate.ProposedCompaction.from(segments, Granularities.MONTH); + + Assert.assertEquals(segments, proposed.getSegments()); + Assert.assertEquals(5, proposed.numSegments()); + Assert.assertNotNull(proposed.getUmbrellaInterval()); + Assert.assertNotNull(proposed.getCompactionInterval()); + } + + @Test + public void testProposedCompactionThrowsOnNullOrEmptySegments() + { + Assert.assertThrows( + DruidException.class, + () -> CompactionCandidate.ProposedCompaction.from(null, null) + ); + + Assert.assertThrows( + DruidException.class, + () -> CompactionCandidate.ProposedCompaction.from(Collections.emptyList(), null) + ); + } + + @Test + public void testDelegationMethods() + { + List segments = createTestSegments(3); + CompactionCandidate.ProposedCompaction proposed = CompactionCandidate.ProposedCompaction.from(segments, null); + CompactionCandidate candidate = new CompactionCandidate( + proposed, + CompactionStatus.notEligible("test"), + null, + CompactionMode.FULL_COMPACTION + ); + + Assert.assertEquals(proposed.getTotalBytes(), candidate.getTotalBytes()); + Assert.assertEquals(proposed.getUmbrellaInterval(), candidate.getUmbrellaInterval()); + Assert.assertEquals(proposed.getCompactionInterval(), candidate.getCompactionInterval()); + Assert.assertEquals(proposed.getStats().getTotalBytes(), candidate.getStats().getTotalBytes()); + } + + private static List createTestSegments(int count) + { + return CreateDataSegments.ofDatasource(DATASOURCE) + .forIntervals(count, Granularities.DAY) + .startingAt("2024-01-01") + .eachOfSizeInMb(100); + } +} diff --git 
a/server/src/test/java/org/apache/druid/server/compaction/CompactionRunSimulatorTest.java b/server/src/test/java/org/apache/druid/server/compaction/CompactionRunSimulatorTest.java index 56ec8525bb83..c97827e6491a 100644 --- a/server/src/test/java/org/apache/druid/server/compaction/CompactionRunSimulatorTest.java +++ b/server/src/test/java/org/apache/druid/server/compaction/CompactionRunSimulatorTest.java @@ -80,13 +80,13 @@ public void testSimulateClusterCompactionConfigUpdate() Assert.assertNotNull(simulateResult); - final Map compactionStates = simulateResult.getCompactionStates(); + final Map compactionStates = simulateResult.getCompactionStates(); Assert.assertNotNull(compactionStates); - Assert.assertNull(compactionStates.get(CompactionStatus.State.COMPLETE)); - Assert.assertNull(compactionStates.get(CompactionStatus.State.RUNNING)); + Assert.assertNull(compactionStates.get(CompactionCandidate.TaskState.RECENTLY_COMPLETED)); + Assert.assertNull(compactionStates.get(CompactionCandidate.TaskState.TASK_IN_PROGRESS)); - final Table queuedTable = compactionStates.get(CompactionStatus.State.PENDING); + final Table queuedTable = compactionStates.get(CompactionCandidate.TaskState.READY); Assert.assertEquals( Arrays.asList("dataSource", "interval", "numSegments", "bytes", "maxTaskSlots", "reasonToCompact"), queuedTable.getColumnNames() @@ -106,7 +106,7 @@ public void testSimulateClusterCompactionConfigUpdate() queuedTable.getRows() ); - final Table skippedTable = compactionStates.get(CompactionStatus.State.SKIPPED); + final Table skippedTable = simulateResult.getSkippedIntervals(); Assert.assertEquals( Arrays.asList("dataSource", "interval", "numSegments", "bytes", "reasonToSkip"), skippedTable.getColumnNames() @@ -153,13 +153,13 @@ public void testSimulate_withFixedIntervalOrderPolicy() Assert.assertNotNull(simulateResult); - final Map compactionStates = simulateResult.getCompactionStates(); + final Map compactionStates = simulateResult.getCompactionStates(); Assert.assertNotNull(compactionStates); - Assert.assertNull(compactionStates.get(CompactionStatus.State.COMPLETE)); - Assert.assertNull(compactionStates.get(CompactionStatus.State.RUNNING)); + Assert.assertNull(compactionStates.get(CompactionCandidate.TaskState.RECENTLY_COMPLETED)); + Assert.assertNull(compactionStates.get(CompactionCandidate.TaskState.TASK_IN_PROGRESS)); - final Table pendingTable = compactionStates.get(CompactionStatus.State.PENDING); + final Table pendingTable = compactionStates.get(CompactionCandidate.TaskState.READY); Assert.assertEquals( List.of("dataSource", "interval", "numSegments", "bytes", "maxTaskSlots", "reasonToCompact"), pendingTable.getColumnNames() @@ -172,24 +172,23 @@ public void testSimulate_withFixedIntervalOrderPolicy() pendingTable.getRows() ); - final Table skippedTable = compactionStates.get(CompactionStatus.State.SKIPPED); + final Table skippedTable = simulateResult.getSkippedIntervals(); Assert.assertEquals( List.of("dataSource", "interval", "numSegments", "bytes", "reasonToSkip"), skippedTable.getColumnNames() ); - final String rejectedMessage - = "Rejected by search policy: Datasource/Interval is not in the list of 'eligibleCandidates'"; + final String rejectedMessage = "Datasource/Interval is not in the list of 'eligibleCandidates'"; Assert.assertEquals( List.of( - List.of("wiki", Intervals.of("2013-01-02/P1D"), 10, 1_000_000_000L, 1, rejectedMessage), - List.of("wiki", Intervals.of("2013-01-03/P1D"), 10, 1_000_000_000L, 1, rejectedMessage), + List.of("wiki", Intervals.of("2013-01-10/P1D"), 
10, 1_000_000_000L, 1, "skip offset from latest[P1D]"), + List.of("wiki", Intervals.of("2013-01-09/P1D"), 10, 1_000_000_000L, 1, rejectedMessage), List.of("wiki", Intervals.of("2013-01-07/P1D"), 10, 1_000_000_000L, 1, rejectedMessage), - List.of("wiki", Intervals.of("2013-01-05/P1D"), 10, 1_000_000_000L, 1, rejectedMessage), List.of("wiki", Intervals.of("2013-01-06/P1D"), 10, 1_000_000_000L, 1, rejectedMessage), - List.of("wiki", Intervals.of("2013-01-01/P1D"), 10, 1_000_000_000L, 1, rejectedMessage), - List.of("wiki", Intervals.of("2013-01-09/P1D"), 10, 1_000_000_000L, 1, rejectedMessage), - List.of("wiki", Intervals.of("2013-01-10/P1D"), 10, 1_000_000_000L, 1, "skip offset from latest[P1D]") - ), + List.of("wiki", Intervals.of("2013-01-05/P1D"), 10, 1_000_000_000L, 1, rejectedMessage), + List.of("wiki", Intervals.of("2013-01-03/P1D"), 10, 1_000_000_000L, 1, rejectedMessage), + List.of("wiki", Intervals.of("2013-01-02/P1D"), 10, 1_000_000_000L, 1, rejectedMessage), + List.of("wiki", Intervals.of("2013-01-01/P1D"), 10, 1_000_000_000L, 1, rejectedMessage) + ), skippedTable.getRows() ); } diff --git a/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusBuilderTest.java b/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusBuilderTest.java new file mode 100644 index 000000000000..d0c381b02706 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusBuilderTest.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.compaction; + +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.server.coordinator.CreateDataSegments; +import org.apache.druid.timeline.DataSegment; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; + +public class CompactionStatusBuilderTest +{ + private static final String DATASOURCE = "test_datasource"; + + @Test + public void testNotEligible() + { + CompactionStatus eligibility = CompactionStatus.notEligible("test reason: %s", "failure"); + + Assert.assertEquals(CompactionStatus.State.NOT_ELIGIBLE, eligibility.getState()); + Assert.assertEquals("test reason: failure", eligibility.getReason()); + Assert.assertNull(eligibility.getCompactedStats()); + Assert.assertNull(eligibility.getUncompactedStats()); + } + + @Test + public void testBuilderWithCompactionStats() + { + CompactionStatistics compactedStats = CompactionStatistics.create(1000, 5, 2); + CompactionStatistics uncompactedStats = CompactionStatistics.create(500, 3, 1); + + CompactionStatus eligibility = + CompactionStatus.builder(CompactionStatus.State.ELIGIBLE, "needs full compaction") + .compacted(compactedStats) + .uncompacted(uncompactedStats) + .build(); + + Assert.assertEquals(CompactionStatus.State.ELIGIBLE, eligibility.getState()); + Assert.assertEquals("needs full compaction", eligibility.getReason()); + Assert.assertEquals(compactedStats, eligibility.getCompactedStats()); + Assert.assertEquals(uncompactedStats, eligibility.getUncompactedStats()); + } + + @Test + public void testEqualsAndHashCode() + { + // Test with simple eligibility objects (same state and reason) + CompactionStatus simple1 = CompactionStatus.notEligible("reason"); + CompactionStatus simple2 = CompactionStatus.notEligible("reason"); + Assert.assertEquals(simple1, simple2); + Assert.assertEquals(simple1.hashCode(), simple2.hashCode()); + + // Test with different reasons + CompactionStatus differentReason = CompactionStatus.notEligible("different"); + Assert.assertNotEquals(simple1, differentReason); + + // Test with different states + CompactionStatus differentState = CompactionStatus.COMPLETE; + Assert.assertNotEquals(simple1, differentState); + + // Test with full compaction eligibility (with stats) + CompactionStatistics stats1 = CompactionStatistics.create(1000, 5, 2); + CompactionStatistics stats2 = CompactionStatistics.create(500, 3, 1); + + CompactionStatus withStats1 = + CompactionStatus.builder(CompactionStatus.State.ELIGIBLE, "reason") + .compacted(stats1) + .uncompacted(stats2) + .build(); + + CompactionStatus withStats2 = + CompactionStatus.builder(CompactionStatus.State.ELIGIBLE, "reason") + .compacted(stats1) + .uncompacted(stats2) + .build(); + + // Same values - should be equal + Assert.assertEquals(withStats1, withStats2); + Assert.assertEquals(withStats1.hashCode(), withStats2.hashCode()); + + // Test with different compacted stats + CompactionStatistics differentStats = CompactionStatistics.create(2000, 10, 5); + CompactionStatus differentCompactedStats = + CompactionStatus.builder(CompactionStatus.State.ELIGIBLE, "reason") + .compacted(differentStats) + .uncompacted(stats2) + .build(); + Assert.assertNotEquals(withStats1, differentCompactedStats); + + // Test with different uncompacted stats + CompactionStatus differentUncompactedStats = + CompactionStatus.builder(CompactionStatus.State.ELIGIBLE, "reason") + .compacted(stats1) + 
.uncompacted(differentStats) + .build(); + Assert.assertNotEquals(withStats1, differentUncompactedStats); + } + + @Test + public void testBuilderRequiresReasonForNotEligible() + { + Assert.assertThrows( + DruidException.class, + () -> CompactionStatus.builder(CompactionStatus.State.NOT_ELIGIBLE, null).build() + ); + } + + @Test + public void testBuilderRequiresStatsForFullCompaction() + { + // Should throw when neither stat is provided + Assert.assertThrows( + DruidException.class, + () -> CompactionStatus.builder(CompactionStatus.State.ELIGIBLE, "reason").build() + ); + + // Should throw when only compacted stat is provided + Assert.assertThrows( + DruidException.class, + () -> CompactionStatus.builder(CompactionStatus.State.ELIGIBLE, "reason") + .compacted(CompactionStatistics.create(1000, 5, 2)) + .build() + ); + + // Should succeed when both stats are provided + CompactionStatus status = CompactionStatus.builder(CompactionStatus.State.ELIGIBLE, "reason") + .compacted(CompactionStatistics.create(1000, 5, 2)) + .uncompacted(CompactionStatistics.create(500, 3, 1)) + .build(); + Assert.assertNotNull(status); + } + + private static List createTestSegments(int count) + { + if (count == 0) { + return Collections.emptyList(); + } + + return CreateDataSegments.ofDatasource(DATASOURCE) + .forIntervals(count, Granularities.DAY) + .startingAt("2024-01-01") + .eachOfSizeInMb(100); + } +} diff --git a/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTest.java b/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTest.java index 2b274e805d4c..cb82f64dd6d5 100644 --- a/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTest.java +++ b/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTest.java @@ -99,9 +99,7 @@ public void testFindPartitionsSpecWhenGivenIsNull() { final ClientCompactionTaskQueryTuningConfig tuningConfig = ClientCompactionTaskQueryTuningConfig.from(null); - Assert.assertNull( - CompactionStatus.findPartitionsSpecFromConfig(tuningConfig) - ); + Assert.assertNull(CompactionStatus.findPartitionsSpecFromConfig(tuningConfig)); } @Test @@ -173,9 +171,7 @@ public void testFindPartitionsSpecFromConfigWithDeprecatedMaxRowsPerSegmentAndMa .build(); Assert.assertEquals( new DynamicPartitionsSpec(100, 1000L), - CompactionStatus.findPartitionsSpecFromConfig( - ClientCompactionTaskQueryTuningConfig.from(config) - ) + CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(config)) ); } @@ -221,7 +217,7 @@ public void testFindPartitionsSpecWhenGivenIsRangeWithTargetRows() @Test public void testStatusWhenLastCompactionStateIsNull() { - verifyCompactionStatusIsPendingBecause( + verifyCompactionIsEligibleBecause( null, InlineSchemaDataSourceCompactionConfig.builder().forDataSource(TestDataSource.WIKI).build(), "not compacted yet" @@ -232,7 +228,7 @@ public void testStatusWhenLastCompactionStateIsNull() public void testStatusWhenLastCompactionStateIsEmpty() { final PartitionsSpec requiredPartitionsSpec = new DynamicPartitionsSpec(5_000_000, null); - verifyCompactionStatusIsPendingBecause( + verifyCompactionIsEligibleBecause( new CompactionState(null, null, null, null, null, null, null), InlineSchemaDataSourceCompactionConfig .builder() @@ -257,7 +253,7 @@ public void testStatusOnPartitionsSpecMismatch() .forDataSource(TestDataSource.WIKI) .build(); - verifyCompactionStatusIsPendingBecause( + verifyCompactionIsEligibleBecause( lastCompactionState, compactionConfig, "'partitionsSpec' 
mismatch: required['dynamic' with 5,000,000 rows]," @@ -287,7 +283,7 @@ public void testStatusOnIndexSpecMismatch() .withTuningConfig(createTuningConfig(currentPartitionsSpec, null)) .build(); - verifyCompactionStatusIsPendingBecause( + verifyCompactionIsEligibleBecause( lastCompactionState, compactionConfig, "'indexSpec' mismatch: " @@ -329,7 +325,7 @@ public void testStatusOnSegmentGranularityMismatch() .withGranularitySpec(new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null)) .build(); - verifyCompactionStatusIsPendingBecause( + verifyCompactionIsEligibleBecause( lastCompactionState, compactionConfig, "'segmentGranularity' mismatch: required[DAY], current[HOUR]" @@ -362,11 +358,11 @@ public void testStatusWhenLastCompactionStateSameAsRequired() final DataSegment segment = DataSegment.builder(WIKI_SEGMENT).lastCompactionState(lastCompactionState).build(); final CompactionStatus status = CompactionStatus.compute( - CompactionCandidate.from(List.of(segment), Granularities.HOUR), + CompactionCandidate.ProposedCompaction.from(List.of(segment), Granularities.HOUR), compactionConfig, fingerprintMapper ); - Assert.assertTrue(status.isComplete()); + Assert.assertEquals(CompactionStatus.State.COMPLETE, status.getState()); } @Test @@ -412,11 +408,11 @@ public void testStatusWhenProjectionsMatch() final DataSegment segment = DataSegment.builder(WIKI_SEGMENT).lastCompactionState(lastCompactionState).build(); final CompactionStatus status = CompactionStatus.compute( - CompactionCandidate.from(List.of(segment), Granularities.HOUR), + CompactionCandidate.ProposedCompaction.from(List.of(segment), Granularities.HOUR), compactionConfig, fingerprintMapper ); - Assert.assertTrue(status.isComplete()); + Assert.assertEquals(CompactionStatus.COMPLETE, status); } @Test @@ -467,11 +463,12 @@ public void testStatusWhenProjectionsMismatch() final DataSegment segment = DataSegment.builder(WIKI_SEGMENT).lastCompactionState(lastCompactionState).build(); final CompactionStatus status = CompactionStatus.compute( - CompactionCandidate.from(List.of(segment), Granularities.HOUR), + CompactionCandidate.ProposedCompaction.from(List.of(segment), Granularities.HOUR), compactionConfig, fingerprintMapper ); - Assert.assertFalse(status.isComplete()); + Assert.assertEquals(CompactionStatus.State.ELIGIBLE, status.getState()); + Assert.assertTrue(status.getReason().contains("'projections' mismatch")); } @Test @@ -521,11 +518,11 @@ public void testStatusWhenAutoSchemaMatch() final DataSegment segment = DataSegment.builder(WIKI_SEGMENT).lastCompactionState(lastCompactionState).build(); final CompactionStatus status = CompactionStatus.compute( - CompactionCandidate.from(List.of(segment), null), + CompactionCandidate.ProposedCompaction.from(List.of(segment), null), compactionConfig, fingerprintMapper ); - Assert.assertTrue(status.isComplete()); + Assert.assertEquals(CompactionStatus.COMPLETE, status); } @Test @@ -575,11 +572,12 @@ public void testStatusWhenAutoSchemaMismatch() final DataSegment segment = DataSegment.builder(WIKI_SEGMENT).lastCompactionState(lastCompactionState).build(); final CompactionStatus status = CompactionStatus.compute( - CompactionCandidate.from(List.of(segment), null), + CompactionCandidate.ProposedCompaction.from(List.of(segment), null), compactionConfig, fingerprintMapper ); - Assert.assertFalse(status.isComplete()); + Assert.assertEquals(CompactionStatus.State.ELIGIBLE, status.getState()); + Assert.assertTrue(status.getReason().contains("'dimensionsSpec' mismatch")); } @Test @@ -607,7 
+605,7 @@ public void test_evaluate_needsCompactionWhenAllSegmentsHaveUnexpectedIndexingSt syncCacheFromManager(); verifyEvaluationNeedsCompactionBecauseWithCustomSegments( - CompactionCandidate.from(segments, null), + CompactionCandidate.ProposedCompaction.from(segments, null), compactionConfig, "'segmentGranularity' mismatch: required[DAY], current[HOUR]" ); @@ -643,7 +641,7 @@ public void test_evaluate_needsCompactionWhenSomeSegmentsHaveUnexpectedIndexingS syncCacheFromManager(); verifyEvaluationNeedsCompactionBecauseWithCustomSegments( - CompactionCandidate.from(segments, null), + CompactionCandidate.ProposedCompaction.from(segments, null), compactionConfig, "'segmentGranularity' mismatch: required[DAY], current[HOUR]" ); @@ -666,11 +664,11 @@ public void test_evaluate_noCompacationIfUnexpectedFingerprintHasExpectedIndexin syncCacheFromManager(); final CompactionStatus status = CompactionStatus.compute( - CompactionCandidate.from(segments, null), + CompactionCandidate.ProposedCompaction.from(segments, null), compactionConfig, fingerprintMapper ); - Assert.assertTrue(status.isComplete()); + Assert.assertEquals(CompactionStatus.COMPLETE, status); } @Test @@ -686,7 +684,7 @@ public void test_evaluate_needsCompactionWhenUnexpectedFingerprintAndNoFingerpri .build(); verifyEvaluationNeedsCompactionBecauseWithCustomSegments( - CompactionCandidate.from(segments, null), + CompactionCandidate.ProposedCompaction.from(segments, null), compactionConfig, "One or more fingerprinted segments do not have a cached indexing state" ); @@ -711,11 +709,11 @@ public void test_evaluate_noCompactionWhenAllSegmentsHaveExpectedIndexingStateFi ); final CompactionStatus status = CompactionStatus.compute( - CompactionCandidate.from(segments, null), + CompactionCandidate.ProposedCompaction.from(segments, null), compactionConfig, fingerprintMapper ); - Assert.assertTrue(status.isComplete()); + Assert.assertEquals(CompactionStatus.COMPLETE, status); } @Test @@ -740,7 +738,7 @@ public void test_evaluate_needsCompactionWhenNonFingerprintedSegmentsFailChecksO verifyEvaluationNeedsCompactionBecauseWithCustomSegments( - CompactionCandidate.from(segments, null), + CompactionCandidate.ProposedCompaction.from(segments, null), compactionConfig, "'segmentGranularity' mismatch: required[DAY], current[HOUR]" ); @@ -765,11 +763,11 @@ public void test_evaluate_noCompactionWhenNonFingerprintedSegmentsPassChecksOnLa ); final CompactionStatus status = CompactionStatus.compute( - CompactionCandidate.from(segments, null), + CompactionCandidate.ProposedCompaction.from(segments, null), compactionConfig, fingerprintMapper ); - Assert.assertTrue(status.isComplete()); + Assert.assertEquals(CompactionStatus.COMPLETE, status); } // ============================ @@ -795,13 +793,12 @@ public void test_evaluate_isSkippedWhenInputBytesExceedLimit() ); final CompactionStatus status = CompactionStatus.compute( - CompactionCandidate.from(segments, null), + CompactionCandidate.ProposedCompaction.from(segments, null), compactionConfig, fingerprintMapper ); - Assert.assertFalse(status.isComplete()); - Assert.assertTrue(status.isSkipped()); + Assert.assertEquals(CompactionStatus.State.NOT_ELIGIBLE, status.getState()); Assert.assertTrue(status.getReason().contains("'inputSegmentSize' exceeded")); Assert.assertTrue(status.getReason().contains("200000000")); Assert.assertTrue(status.getReason().contains("150000000")); @@ -812,22 +809,22 @@ public void test_evaluate_isSkippedWhenInputBytesExceedLimit() * Allows customization of the segments in the compaction 
candidate. */ private void verifyEvaluationNeedsCompactionBecauseWithCustomSegments( - CompactionCandidate candidate, + CompactionCandidate.ProposedCompaction proposedCompaction, DataSourceCompactionConfig compactionConfig, String expectedReason ) { final CompactionStatus status = CompactionStatus.compute( - candidate, + proposedCompaction, compactionConfig, fingerprintMapper ); - Assert.assertFalse(status.isComplete()); + Assert.assertEquals(CompactionStatus.State.ELIGIBLE, status.getState()); Assert.assertEquals(expectedReason, status.getReason()); } - private void verifyCompactionStatusIsPendingBecause( + private void verifyCompactionIsEligibleBecause( CompactionState lastCompactionState, DataSourceCompactionConfig compactionConfig, String expectedReason @@ -838,12 +835,12 @@ private void verifyCompactionStatusIsPendingBecause( .lastCompactionState(lastCompactionState) .build(); final CompactionStatus status = CompactionStatus.compute( - CompactionCandidate.from(List.of(segment), null), + CompactionCandidate.ProposedCompaction.from(List.of(segment), null), compactionConfig, fingerprintMapper ); - Assert.assertFalse(status.isComplete()); + Assert.assertEquals(CompactionStatus.State.ELIGIBLE, status.getState()); Assert.assertEquals(expectedReason, status.getReason()); } diff --git a/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTrackerTest.java b/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTrackerTest.java index 1314a1a0bc79..c5dc7644781e 100644 --- a/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTrackerTest.java +++ b/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTrackerTest.java @@ -22,6 +22,7 @@ import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.segment.TestDataSource; import org.apache.druid.server.coordinator.CreateDataSegments; import org.apache.druid.timeline.DataSegment; @@ -29,6 +30,7 @@ import org.junit.Before; import org.junit.Test; +import javax.annotation.Nullable; import java.util.List; public class CompactionStatusTrackerTest @@ -47,8 +49,7 @@ public void setup() @Test public void testGetLatestTaskStatusForSubmittedTask() { - final CompactionCandidate candidateSegments - = CompactionCandidate.from(List.of(WIKI_SEGMENT), null); + final CompactionCandidate candidateSegments = createCandidate(List.of(WIKI_SEGMENT), null); statusTracker.onTaskSubmitted("task1", candidateSegments); CompactionTaskStatus status = statusTracker.getLatestTaskStatus(candidateSegments); @@ -58,8 +59,7 @@ public void testGetLatestTaskStatusForSubmittedTask() @Test public void testGetLatestTaskStatusForSuccessfulTask() { - final CompactionCandidate candidateSegments - = CompactionCandidate.from(List.of(WIKI_SEGMENT), null); + final CompactionCandidate candidateSegments = createCandidate(List.of(WIKI_SEGMENT), null); statusTracker.onTaskSubmitted("task1", candidateSegments); statusTracker.onTaskFinished("task1", TaskStatus.success("task1")); @@ -70,8 +70,7 @@ public void testGetLatestTaskStatusForSuccessfulTask() @Test public void testGetLatestTaskStatusForFailedTask() { - final CompactionCandidate candidateSegments - = CompactionCandidate.from(List.of(WIKI_SEGMENT), null); + final CompactionCandidate candidateSegments = createCandidate(List.of(WIKI_SEGMENT), null); statusTracker.onTaskSubmitted("task1", candidateSegments); 
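+ // Fail the task and verify that the tracker reports the failure as the latest task status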
statusTracker.onTaskFinished("task1", TaskStatus.failure("task1", "some failure")); @@ -83,8 +82,7 @@ public void testGetLatestTaskStatusForFailedTask() @Test public void testGetLatestTaskStatusForRepeatedlyFailingTask() { - final CompactionCandidate candidateSegments - = CompactionCandidate.from(List.of(WIKI_SEGMENT), null); + final CompactionCandidate candidateSegments = createCandidate(List.of(WIKI_SEGMENT), null); statusTracker.onTaskSubmitted("task1", candidateSegments); statusTracker.onTaskFinished("task1", TaskStatus.failure("task1", "some failure")); @@ -102,33 +100,42 @@ public void testGetLatestTaskStatusForRepeatedlyFailingTask() } @Test - public void testComputeCompactionStatusForSuccessfulTask() + public void testComputeCompactionTaskStateForSuccessfulTask() { final NewestSegmentFirstPolicy policy = new NewestSegmentFirstPolicy(null); - final CompactionCandidate candidateSegments - = CompactionCandidate.from(List.of(WIKI_SEGMENT), null); + final CompactionCandidate candidateSegments = createCandidate(List.of(WIKI_SEGMENT), null); // Verify that interval is originally eligible for compaction - CompactionStatus status - = statusTracker.computeCompactionStatus(candidateSegments, policy); - Assert.assertEquals(CompactionStatus.State.PENDING, status.getState()); - Assert.assertEquals("Not compacted yet", status.getReason()); + CompactionCandidate.TaskState status = statusTracker.computeCompactionTaskState(candidateSegments); + Assert.assertEquals(CompactionCandidate.TaskState.READY, status); // Verify that interval is skipped for compaction after task has finished statusTracker.onSegmentTimelineUpdated(DateTimes.nowUtc().minusMinutes(1)); statusTracker.onTaskSubmitted("task1", candidateSegments); statusTracker.onTaskFinished("task1", TaskStatus.success("task1")); - status = statusTracker.computeCompactionStatus(candidateSegments, policy); - Assert.assertEquals(CompactionStatus.State.SKIPPED, status.getState()); - Assert.assertEquals( - "Segment timeline not updated since last compaction task succeeded", - status.getReason() - ); + status = statusTracker.computeCompactionTaskState(candidateSegments); + Assert.assertEquals(CompactionCandidate.TaskState.RECENTLY_COMPLETED, status); // Verify that interval becomes eligible again after timeline has been updated statusTracker.onSegmentTimelineUpdated(DateTimes.nowUtc()); - status = statusTracker.computeCompactionStatus(candidateSegments, policy); - Assert.assertEquals(CompactionStatus.State.PENDING, status.getState()); + status = statusTracker.computeCompactionTaskState(candidateSegments); + Assert.assertEquals(CompactionCandidate.TaskState.READY, status); + } + + private static CompactionCandidate createCandidate( + List segments, + @Nullable Granularity targetSegmentGranularity + ) + { + CompactionCandidate.ProposedCompaction proposedCompaction = CompactionCandidate.ProposedCompaction.from( + segments, + targetSegmentGranularity + ); + CompactionStatus status = CompactionStatus.builder(CompactionStatus.State.ELIGIBLE, "approve without check") + .compacted(CompactionStatistics.create(1, 1, 1)) + .uncompacted(CompactionStatistics.create(1, 1, 1)) + .build(); + return CompactionMode.FULL_COMPACTION.createCandidate(proposedCompaction, status); } } diff --git a/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java index 1b93bfa03a55..dc88c61d5aaf 100644 --- 
diff --git a/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java
index 1b93bfa03a55..dc88c61d5aaf 100644
--- a/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java
+++ b/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java
@@ -22,8 +22,8 @@
 import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.HumanReadableBytes;
-import org.apache.druid.segment.TestDataSource;
-import org.apache.druid.server.coordinator.CreateDataSegments;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.segment.TestSegmentUtils;
 import org.apache.druid.timeline.DataSegment;
 import org.junit.Test;
 import org.junit.jupiter.api.Assertions;
@@ -34,12 +34,19 @@ public class MostFragmentedIntervalFirstPolicyTest
 {
   private static final DataSegment SEGMENT =
-      CreateDataSegments.ofDatasource(TestDataSource.WIKI).eachOfSizeInMb(100).get(0);
+      TestSegmentUtils.makeSegment("foo", "1", Intervals.ETERNITY);
+  private static final DataSegment SEGMENT2 =
+      TestSegmentUtils.makeSegment("foo", "2", Intervals.ETERNITY);
+  private static final CompactionCandidate.ProposedCompaction PROPOSED_COMPACTION =
+      CompactionCandidate.ProposedCompaction.from(List.of(SEGMENT, SEGMENT2), null);
+
+  private static final CompactionStatistics DUMMY_COMPACTION_STATS = CompactionStatistics.create(1L, 1L, 1L);
 
   @Test
   public void test_thresholdValues_ofDefaultPolicy()
   {
-    final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy(null, null, null, null);
+    final MostFragmentedIntervalFirstPolicy policy =
+        new MostFragmentedIntervalFirstPolicy(null, null, null, null);
     Assertions.assertEquals(100, policy.getMinUncompactedCount());
     Assertions.assertEquals(new HumanReadableBytes("10MiB"), policy.getMinUncompactedBytes());
     Assertions.assertEquals(new HumanReadableBytes("2GiB"), policy.getMaxAverageUncompactedBytesPerSegment());
@@ -47,7 +54,7 @@ public void test_thresholdValues_ofDefaultPolicy()
   }
 
   @Test
-  public void test_checkEligibilityForCompaction_fails_ifUncompactedCountLessThanCutoff()
+  public void test_createCandidate_fails_ifUncompactedCountLessThanCutoff()
   {
     final int minUncompactedCount = 10_000;
     final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy(
@@ -57,20 +64,23 @@ public void test_checkEligibilityForCompaction_fails_ifUncompactedCountLessThanC
         null
     );
 
+    final CompactionStatus eligibility1 =
+        eligibilityBuilder().compacted(DUMMY_COMPACTION_STATS).uncompacted(createStats(1, 100L)).build();
+    final CompactionCandidate candidate1 = policy.createCandidate(PROPOSED_COMPACTION, eligibility1);
     Assertions.assertEquals(
-        CompactionCandidateSearchPolicy.Eligibility.fail(
-            "Uncompacted segments[1] in interval must be at least [10,000]"
-        ),
-        policy.checkEligibilityForCompaction(createCandidate(1, 100L), null)
-    );
-    Assertions.assertEquals(
-        CompactionCandidateSearchPolicy.Eligibility.OK,
-        policy.checkEligibilityForCompaction(createCandidate(10_001, 100L), null)
+        "Uncompacted segments[1] in interval must be at least [10,000]",
+        candidate1.getPolicyNote()
     );
+    Assertions.assertEquals(CompactionMode.NOT_APPLICABLE, candidate1.getMode());
+
+    final CompactionStatus eligibility2 =
+        eligibilityBuilder().compacted(DUMMY_COMPACTION_STATS).uncompacted(createStats(10_001, 100L)).build();
+    final CompactionCandidate candidate2 = policy.createCandidate(PROPOSED_COMPACTION, eligibility2);
+    Assertions.assertEquals(CompactionMode.FULL_COMPACTION, candidate2.getMode());
   }
 
   @Test
-  public void test_checkEligibilityForCompaction_fails_ifUncompactedBytesLessThanCutoff()
+  public void test_createCandidate_fails_ifUncompactedBytesLessThanCutoff()
   {
     final HumanReadableBytes minUncompactedBytes = HumanReadableBytes.valueOf(10_000);
     final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy(
@@ -80,20 +90,20 @@ public void test_checkEligibilityForCompaction_fails_ifUncompactedBytesLessThanC
         null
     );
 
-    Assertions.assertEquals(
-        CompactionCandidateSearchPolicy.Eligibility.fail(
-            "Uncompacted bytes[100] in interval must be at least [10,000]"
-        ),
-        policy.checkEligibilityForCompaction(createCandidate(1, 100L), null)
-    );
-    Assertions.assertEquals(
-        CompactionCandidateSearchPolicy.Eligibility.OK,
-        policy.checkEligibilityForCompaction(createCandidate(100, 10_000L), null)
-    );
+    final CompactionStatus eligibility1 =
+        eligibilityBuilder().compacted(DUMMY_COMPACTION_STATS).uncompacted(createStats(1, 100L)).build();
+    final CompactionCandidate candidate1 = policy.createCandidate(PROPOSED_COMPACTION, eligibility1);
+    Assertions.assertEquals("Uncompacted bytes[100] in interval must be at least [10,000]", candidate1.getPolicyNote());
+    Assertions.assertEquals(CompactionMode.NOT_APPLICABLE, candidate1.getMode());
+
+    final CompactionStatus eligibility2 =
+        eligibilityBuilder().compacted(DUMMY_COMPACTION_STATS).uncompacted(createStats(100, 10_000L)).build();
+    final CompactionCandidate candidate2 = policy.createCandidate(PROPOSED_COMPACTION, eligibility2);
+    Assertions.assertEquals(CompactionMode.FULL_COMPACTION, candidate2.getMode());
   }
 
   @Test
-  public void test_checkEligibilityForCompaction_fails_ifAvgSegmentSizeGreaterThanCutoff()
+  public void test_createCandidate_fails_ifAvgSegmentSizeGreaterThanCutoff()
   {
     final HumanReadableBytes maxAvgSegmentSize = HumanReadableBytes.valueOf(100);
     final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy(
@@ -103,16 +113,18 @@ public void test_checkEligibilityForCompaction_fails_ifAvgSegmentSizeGreaterThan
         null
     );
 
+    final CompactionStatus eligibility1 =
+        eligibilityBuilder().compacted(DUMMY_COMPACTION_STATS).uncompacted(createStats(1, 10_000L)).build();
+    final CompactionCandidate candidate1 = policy.createCandidate(PROPOSED_COMPACTION, eligibility1);
     Assertions.assertEquals(
-        CompactionCandidateSearchPolicy.Eligibility.fail(
-            "Average size[10,000] of uncompacted segments in interval must be at most [100]"
-        ),
-        policy.checkEligibilityForCompaction(createCandidate(1, 10_000L), null)
-    );
-    Assertions.assertEquals(
-        CompactionCandidateSearchPolicy.Eligibility.OK,
-        policy.checkEligibilityForCompaction(createCandidate(1, 100L), null)
+        "Average size[10,000] of uncompacted segments in interval must be at most [100]",
+        candidate1.getPolicyNote()
     );
+    Assertions.assertEquals(CompactionMode.NOT_APPLICABLE, candidate1.getMode());
+    final CompactionStatus eligibility2 =
+        eligibilityBuilder().compacted(DUMMY_COMPACTION_STATS).uncompacted(createStats(1, 100L)).build();
+    final CompactionCandidate candidate2 = policy.createCandidate(PROPOSED_COMPACTION, eligibility2);
+    Assertions.assertEquals(CompactionMode.FULL_COMPACTION, candidate2.getMode());
   }
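These renamed tests pin down the new contract of createCandidate: instead of returning a separate Eligibility verdict, the policy always returns a candidate, with a rejected interval carrying CompactionMode.NOT_APPLICABLE and the rejection message in getPolicyNote(). A hedged sketch of how a caller might consume that result (the logging and hand-off are illustrative placeholders, not part of this patch):

    // Sketch: branching on the mode attached to the candidate.
    CompactionCandidate candidate = policy.createCandidate(proposal, eligibility);
    if (candidate.getMode() == CompactionMode.NOT_APPLICABLE) {
      // The policy vetoed this interval; the note says which threshold failed.
      log.info("Skipping interval[%s]: %s", proposal.getCompactionInterval(), candidate.getPolicyNote());
    } else {
      pendingCandidates.add(candidate);  // illustrative hand-off to the scheduler
    }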
 
   @Test
@@ -125,12 +137,13 @@ public void test_policy_favorsIntervalWithMoreUncompactedSegments_ifTotalBytesIs
         null
     );
 
-    final CompactionCandidate candidateA = createCandidate(1, 1000L);
-    final CompactionCandidate candidateB = createCandidate(2, 500L);
-
-    verifyCandidateIsEligible(candidateA, policy);
-    verifyCandidateIsEligible(candidateB, policy);
+    final CompactionStatus eligibility1 =
+        eligibilityBuilder().compacted(DUMMY_COMPACTION_STATS).uncompacted(createStats(1, 1_000L)).build();
+    final CompactionStatus eligibility2 =
+        eligibilityBuilder().compacted(DUMMY_COMPACTION_STATS).uncompacted(createStats(2, 500L)).build();
+    final CompactionCandidate candidateA = policy.createCandidate(PROPOSED_COMPACTION, eligibility1);
+    final CompactionCandidate candidateB = policy.createCandidate(PROPOSED_COMPACTION, eligibility2);
 
     Assertions.assertTrue(policy.compareCandidates(candidateA, candidateB) > 0);
     Assertions.assertTrue(policy.compareCandidates(candidateB, candidateA) < 0);
   }
@@ -145,12 +158,13 @@ public void test_policy_favorsIntervalWithMoreUncompactedSegments_ifAverageSizeI
         null
     );
 
-    final CompactionCandidate candidateA = createCandidate(1, 1000L);
-    final CompactionCandidate candidateB = createCandidate(2, 1000L);
-
-    verifyCandidateIsEligible(candidateA, policy);
-    verifyCandidateIsEligible(candidateB, policy);
+    final CompactionStatus eligibility1 =
+        eligibilityBuilder().compacted(DUMMY_COMPACTION_STATS).uncompacted(createStats(1, 1000L)).build();
+    final CompactionStatus eligibility2 =
+        eligibilityBuilder().compacted(DUMMY_COMPACTION_STATS).uncompacted(createStats(2, 1000L)).build();
+    final CompactionCandidate candidateA = policy.createCandidate(PROPOSED_COMPACTION, eligibility1);
+    final CompactionCandidate candidateB = policy.createCandidate(PROPOSED_COMPACTION, eligibility2);
 
     Assertions.assertTrue(policy.compareCandidates(candidateA, candidateB) > 0);
     Assertions.assertTrue(policy.compareCandidates(candidateB, candidateA) < 0);
   }
@@ -165,12 +179,13 @@ public void test_policy_favorsIntervalWithSmallerSegments_ifCountIsEqual()
         null
     );
 
-    final CompactionCandidate candidateA = createCandidate(10, 500L);
-    final CompactionCandidate candidateB = createCandidate(10, 1000L);
-
-    verifyCandidateIsEligible(candidateA, policy);
-    verifyCandidateIsEligible(candidateB, policy);
+    final CompactionStatus eligibility1 =
+        eligibilityBuilder().compacted(DUMMY_COMPACTION_STATS).uncompacted(createStats(10, 500L)).build();
+    final CompactionStatus eligibility2 =
+        eligibilityBuilder().compacted(DUMMY_COMPACTION_STATS).uncompacted(createStats(10, 1000L)).build();
+    final CompactionCandidate candidateA = policy.createCandidate(PROPOSED_COMPACTION, eligibility1);
+    final CompactionCandidate candidateB = policy.createCandidate(PROPOSED_COMPACTION, eligibility2);
 
     Assertions.assertTrue(policy.compareCandidates(candidateA, candidateB) < 0);
     Assertions.assertTrue(policy.compareCandidates(candidateB, candidateA) > 0);
   }
@@ -185,12 +200,13 @@ public void test_compareCandidates_returnsZeroIfSegmentCountAndAvgSizeScaleEquiv
         null
     );
 
-    final CompactionCandidate candidateA = createCandidate(100, 25);
-    final CompactionCandidate candidateB = createCandidate(400, 100);
-
-    verifyCandidateIsEligible(candidateA, policy);
-    verifyCandidateIsEligible(candidateB, policy);
+    final CompactionStatus eligibility1 =
+        eligibilityBuilder().compacted(DUMMY_COMPACTION_STATS).uncompacted(createStats(100, 25)).build();
+    final CompactionStatus eligibility2 =
+        eligibilityBuilder().compacted(DUMMY_COMPACTION_STATS).uncompacted(createStats(400, 100)).build();
+    final CompactionCandidate candidateA = policy.createCandidate(PROPOSED_COMPACTION, eligibility1);
+    final CompactionCandidate candidateB = policy.createCandidate(PROPOSED_COMPACTION, eligibility2);
 
     Assertions.assertEquals(0, policy.compareCandidates(candidateA, candidateB));
     Assertions.assertEquals(0, policy.compareCandidates(candidateB, candidateA));
   }
@@ -222,30 +238,21 @@ public void test_serde_allFieldsSet() throws IOException
   @Test
   public void test_serde_noFieldsSet() throws IOException
   {
-    final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy(null, null, null, null);
+    final MostFragmentedIntervalFirstPolicy policy =
+        new MostFragmentedIntervalFirstPolicy(null, null, null, null);
     final DefaultObjectMapper mapper = new DefaultObjectMapper();
     final CompactionCandidateSearchPolicy policy2 =
         mapper.readValue(mapper.writeValueAsString(policy), CompactionCandidateSearchPolicy.class);
 
     Assertions.assertEquals(policy, policy2);
   }
 
-  private CompactionCandidate createCandidate(int numSegments, long avgSizeBytes)
+  private CompactionStatistics createStats(int numSegments, long avgSizeBytes)
   {
-    final CompactionStatistics dummyCompactedStats = CompactionStatistics.create(1L, 1L, 1L);
-    final CompactionStatistics uncompactedStats = CompactionStatistics.create(
-        avgSizeBytes * numSegments,
-        numSegments,
-        1L
-    );
-    return CompactionCandidate.from(List.of(SEGMENT), null)
-                              .withCurrentStatus(CompactionStatus.pending(dummyCompactedStats, uncompactedStats, ""));
+    return CompactionStatistics.create(avgSizeBytes * numSegments, numSegments, 1L);
   }
 
-  private void verifyCandidateIsEligible(CompactionCandidate candidate, MostFragmentedIntervalFirstPolicy policy)
+  private static CompactionStatus.CompactionStatusBuilder eligibilityBuilder()
   {
-    Assertions.assertEquals(
-        CompactionCandidateSearchPolicy.Eligibility.OK,
-        policy.checkEligibilityForCompaction(candidate, null)
-    );
+    return CompactionStatus.builder(CompactionStatus.State.ELIGIBLE, "approve");
   }
 }
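The comparator tests above fix the ordering semantics: a negative compareCandidates result means the first candidate should be compacted earlier, so intervals with more uncompacted segments win, and on equal counts the interval with smaller average segments wins. That makes the policy usable directly as a priority ordering; a minimal sketch using java.util.PriorityQueue (the candidates collection is assumed pre-built):

    // Sketch: the policy doubles as a comparator for a work queue.
    PriorityQueue<CompactionCandidate> queue = new PriorityQueue<>(policy::compareCandidates);
    queue.addAll(candidates);
    CompactionCandidate next = queue.poll();  // most fragmented eligible interval first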
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 68cef0430d2f..122f7996c327 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -80,7 +80,7 @@
 import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.indexing.BatchIOConfig;
 import org.apache.druid.segment.transform.CompactionTransformSpec;
-import org.apache.druid.server.compaction.CompactionCandidate;
+import org.apache.druid.server.compaction.CompactionCandidate.ProposedCompaction;
 import org.apache.druid.server.compaction.CompactionCandidateSearchPolicy;
 import org.apache.druid.server.compaction.CompactionSlotManager;
 import org.apache.druid.server.compaction.CompactionStatusTracker;
@@ -101,6 +101,7 @@
 import org.apache.druid.server.coordinator.stats.Stats;
 import org.apache.druid.timeline.CompactionState;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.TimelineObjectHolder;
 import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@@ -217,7 +218,10 @@ public void setup()
       final String dataSource = DATA_SOURCE_PREFIX + i;
       for (int j : new int[]{0, 1, 2, 3, 7, 8}) {
         for (int k = 0; k < PARTITION_PER_TIME_INTERVAL; k++) {
-          List<DataSegment> segmentForDatasource = datasourceToSegments.computeIfAbsent(dataSource, key -> new ArrayList<>());
+          List<DataSegment> segmentForDatasource = datasourceToSegments.computeIfAbsent(
+              dataSource,
+              key -> new ArrayList<>()
+          );
           DataSegment dataSegment = createSegment(dataSource, j, true, k);
           allSegments.add(dataSegment);
           segmentForDatasource.add(dataSegment);
@@ -250,17 +254,10 @@ private DataSegment createSegment(String dataSource, int startDay, boolean befor
             startDay + 2
         )
     );
-    return new DataSegment(
-        dataSource,
-        interval,
-        "version",
-        null,
-        Collections.emptyList(),
-        Collections.emptyList(),
-        shardSpec,
-        0,
-        10L
-    );
+    return DataSegment.builder(SegmentId.of(dataSource, interval, "version", partition))
+                      .shardSpec(shardSpec)
+                      .size(10L)
+                      .build();
   }
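The hunk above trades the positional nine-argument DataSegment constructor for the builder keyed by SegmentId, which names the partition number explicitly. The mapping from the old arguments, as far as this diff shows it (unset fields presumably fall back to builder defaults):

    // Old form: new DataSegment(dataSource, interval, "version", null /* loadSpec */,
    //     emptyList() /* dimensions */, emptyList() /* metrics */, shardSpec, 0 /* binaryVersion */, 10L)
    return DataSegment.builder(SegmentId.of(dataSource, interval, "version", partition))
                      .shardSpec(shardSpec)  // loadSpec, dimensions and metrics are left at their defaults
                      .size(10L)             // the old binaryVersion argument (0) is not carried over here
                      .build();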
 
   @Test
@@ -865,8 +862,12 @@ public void testCompactWithGranularitySpec()
             .withTuningConfig(getTuningConfig(3))
             .withEngine(engine)
             .withGranularitySpec(
-                new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null)
-            )
+                new UserCompactionTaskGranularityConfig(
+                    Granularities.YEAR,
+                    null,
+                    null
+                )
+            )
             .build()
     );
     doCompactSegments(compactSegments, compactionConfigs);
@@ -876,7 +877,7 @@ public void testCompactWithGranularitySpec()
     // All segments is compact at the same time since we changed the segment granularity to YEAR and all segment
     // are within the same year
     Assert.assertEquals(
-        CompactionCandidate.from(datasourceToSegments.get(dataSource), Granularities.YEAR).getCompactionInterval(),
+        ProposedCompaction.from(datasourceToSegments.get(dataSource), Granularities.YEAR).getCompactionInterval(),
         taskPayload.getIoConfig().getInputSpec().getInterval()
     );
 
@@ -901,10 +902,10 @@ public void testCompactWithDimensionSpec()
             .withSkipOffsetFromLatest(new Period("PT0H")) // smaller than segment interval
             .withTuningConfig(getTuningConfig(3))
             .withDimensionsSpec(
-                 new UserCompactionTaskDimensionsConfig(
-                     DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))
-                 )
-            )
+                new UserCompactionTaskDimensionsConfig(
+                    DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))
+                )
+            )
             .withEngine(engine)
             .build()
     );
@@ -968,10 +969,10 @@ public void testCompactWithProjections()
             .withSkipOffsetFromLatest(new Period("PT0H")) // smaller than segment interval
             .withTuningConfig(getTuningConfig(3))
             .withDimensionsSpec(
-                 new UserCompactionTaskDimensionsConfig(
-                     DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))
-                 )
-            )
+                new UserCompactionTaskDimensionsConfig(
+                    DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))
+                )
+            )
             .withProjections(projections)
             .withEngine(engine)
             .build()
@@ -1056,8 +1057,12 @@ public void testCompactWithRollupInGranularitySpec()
             .withSkipOffsetFromLatest(new Period("PT0H")) // smaller than segment interval
             .withTuningConfig(getTuningConfig(3))
             .withGranularitySpec(
-                new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, true)
-            )
+                new UserCompactionTaskGranularityConfig(
+                    Granularities.YEAR,
+                    null,
+                    true
+                )
+            )
             .withEngine(engine)
             .build()
     );
@@ -1067,7 +1072,7 @@ public void testCompactWithRollupInGranularitySpec()
     // All segments is compact at the same time since we changed the segment granularity to YEAR and all segment
     // are within the same year
     Assert.assertEquals(
-        CompactionCandidate.from(datasourceToSegments.get(dataSource), Granularities.YEAR).getCompactionInterval(),
+        ProposedCompaction.from(datasourceToSegments.get(dataSource), Granularities.YEAR).getCompactionInterval(),
         taskPayload.getIoConfig().getInputSpec().getInterval()
     );
 
@@ -1146,8 +1151,12 @@ public void testCompactWithGranularitySpecConflictWithActiveCompactionTask()
             .withSkipOffsetFromLatest(new Period("PT0H")) // smaller than segment interval
             .withTuningConfig(getTuningConfig(3))
             .withGranularitySpec(
-                new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null)
-            )
+                new UserCompactionTaskGranularityConfig(
+                    Granularities.YEAR,
+                    null,
+                    null
+                )
+            )
             .withEngine(engine)
             .build()
     );
@@ -1162,7 +1171,7 @@ public void testCompactWithGranularitySpecConflictWithActiveCompactionTask()
     // All segments is compact at the same time since we changed the segment granularity to YEAR and all segment
     // are within the same year
     Assert.assertEquals(
-        CompactionCandidate.from(datasourceToSegments.get(dataSource), Granularities.YEAR).getCompactionInterval(),
+        ProposedCompaction.from(datasourceToSegments.get(dataSource), Granularities.YEAR).getCompactionInterval(),
         taskPayload.getIoConfig().getInputSpec().getInterval()
     );
 
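Several assertions in this file lean on ProposedCompaction.getCompactionInterval(), which, as these tests imply, widens the umbrella interval of the input segments to align with the target segment granularity. A worked example of the arithmetic, assuming that alignment behaviour:

    // Segments covering 2017-01-01/2017-01-02, compacted with YEAR segment granularity:
    // the umbrella interval widens to the enclosing year.
    ProposedCompaction proposal = ProposedCompaction.from(segments, Granularities.YEAR);
    proposal.getCompactionInterval();  // expected: 2017-01-01T00:00:00Z/2018-01-01T00:00:00Z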
@@ -1288,10 +1297,10 @@ public void testCompactWithTransformSpec()
             .withSkipOffsetFromLatest(new Period("PT0H")) // smaller than segment interval
             .withTuningConfig(getTuningConfig(3))
             .withTransformSpec(
-                 new CompactionTransformSpec(
-                     new SelectorDimFilter("dim1", "foo", null)
-                 )
-            )
+                new CompactionTransformSpec(
+                    new SelectorDimFilter("dim1", "foo", null)
+                )
+            )
             .withEngine(engine)
             .build()
     );
@@ -1328,7 +1337,7 @@ public void testCompactWithoutCustomSpecs()
   @Test
   public void testCompactWithMetricsSpec()
   {
-    AggregatorFactory[] aggregatorFactories = new AggregatorFactory[] {new CountAggregatorFactory("cnt")};
+    AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{new CountAggregatorFactory("cnt")};
     final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
     final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
     final CompactSegments compactSegments = new CompactSegments(statusTracker, mockClient);
@@ -1358,30 +1367,26 @@ public void testDetermineSegmentGranularityFromSegmentsToCompact()
     String dataSourceName = DATA_SOURCE_PREFIX + 1;
     List<DataSegment> segments = new ArrayList<>();
     segments.add(
-        new DataSegment(
-            dataSourceName,
-            Intervals.of("2017-01-01T00:00:00/2017-01-02T00:00:00"),
-            "1",
-            null,
-            ImmutableList.of(),
-            ImmutableList.of(),
-            shardSpecFactory.apply(0, 2),
-            0,
-            10L
-        )
+        DataSegment.builder(SegmentId.of(
+                       dataSourceName,
+                       Intervals.of("2017-01-01T00:00:00/2017-01-02T00:00:00"),
+                       "1",
+                       0
+                   ))
+                   .shardSpec(shardSpecFactory.apply(0, 2))
+                   .size(10L)
+                   .build()
     );
     segments.add(
-        new DataSegment(
-            dataSourceName,
-            Intervals.of("2017-01-01T00:00:00/2017-01-02T00:00:00"),
-            "1",
-            null,
-            ImmutableList.of(),
-            ImmutableList.of(),
-            shardSpecFactory.apply(1, 2),
-            0,
-            10L
-        )
+        DataSegment.builder(SegmentId.of(
+                       dataSourceName,
+                       Intervals.of("2017-01-01T00:00:00/2017-01-02T00:00:00"),
+                       "1",
+                       1
+                   ))
+                   .shardSpec(shardSpecFactory.apply(1, 2))
+                   .size(10L)
+                   .build()
     );
     dataSources = DataSourcesSnapshot.fromUsedSegments(segments);
 
@@ -1403,7 +1408,7 @@ public void testDetermineSegmentGranularityFromSegmentsToCompact()
     ClientCompactionTaskQuery taskPayload = (ClientCompactionTaskQuery) payloadCaptor.getValue();
 
     Assert.assertEquals(
-        CompactionCandidate.from(segments, Granularities.DAY).getCompactionInterval(),
+        ProposedCompaction.from(segments, Granularities.DAY).getCompactionInterval(),
         taskPayload.getIoConfig().getInputSpec().getInterval()
     );
 
@@ -1418,30 +1423,26 @@ public void testDetermineSegmentGranularityFromSegmentGranularityInCompactionCon
     String dataSourceName = DATA_SOURCE_PREFIX + 1;
     List<DataSegment> segments = new ArrayList<>();
     segments.add(
-        new DataSegment(
-            dataSourceName,
-            Intervals.of("2017-01-01T00:00:00/2017-01-02T00:00:00"),
-            "1",
-            null,
-            ImmutableList.of(),
-            ImmutableList.of(),
-            shardSpecFactory.apply(0, 2),
-            0,
-            10L
-        )
+        DataSegment.builder(SegmentId.of(
+                       dataSourceName,
+                       Intervals.of("2017-01-01T00:00:00/2017-01-02T00:00:00"),
+                       "1",
+                       0
+                   ))
+                   .shardSpec(shardSpecFactory.apply(0, 2))
+                   .size(10L)
+                   .build()
    );
     segments.add(
-        new DataSegment(
-            dataSourceName,
-            Intervals.of("2017-01-01T00:00:00/2017-01-02T00:00:00"),
-            "1",
-            null,
-            ImmutableList.of(),
-            ImmutableList.of(),
-            shardSpecFactory.apply(1, 2),
-            0,
-            10L
-        )
+        DataSegment.builder(SegmentId.of(
+                       dataSourceName,
+                       Intervals.of("2017-01-01T00:00:00/2017-01-02T00:00:00"),
+                       "1",
+                       1
+                   ))
+                   .shardSpec(shardSpecFactory.apply(1, 2))
+                   .size(10L)
+                   .build()
     );
     dataSources = DataSourcesSnapshot.fromUsedSegments(segments);
 
@@ -1457,8 +1458,12 @@ public void testDetermineSegmentGranularityFromSegmentGranularityInCompactionCon
         .withSkipOffsetFromLatest(new Period("PT0H")) // smaller than segment interval
         .withTuningConfig(getTuningConfig(3))
         .withGranularitySpec(
-            new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null)
-        )
+            new UserCompactionTaskGranularityConfig(
+                Granularities.YEAR,
+                null,
+                null
+            )
+        )
         .withEngine(engine)
         .build()
     );
@@ -1466,7 +1471,7 @@ public void testDetermineSegmentGranularityFromSegmentGranularityInCompactionCon
     ClientCompactionTaskQuery taskPayload = (ClientCompactionTaskQuery) payloadCaptor.getValue();
 
     Assert.assertEquals(
-        CompactionCandidate.from(segments, Granularities.YEAR).getCompactionInterval(),
+        ProposedCompaction.from(segments, Granularities.YEAR).getCompactionInterval(),
        taskPayload.getIoConfig().getInputSpec().getInterval()
     );
 
@@ -1490,7 +1495,7 @@ public void testCompactWithMetricsSpecShouldSetPreserveExistingMetricsTrue()
         .withInputSegmentSizeBytes(500L)
         .withSkipOffsetFromLatest(new Period("PT0H")) // smaller than segment interval
         .withTuningConfig(getTuningConfig(3))
-        .withMetricsSpec(new AggregatorFactory[] {new CountAggregatorFactory("cnt")})
+        .withMetricsSpec(new AggregatorFactory[]{new CountAggregatorFactory("cnt")})
         .withEngine(engine)
         .build()
     );
@@ -1545,17 +1550,20 @@ private void verifySnapshot(
   {
     Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
     AutoCompactionSnapshot snapshot = autoCompactionSnapshots.get(dataSourceName);
-    Assert.assertEquals(dataSourceName, snapshot.getDataSource());
-    Assert.assertEquals(scheduleStatus, snapshot.getScheduleStatus());
-    Assert.assertEquals(expectedByteCountAwaitingCompaction, snapshot.getBytesAwaitingCompaction());
-    Assert.assertEquals(expectedByteCountCompressed, snapshot.getBytesCompacted());
-    Assert.assertEquals(expectedByteCountSkipped, snapshot.getBytesSkipped());
-    Assert.assertEquals(expectedIntervalCountAwaitingCompaction, snapshot.getIntervalCountAwaitingCompaction());
-    Assert.assertEquals(expectedIntervalCountCompressed, snapshot.getIntervalCountCompacted());
-    Assert.assertEquals(expectedIntervalCountSkipped, snapshot.getIntervalCountSkipped());
-    Assert.assertEquals(expectedSegmentCountAwaitingCompaction, snapshot.getSegmentCountAwaitingCompaction());
-    Assert.assertEquals(expectedSegmentCountCompressed, snapshot.getSegmentCountCompacted());
-    Assert.assertEquals(expectedSegmentCountSkipped, snapshot.getSegmentCountSkipped());
+    Assert.assertEquals(new AutoCompactionSnapshot(
+        dataSourceName,
+        scheduleStatus,
+        null,
+        expectedByteCountAwaitingCompaction,
+        expectedByteCountCompressed,
+        expectedByteCountSkipped,
+        expectedSegmentCountAwaitingCompaction,
+        expectedSegmentCountCompressed,
+        expectedSegmentCountSkipped,
+        expectedIntervalCountAwaitingCompaction,
+        expectedIntervalCountCompressed,
+        expectedIntervalCountSkipped
+    ), snapshot);
   }
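Collapsing eleven field-by-field asserts into one equality check against a fully-built expected snapshot means a single failure now reports every divergent field at once. A sketch of the expected-object pattern as used above, with illustrative variable names; the third constructor argument is a nullable field that this test leaves as null:

    AutoCompactionSnapshot expected = new AutoCompactionSnapshot(
        dataSourceName,
        scheduleStatus,
        null,                                                  // nullable field, unused by this test
        bytesAwaiting, bytesCompacted, bytesSkipped,           // byte counts
        segmentsAwaiting, segmentsCompacted, segmentsSkipped,  // segment counts
        intervalsAwaiting, intervalsCompacted, intervalsSkipped // interval counts
    );
    Assert.assertEquals(expected, snapshot);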
 
   private void doCompactionAndAssertCompactSegmentStatistics(CompactSegments compactSegments, int compactionRunCount)
@@ -1632,7 +1640,10 @@ private CoordinatorRunStats doCompactSegments(CompactSegments compactSegments)
     return doCompactSegments(compactSegments, (Integer) null);
   }
 
-  private CoordinatorRunStats doCompactSegments(CompactSegments compactSegments, @Nullable Integer numCompactionTaskSlots)
+  private CoordinatorRunStats doCompactSegments(
+      CompactSegments compactSegments,
+      @Nullable Integer numCompactionTaskSlots
+  )
   {
     return doCompactSegments(compactSegments, createCompactionConfigs(), numCompactionTaskSlots);
   }
@@ -1725,7 +1736,8 @@ private void assertCompactSegments(
         = dataSources.getUsedSegmentsTimelinesPerDataSource();
     for (int i = 0; i < 3; i++) {
       final String dataSource = DATA_SOURCE_PREFIX + i;
-      List<TimelineObjectHolder<String, DataSegment>> holders = dataSourceToTimeline.get(dataSource).lookup(expectedInterval);
+      List<TimelineObjectHolder<String, DataSegment>> holders =
+          dataSourceToTimeline.get(dataSource).lookup(expectedInterval);
       Assert.assertEquals(1, holders.size());
       List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(holders.get(0).getObject());
       Assert.assertEquals(2, chunks.size());
@@ -1821,10 +1833,10 @@ private List<DataSourceCompactionConfig> createCompactionConfigs(
               .withTuningConfig(getTuningConfig(maxNumConcurrentSubTasksForNative))
               .withEngine(engine)
               .withTaskContext(
-                  maxNumTasksForMSQ == null
-                  ? null
-                  : ImmutableMap.of(ClientMSQContext.CTX_MAX_NUM_TASKS, maxNumTasksForMSQ)
-              )
+                  maxNumTasksForMSQ == null
+                  ? null
+                  : Map.of(ClientMSQContext.CTX_MAX_NUM_TASKS, maxNumTasksForMSQ)
+              )
               .build()
       );
     }
@@ -1925,7 +1937,8 @@ private void compactSegments(
     if (clientCompactionTaskQuery.getTuningConfig().getPartitionsSpec() instanceof DynamicPartitionsSpec) {
       compactionPartitionsSpec = new DynamicPartitionsSpec(
           clientCompactionTaskQuery.getTuningConfig().getPartitionsSpec().getMaxRowsPerSegment(),
-          ((DynamicPartitionsSpec) clientCompactionTaskQuery.getTuningConfig().getPartitionsSpec()).getMaxTotalRowsOr(Long.MAX_VALUE)
+          ((DynamicPartitionsSpec) clientCompactionTaskQuery.getTuningConfig().getPartitionsSpec()).getMaxTotalRowsOr(
+              Long.MAX_VALUE)
       );
     } else {
       compactionPartitionsSpec = clientCompactionTaskQuery.getTuningConfig().getPartitionsSpec();
     }
@@ -1937,40 +1950,39 @@ private void compactSegments(
     for (int i = 0; i < 2; i++) {
-      DataSegment compactSegment = new DataSegment(
-          segments.get(0).getDataSource(),
-          compactInterval,
-          version,
-          null,
-          segments.get(0).getDimensions(),
-          segments.get(0).getMetrics(),
-          shardSpecFactory.apply(i, 2),
-          new CompactionState(
-              compactionPartitionsSpec,
-              clientCompactionTaskQuery.getDimensionsSpec() == null ? null : new DimensionsSpec(
-                  clientCompactionTaskQuery.getDimensionsSpec().getDimensions()
-              ),
-              metricsSpec,
-              clientCompactionTaskQuery.getTransformSpec(),
-              jsonMapper.convertValue(
-                  ImmutableMap.of(
-                      "bitmap",
-                      ImmutableMap.of("type", "roaring"),
-                      "dimensionCompression",
-                      "lz4",
-                      "metricCompression",
-                      "lz4",
-                      "longEncoding",
-                      "longs"
-                  ),
-                  IndexSpec.class
-              ),
-              jsonMapper.convertValue(ImmutableMap.of(), GranularitySpec.class),
-              null
-          ),
-          1,
-          segmentSize
-      );
+      DataSegment compactSegment =
+          DataSegment.builder(SegmentId.of(segments.get(0).getDataSource(), compactInterval, version, i))
+                     .dimensions(segments.get(0).getDimensions())
+                     .metrics(segments.get(0).getMetrics())
+                     .shardSpec(shardSpecFactory.apply(i, 2))
+                     .lastCompactionState(
+                         new CompactionState(
+                             compactionPartitionsSpec,
+                             clientCompactionTaskQuery.getDimensionsSpec() == null ? null : new DimensionsSpec(
+                                 clientCompactionTaskQuery.getDimensionsSpec().getDimensions()
+                             ),
+                             metricsSpec,
+                             clientCompactionTaskQuery.getTransformSpec(),
+                             jsonMapper.convertValue(
+                                 ImmutableMap.of(
+                                     "bitmap",
+                                     ImmutableMap.of("type", "roaring"),
+                                     "dimensionCompression",
+                                     "lz4",
+                                     "metricCompression",
+                                     "lz4",
+                                     "longEncoding",
+                                     "longs"
+                                 ),
+                                 IndexSpec.class
+                             ),
+                             jsonMapper.convertValue(ImmutableMap.of(), GranularitySpec.class),
+                             null
+                         )
+                     )
+                     .binaryVersion(1)
+                     .size(segmentSize)
+                     .build();
       timeline.add(
           compactInterval,