Skip to content

Commit 42d7327

Browse files
authored
Differentiate retry and halt error and retry failed compaction only on retriable error (cortexproject#6111)
* Differentiate retry and halt error. Retry failed compaction only on retriable error Signed-off-by: Alex Le <[email protected]> * update CHANGELOG Signed-off-by: Alex Le <[email protected]> * fix lint and address comment Signed-off-by: Alex Le <[email protected]> * merged halt and retriable error into one and differentiate with type label Signed-off-by: Alex Le <[email protected]> * reverted log change Signed-off-by: Alex Le <[email protected]> * rename Signed-off-by: Alex Le <[email protected]> --------- Signed-off-by: Alex Le <[email protected]>
1 parent fe788be commit 42d7327

6 files changed

+161
-14
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
* [ENHANCEMENT] Distributor: Reduce memory usage when error volume is high. #6095
3030
* [ENHANCEMENT] Compactor: Centralize metrics used by compactor and add user label to compactor metrics. #6096
3131
* [ENHANCEMENT] Compactor: Add unique execution ID for each compaction cycle in log for easy debugging. #6097
32+
* [ENHANCEMENT] Compactor: Differentiate retry and halt error and retry failed compaction only on retriable error. #6111
3233
* [ENHANCEMENT] Ruler: Add support for filtering by `state` and `health` field on Rules API. #6040
3334
* [ENHANCEMENT] Compactor: Split cleaner cycle for active and deleted tenants. #6112
3435
* [BUGFIX] Configsdb: Fix endline issue in db password. #5920

pkg/compactor/blocks_cleaner_test.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
8585
blocksMarkedForDeletion := prometheus.NewCounterVec(prometheus.CounterOpts{
8686
Name: blocksMarkedForDeletionName,
8787
Help: blocksMarkedForDeletionHelp,
88-
}, append(commonLabels, ReasonLabelName))
88+
}, append(commonLabels, reasonLabelName))
8989

9090
cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion)
9191

@@ -192,7 +192,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
192192
blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
193193
Name: blocksMarkedForDeletionName,
194194
Help: blocksMarkedForDeletionHelp,
195-
}, append(commonLabels, ReasonLabelName))
195+
}, append(commonLabels, reasonLabelName))
196196

197197
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion)
198198
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
@@ -353,7 +353,7 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) {
353353
blocksMarkedForDeletion := prometheus.NewCounterVec(prometheus.CounterOpts{
354354
Name: blocksMarkedForDeletionName,
355355
Help: blocksMarkedForDeletionHelp,
356-
}, append(commonLabels, ReasonLabelName))
356+
}, append(commonLabels, reasonLabelName))
357357

358358
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion)
359359
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
@@ -417,7 +417,7 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) {
417417
blocksMarkedForDeletion := prometheus.NewCounterVec(prometheus.CounterOpts{
418418
Name: blocksMarkedForDeletionName,
419419
Help: blocksMarkedForDeletionHelp,
420-
}, append(commonLabels, ReasonLabelName))
420+
}, append(commonLabels, reasonLabelName))
421421

422422
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion)
423423
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
@@ -475,7 +475,7 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar
475475
blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
476476
Name: blocksMarkedForDeletionName,
477477
Help: blocksMarkedForDeletionHelp,
478-
}, append(commonLabels, ReasonLabelName))
478+
}, append(commonLabels, reasonLabelName))
479479

480480
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion)
481481
activeUsers, deleteUsers, err := cleaner.scanUsers(ctx)
@@ -616,7 +616,7 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
616616
blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
617617
Name: blocksMarkedForDeletionName,
618618
Help: blocksMarkedForDeletionHelp,
619-
}, append(commonLabels, ReasonLabelName))
619+
}, append(commonLabels, reasonLabelName))
620620

621621
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion)
622622

pkg/compactor/compactor.go

+10
Original file line numberDiff line numberDiff line change
@@ -792,10 +792,20 @@ func (c *Compactor) compactUserWithRetries(ctx context.Context, userID string) e
792792
if lastErr == nil {
793793
return nil
794794
}
795+
if ctx.Err() != nil {
796+
return ctx.Err()
797+
}
795798
if c.isCausedByPermissionDenied(lastErr) {
796799
level.Warn(c.logger).Log("msg", "skipping compactUser due to PermissionDenied", "user", userID, "err", lastErr)
800+
c.compactorMetrics.compactionErrorsCount.WithLabelValues(userID, unauthorizedError).Inc()
797801
return nil
798802
}
803+
if compact.IsHaltError(lastErr) {
804+
level.Error(c.logger).Log("msg", "compactor returned critical error", "user", userID, "err", lastErr)
805+
c.compactorMetrics.compactionErrorsCount.WithLabelValues(userID, haltError).Inc()
806+
return lastErr
807+
}
808+
c.compactorMetrics.compactionErrorsCount.WithLabelValues(userID, retriableError).Inc()
799809

800810
retries.Wait()
801811
}

pkg/compactor/compactor_metrics.go

+16-6
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,23 @@ type compactorMetrics struct {
3737
compactionFailures *prometheus.CounterVec
3838
verticalCompactions *prometheus.CounterVec
3939
remainingPlannedCompactions *prometheus.GaugeVec
40+
compactionErrorsCount *prometheus.CounterVec
4041
}
4142

4243
const (
43-
UserLabelName = "user"
44-
TimeRangeLabelName = "time_range_milliseconds"
45-
ReasonLabelName = "reason"
44+
userLabelName = "user"
45+
timeRangeLabelName = "time_range_milliseconds"
46+
reasonLabelName = "reason"
47+
compactionErrorTypesLabelName = "type"
48+
49+
retriableError = "retriable"
50+
haltError = "halt"
51+
unauthorizedError = "unauthorized"
4652
)
4753

4854
var (
49-
commonLabels = []string{UserLabelName}
50-
compactionLabels = []string{TimeRangeLabelName}
55+
commonLabels = []string{userLabelName}
56+
compactionLabels = []string{timeRangeLabelName}
5157
)
5258

5359
func newDefaultCompactorMetrics(reg prometheus.Registerer) *compactorMetrics {
@@ -129,7 +135,7 @@ func newCompactorMetricsWithLabels(reg prometheus.Registerer, commonLabels []str
129135
m.syncerBlocksMarkedForDeletion = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
130136
Name: blocksMarkedForDeletionName,
131137
Help: blocksMarkedForDeletionHelp,
132-
}, append(commonLabels, ReasonLabelName))
138+
}, append(commonLabels, reasonLabelName))
133139

134140
m.compactions = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
135141
Name: "cortex_compactor_group_compactions_total",
@@ -159,6 +165,10 @@ func newCompactorMetricsWithLabels(reg prometheus.Registerer, commonLabels []str
159165
Name: "cortex_compactor_remaining_planned_compactions",
160166
Help: "Total number of plans that remain to be compacted. Only available with shuffle-sharding strategy",
161167
}, commonLabels)
168+
m.compactionErrorsCount = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
169+
Name: "cortex_compactor_compaction_error_total",
170+
Help: "Total number of errors from compactions.",
171+
}, append(commonLabels, compactionErrorTypesLabelName))
162172

163173
return &m
164174
}

pkg/compactor/compactor_metrics_test.go

+20
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,17 @@ func TestSyncerMetrics(t *testing.T) {
119119
cortex_compactor_remaining_planned_compactions{user="aaa"} 377740
120120
cortex_compactor_remaining_planned_compactions{user="bbb"} 388850
121121
cortex_compactor_remaining_planned_compactions{user="ccc"} 399960
122+
# HELP cortex_compactor_compaction_error_total Total number of errors from compactions.
123+
# TYPE cortex_compactor_compaction_error_total counter
124+
cortex_compactor_compaction_error_total{type="halt",user="aaa"} 444400
125+
cortex_compactor_compaction_error_total{type="halt",user="bbb"} 455510
126+
cortex_compactor_compaction_error_total{type="halt",user="ccc"} 466620
127+
cortex_compactor_compaction_error_total{type="retriable",user="aaa"} 411070
128+
cortex_compactor_compaction_error_total{type="retriable",user="bbb"} 422180
129+
cortex_compactor_compaction_error_total{type="retriable",user="ccc"} 433290
130+
cortex_compactor_compaction_error_total{type="unauthorized",user="aaa"} 477730
131+
cortex_compactor_compaction_error_total{type="unauthorized",user="bbb"} 488840
132+
cortex_compactor_compaction_error_total{type="unauthorized",user="ccc"} 499950
122133
`))
123134
require.NoError(t, err)
124135

@@ -163,4 +174,13 @@ func generateTestData(cm *compactorMetrics, base float64) {
163174
cm.remainingPlannedCompactions.WithLabelValues("aaa").Add(34 * base)
164175
cm.remainingPlannedCompactions.WithLabelValues("bbb").Add(35 * base)
165176
cm.remainingPlannedCompactions.WithLabelValues("ccc").Add(36 * base)
177+
cm.compactionErrorsCount.WithLabelValues("aaa", retriableError).Add(37 * base)
178+
cm.compactionErrorsCount.WithLabelValues("bbb", retriableError).Add(38 * base)
179+
cm.compactionErrorsCount.WithLabelValues("ccc", retriableError).Add(39 * base)
180+
cm.compactionErrorsCount.WithLabelValues("aaa", haltError).Add(40 * base)
181+
cm.compactionErrorsCount.WithLabelValues("bbb", haltError).Add(41 * base)
182+
cm.compactionErrorsCount.WithLabelValues("ccc", haltError).Add(42 * base)
183+
cm.compactionErrorsCount.WithLabelValues("aaa", unauthorizedError).Add(43 * base)
184+
cm.compactionErrorsCount.WithLabelValues("bbb", unauthorizedError).Add(44 * base)
185+
cm.compactionErrorsCount.WithLabelValues("ccc", unauthorizedError).Add(45 * base)
166186
}

pkg/compactor/compactor_test.go

+108-2
Original file line numberDiff line numberDiff line change
@@ -1615,8 +1615,8 @@ func (m *tsdbPlannerMock) getNoCompactBlocks() []string {
16151615
return result
16161616
}
16171617

1618-
func mockBlockMetaJSON(id string) string {
1619-
meta := tsdb.BlockMeta{
1618+
func mockBlockMeta(id string) tsdb.BlockMeta {
1619+
return tsdb.BlockMeta{
16201620
Version: 1,
16211621
ULID: ulid.MustParse(id),
16221622
MinTime: 1574776800000,
@@ -1626,6 +1626,10 @@ func mockBlockMetaJSON(id string) string {
16261626
Sources: []ulid.ULID{ulid.MustParse(id)},
16271627
},
16281628
}
1629+
}
1630+
1631+
func mockBlockMetaJSON(id string) string {
1632+
meta := mockBlockMeta(id)
16291633

16301634
content, err := json.Marshal(meta)
16311635
if err != nil {
@@ -1988,3 +1992,105 @@ func TestCompactor_ShouldNotFailCompactionIfAccessDeniedErrReturnedFromBucket(t
19881992

19891993
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))
19901994
}
1995+
1996+
func TestCompactor_FailedWithRetriableError(t *testing.T) {
1997+
t.Parallel()
1998+
1999+
ss := bucketindex.Status{Status: bucketindex.Ok, Version: bucketindex.SyncStatusFileVersion}
2000+
content, err := json.Marshal(ss)
2001+
require.NoError(t, err)
2002+
2003+
bucketClient := &bucket.ClientMock{}
2004+
bucketClient.MockIter("__markers__", []string{}, nil)
2005+
bucketClient.MockIter("", []string{"user-1"}, nil)
2006+
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json"}, nil)
2007+
bucketClient.MockIter("user-1/markers/", nil, nil)
2008+
bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil)
2009+
bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil)
2010+
bucketClient.MockIter("user-1/01DTVP434PA9VFXSW2JKB3392D", nil, errors.New("test retriable error"))
2011+
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil)
2012+
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil)
2013+
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil)
2014+
bucketClient.MockIter("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", nil, errors.New("test retriable error"))
2015+
bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", mockBlockMetaJSON("01DTW0ZCPDDNV4BV83Q2SV4QAZ"), nil)
2016+
bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/deletion-mark.json", "", nil)
2017+
bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/no-compact-mark.json", "", nil)
2018+
bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
2019+
bucketClient.MockGet("user-1/bucket-index-sync-status.json", string(content), nil)
2020+
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
2021+
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)
2022+
2023+
cfg := prepareConfig()
2024+
cfg.CompactionRetries = 2
2025+
2026+
c, _, tsdbPlanner, _, registry := prepare(t, cfg, bucketClient, nil)
2027+
tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{{BlockMeta: mockBlockMeta("01DTVP434PA9VFXSW2JKB3392D")}, {BlockMeta: mockBlockMeta("01DTW0ZCPDDNV4BV83Q2SV4QAZ")}}, nil)
2028+
2029+
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
2030+
2031+
cortex_testutil.Poll(t, 1*time.Second, 2.0, func() interface{} {
2032+
return prom_testutil.ToFloat64(c.compactorMetrics.compactionErrorsCount.WithLabelValues("user-1", retriableError))
2033+
})
2034+
2035+
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))
2036+
2037+
assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(`
2038+
# HELP cortex_compactor_compaction_error_total Total number of errors from compactions.
2039+
# TYPE cortex_compactor_compaction_error_total counter
2040+
cortex_compactor_compaction_error_total{type="retriable", user="user-1"} 2
2041+
`),
2042+
"cortex_compactor_compaction_retry_error_total",
2043+
"cortex_compactor_compaction_error_total",
2044+
))
2045+
}
2046+
2047+
func TestCompactor_FailedWithHaltError(t *testing.T) {
2048+
t.Parallel()
2049+
2050+
ss := bucketindex.Status{Status: bucketindex.Ok, Version: bucketindex.SyncStatusFileVersion}
2051+
content, err := json.Marshal(ss)
2052+
require.NoError(t, err)
2053+
2054+
bucketClient := &bucket.ClientMock{}
2055+
bucketClient.MockIter("__markers__", []string{}, nil)
2056+
bucketClient.MockIter("", []string{"user-1"}, nil)
2057+
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json"}, nil)
2058+
bucketClient.MockIter("user-1/markers/", nil, nil)
2059+
bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil)
2060+
bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil)
2061+
bucketClient.MockIter("user-1/01DTVP434PA9VFXSW2JKB3392D", nil, compact.HaltError{})
2062+
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil)
2063+
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil)
2064+
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil)
2065+
bucketClient.MockIter("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", nil, compact.HaltError{})
2066+
bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", mockBlockMetaJSON("01DTW0ZCPDDNV4BV83Q2SV4QAZ"), nil)
2067+
bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/deletion-mark.json", "", nil)
2068+
bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/no-compact-mark.json", "", nil)
2069+
bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
2070+
bucketClient.MockGet("user-1/bucket-index-sync-status.json", string(content), nil)
2071+
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
2072+
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)
2073+
2074+
cfg := prepareConfig()
2075+
cfg.CompactionRetries = 2
2076+
2077+
c, _, tsdbPlanner, _, registry := prepare(t, cfg, bucketClient, nil)
2078+
tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{{BlockMeta: mockBlockMeta("01DTVP434PA9VFXSW2JKB3392D")}, {BlockMeta: mockBlockMeta("01DTW0ZCPDDNV4BV83Q2SV4QAZ")}}, nil)
2079+
2080+
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
2081+
2082+
cortex_testutil.Poll(t, 1*time.Second, 1.0, func() interface{} {
2083+
return prom_testutil.ToFloat64(c.compactorMetrics.compactionErrorsCount.WithLabelValues("user-1", haltError))
2084+
})
2085+
2086+
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))
2087+
2088+
assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(`
2089+
# HELP cortex_compactor_compaction_error_total Total number of errors from compactions.
2090+
# TYPE cortex_compactor_compaction_error_total counter
2091+
cortex_compactor_compaction_error_total{type="halt", user="user-1"} 1
2092+
`),
2093+
"cortex_compactor_compaction_retry_error_total",
2094+
"cortex_compactor_compaction_error_total",
2095+
))
2096+
}

0 commit comments

Comments
 (0)