diff --git a/CHANGELOG.md b/CHANGELOG.md
index c4204e9667..d815747bce 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,7 @@
 * [FEATURE] Ruler: Experimental: Add `ruler.frontend-address` to allow query to query frontends instead of ingesters. #6151
 * [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129
 * [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245
+* [FEATURE] Chunk Cache: Support multi-level cache and add metrics. #6249
 * [ENHANCEMENT] Ingester: Add `blocks-storage.tsdb.wal-compression-type` to support zstd wal compression type. #6232
 * [ENHANCEMENT] Query Frontend: Add info field to query response. #6207
 * [ENHANCEMENT] Query Frontend: Add peakSample in query stats response. #6188
diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md
index 14cdcc400e..fbc9e51e49 100644
--- a/docs/blocks-storage/querier.md
+++ b/docs/blocks-storage/querier.md
@@ -806,8 +806,10 @@ blocks_storage:
         [max_backfill_items: <int> | default = 10000]
 
     chunks_cache:
-      # Backend for chunks cache, if not empty. Supported values: memcached,
-      # redis, inmemory, and '' (disable).
+      # The chunks cache backend type. Single or multiple cache backends can
+      # be provided. Supported values for a single cache: memcached, redis,
+      # inmemory, and '' (disable). Supported values for a multi-level cache:
+      # a comma-separated list of (inmemory, memcached, redis)
       # CLI flag: -blocks-storage.bucket-store.chunks-cache.backend
       [backend: <string> | default = ""]
 
@@ -1018,6 +1020,21 @@ blocks_storage:
             # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis.set-async.circuit-breaker.failure-percent
             [failure_percent: <float> | default = 0.05]
 
+      multilevel:
+        # The maximum number of concurrent asynchronous operations that can
+        # occur when backfilling cache items.
+        # CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-async-concurrency
+        [max_async_concurrency: <int> | default = 3]
+
+        # The maximum number of enqueued asynchronous operations allowed when
+        # backfilling cache items.
+        # CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-async-buffer-size
+        [max_async_buffer_size: <int> | default = 10000]
+
+        # The maximum number of items to backfill per asynchronous operation.
+        # CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-backfill-items
+        [max_backfill_items: <int> | default = 10000]
+
       # Size of each subrange that bucket object is split into for better
       # caching.
       # CLI flag: -blocks-storage.bucket-store.chunks-cache.subrange-size
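On the CLI, the generated docs above translate to a comma-separated value for the existing backend flag. A sketch (the memcached address is illustrative, not a recommendation):

```
# In-memory first level in front of a memcached second level.
-blocks-storage.bucket-store.chunks-cache.backend=inmemory,memcached
-blocks-storage.bucket-store.chunks-cache.memcached.addresses=dns+memcached.example.svc:11211
-blocks-storage.bucket-store.chunks-cache.multilevel.max-async-concurrency=3
```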
diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md
index 78a0e9674e..8817bdc511 100644
--- a/docs/blocks-storage/store-gateway.md
+++ b/docs/blocks-storage/store-gateway.md
@@ -903,8 +903,10 @@ blocks_storage:
         [max_backfill_items: <int> | default = 10000]
 
     chunks_cache:
-      # Backend for chunks cache, if not empty. Supported values: memcached,
-      # redis, inmemory, and '' (disable).
+      # The chunks cache backend type. Single or multiple cache backends can
+      # be provided. Supported values for a single cache: memcached, redis,
+      # inmemory, and '' (disable). Supported values for a multi-level cache:
+      # a comma-separated list of (inmemory, memcached, redis)
       # CLI flag: -blocks-storage.bucket-store.chunks-cache.backend
       [backend: <string> | default = ""]
 
@@ -1115,6 +1117,21 @@ blocks_storage:
             # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis.set-async.circuit-breaker.failure-percent
             [failure_percent: <float> | default = 0.05]
 
+      multilevel:
+        # The maximum number of concurrent asynchronous operations that can
+        # occur when backfilling cache items.
+        # CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-async-concurrency
+        [max_async_concurrency: <int> | default = 3]
+
+        # The maximum number of enqueued asynchronous operations allowed when
+        # backfilling cache items.
+        # CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-async-buffer-size
+        [max_async_buffer_size: <int> | default = 10000]
+
+        # The maximum number of items to backfill per asynchronous operation.
+        # CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-backfill-items
+        [max_backfill_items: <int> | default = 10000]
+
       # Size of each subrange that bucket object is split into for better
       # caching.
       # CLI flag: -blocks-storage.bucket-store.chunks-cache.subrange-size
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index c217d95c16..72ddeb1b25 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -1339,8 +1339,10 @@ bucket_store:
       [max_backfill_items: <int> | default = 10000]
 
   chunks_cache:
-    # Backend for chunks cache, if not empty. Supported values: memcached,
-    # redis, inmemory, and '' (disable).
+    # The chunks cache backend type. Single or multiple cache backends can be
+    # provided. Supported values for a single cache: memcached, redis, inmemory,
+    # and '' (disable). Supported values for a multi-level cache: a
+    # comma-separated list of (inmemory, memcached, redis)
     # CLI flag: -blocks-storage.bucket-store.chunks-cache.backend
    [backend: <string> | default = ""]
 
@@ -1549,6 +1551,21 @@ bucket_store:
          # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis.set-async.circuit-breaker.failure-percent
          [failure_percent: <float> | default = 0.05]
 
+    multilevel:
+      # The maximum number of concurrent asynchronous operations that can
+      # occur when backfilling cache items.
+      # CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-async-concurrency
+      [max_async_concurrency: <int> | default = 3]
+
+      # The maximum number of enqueued asynchronous operations allowed when
+      # backfilling cache items.
+      # CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-async-buffer-size
+      [max_async_buffer_size: <int> | default = 10000]
+
+      # The maximum number of items to backfill per asynchronous operation.
+      # CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-backfill-items
+      [max_backfill_items: <int> | default = 10000]
+
    # Size of each subrange that bucket object is split into for better caching.
    # CLI flag: -blocks-storage.bucket-store.chunks-cache.subrange-size
    [subrange_size: <int> | default = 16000]
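Putting the generated reference together, the equivalent YAML fragment looks roughly like the following. Per the `createChunkCache` ordering, the first backend listed is queried first and receives asynchronous backfill on hits from later levels (addresses and limits here are illustrative):

```yaml
blocks_storage:
  bucket_store:
    chunks_cache:
      backend: inmemory,memcached
      memcached:
        addresses: dns+memcached.example.svc:11211
      multilevel:
        max_async_concurrency: 3
        max_async_buffer_size: 10000
        max_backfill_items: 10000
```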
diff --git a/integration/querier_test.go b/integration/querier_test.go
index ae442bcb16..67a4cedcc2 100644
--- a/integration/querier_test.go
+++ b/integration/querier_test.go
@@ -97,6 +97,12 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
 			chunkCacheBackend:      tsdb.CacheBackendRedis,
 			bucketIndexEnabled:     true,
 		},
+		"blocks sharding disabled, in-memory chunk cache": {
+			blocksShardingStrategy: "",
+			indexCacheBackend:      tsdb.IndexCacheBackendRedis,
+			chunkCacheBackend:      tsdb.CacheBackendInMemory,
+			bucketIndexEnabled:     true,
+		},
 		"blocks default sharding, in-memory chunk cache": {
 			blocksShardingStrategy: "default",
 			indexCacheBackend:      tsdb.IndexCacheBackendRedis,
@@ -110,6 +116,25 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
 			chunkCacheBackend:      tsdb.CacheBackendInMemory,
 			bucketIndexEnabled:     true,
 		},
+		"blocks sharding disabled, multi-level chunk cache": {
+			blocksShardingStrategy: "",
+			indexCacheBackend:      tsdb.IndexCacheBackendRedis,
+			chunkCacheBackend:      fmt.Sprintf("%v,%v,%v", tsdb.CacheBackendInMemory, tsdb.CacheBackendMemcached, tsdb.CacheBackendRedis),
+			bucketIndexEnabled:     true,
+		},
+		"blocks default sharding, multi-level chunk cache": {
+			blocksShardingStrategy: "default",
+			indexCacheBackend:      tsdb.IndexCacheBackendRedis,
+			chunkCacheBackend:      fmt.Sprintf("%v,%v,%v", tsdb.CacheBackendInMemory, tsdb.CacheBackendMemcached, tsdb.CacheBackendRedis),
+			bucketIndexEnabled:     true,
+		},
+		"blocks shuffle sharding, multi-level chunk cache": {
+			blocksShardingStrategy: "shuffle-sharding",
+			tenantShardSize:        1,
+			indexCacheBackend:      tsdb.IndexCacheBackendRedis,
+			chunkCacheBackend:      fmt.Sprintf("%v,%v,%v", tsdb.CacheBackendInMemory, tsdb.CacheBackendMemcached, tsdb.CacheBackendRedis),
+			bucketIndexEnabled:     true,
+		},
 	}
 
 	for testName, testCfg := range tests {
@@ -154,9 +179,10 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
 		if strings.Contains(testCfg.indexCacheBackend, tsdb.IndexCacheBackendRedis) {
 			flags["-blocks-storage.bucket-store.index-cache.redis.addresses"] = redis.NetworkEndpoint(e2ecache.RedisPort)
 		}
-		if testCfg.chunkCacheBackend == tsdb.CacheBackendMemcached {
+		if strings.Contains(testCfg.chunkCacheBackend, tsdb.CacheBackendMemcached) {
 			flags["-blocks-storage.bucket-store.chunks-cache.memcached.addresses"] = "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort)
-		} else if testCfg.chunkCacheBackend == tsdb.CacheBackendRedis {
+		}
+		if strings.Contains(testCfg.chunkCacheBackend, tsdb.CacheBackendRedis) {
 			flags["-blocks-storage.bucket-store.chunks-cache.redis.addresses"] = redis.NetworkEndpoint(e2ecache.RedisPort)
 		}
 
diff --git a/pkg/storage/tsdb/caching_bucket.go b/pkg/storage/tsdb/caching_bucket.go
index 83a6700e1a..79a4f96463 100644
--- a/pkg/storage/tsdb/caching_bucket.go
+++ b/pkg/storage/tsdb/caching_bucket.go
@@ -21,12 +21,16 @@ import (
 	"github.com/thanos-io/thanos/pkg/cacheutil"
 	"github.com/thanos-io/thanos/pkg/model"
 	storecache "github.com/thanos-io/thanos/pkg/store/cache"
+
+	"github.com/cortexproject/cortex/pkg/util"
 )
 
 var (
+	supportedChunkCacheBackends    = []string{CacheBackendInMemory, CacheBackendMemcached, CacheBackendRedis}
 	supportedMetadataCacheBackends = []string{CacheBackendMemcached, CacheBackendRedis}
 
 	errUnsupportedChunkCacheBackend = errors.New("unsupported chunk cache backend")
+	errDuplicatedChunkCacheBackend  = errors.New("duplicated chunk cache backend")
 )
 
 const (
@@ -56,23 +60,52 @@ func (cfg *MetadataCacheBackend) Validate() error {
 }
 
 type ChunkCacheBackend struct {
-	Backend   string                   `yaml:"backend"`
-	InMemory  InMemoryChunkCacheConfig `yaml:"inmemory"`
-	Memcached MemcachedClientConfig    `yaml:"memcached"`
-	Redis     RedisClientConfig        `yaml:"redis"`
+	Backend    string                     `yaml:"backend"`
+	InMemory   InMemoryChunkCacheConfig   `yaml:"inmemory"`
+	Memcached  MemcachedClientConfig      `yaml:"memcached"`
+	Redis      RedisClientConfig          `yaml:"redis"`
+	MultiLevel MultiLevelChunkCacheConfig `yaml:"multilevel"`
 }
 
 // Validate the config.
 func (cfg *ChunkCacheBackend) Validate() error {
-	switch cfg.Backend {
-	case CacheBackendMemcached:
-		return cfg.Memcached.Validate()
-	case CacheBackendRedis:
-		return cfg.Redis.Validate()
-	case CacheBackendInMemory, "":
-	default:
-		return errUnsupportedChunkCacheBackend
+	if cfg.Backend == "" {
+		return nil
+	}
+
+	splitBackends := strings.Split(cfg.Backend, ",")
+	configuredBackends := map[string]struct{}{}
+
+	if len(splitBackends) > 1 {
+		if err := cfg.MultiLevel.Validate(); err != nil {
+			return err
+		}
 	}
+
+	for _, backend := range splitBackends {
+		if !util.StringsContain(supportedChunkCacheBackends, backend) {
+			return errUnsupportedChunkCacheBackend
+		}
+
+		if _, ok := configuredBackends[backend]; ok {
+			return errDuplicatedChunkCacheBackend
+		}
+
+		switch backend {
+		case CacheBackendMemcached:
+			if err := cfg.Memcached.Validate(); err != nil {
+				return err
+			}
+		case CacheBackendRedis:
+			if err := cfg.Redis.Validate(); err != nil {
+				return err
+			}
+		case CacheBackendInMemory:
+		}
+
+		configuredBackends[backend] = struct{}{}
+	}
+
 	return nil
 }
 
@@ -86,16 +119,22 @@ type ChunksCacheConfig struct {
 }
 
 func (cfg *ChunksCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
-	f.StringVar(&cfg.Backend, prefix+"backend", "", fmt.Sprintf("Backend for chunks cache, if not empty. Supported values: %s, %s, %s, and '' (disable).", CacheBackendMemcached, CacheBackendRedis, CacheBackendInMemory))
+	f.StringVar(&cfg.Backend, prefix+"backend", "", fmt.Sprintf("The chunks cache backend type. Single or multiple cache backends can be provided. "+
+		"Supported values for a single cache: %s, %s, %s, and '' (disable). "+
+		"Supported values for a multi-level cache: a comma-separated list of (%s)", CacheBackendMemcached, CacheBackendRedis, CacheBackendInMemory, strings.Join(supportedChunkCacheBackends, ", ")))
 
 	cfg.Memcached.RegisterFlagsWithPrefix(f, prefix+"memcached.")
 	cfg.Redis.RegisterFlagsWithPrefix(f, prefix+"redis.")
 	cfg.InMemory.RegisterFlagsWithPrefix(f, prefix+"inmemory.")
+	cfg.MultiLevel.RegisterFlagsWithPrefix(f, prefix+"multilevel.")
 
 	f.Int64Var(&cfg.SubrangeSize, prefix+"subrange-size", 16000, "Size of each subrange that bucket object is split into for better caching.")
 	f.IntVar(&cfg.MaxGetRangeRequests, prefix+"max-get-range-requests", 3, "Maximum number of sub-GetRange requests that a single GetRange request can be split into when fetching chunks. Zero or negative value = unlimited number of sub-requests.")
 	f.DurationVar(&cfg.AttributesTTL, prefix+"attributes-ttl", 168*time.Hour, "TTL for caching object attributes for chunks.")
 	f.DurationVar(&cfg.SubrangeTTL, prefix+"subrange-ttl", 24*time.Hour, "TTL for caching individual chunks subranges.")
+
+	// In the multi-level chunk cache, the backfill TTL follows the subrange TTL.
+	cfg.ChunkCacheBackend.MultiLevel.BackFillTTL = cfg.SubrangeTTL
 }
 
 func (cfg *ChunksCacheConfig) Validate() error {
@@ -232,34 +271,41 @@ func createMetadataCache(cacheName string, cacheBackend *MetadataCacheBackend, l
 }
 
 func createChunkCache(cacheName string, cacheBackend *ChunkCacheBackend, logger log.Logger, reg prometheus.Registerer) (cache.Cache, error) {
-	switch cacheBackend.Backend {
-	case "":
+	if cacheBackend.Backend == "" {
 		// No caching.
 		return nil, nil
-	case CacheBackendInMemory:
-		inMemoryCache, err := cache.NewInMemoryCacheWithConfig(cacheName, logger, reg, cacheBackend.InMemory.toInMemoryChunkCacheConfig())
-		if err != nil {
-			return nil, errors.Wrapf(err, "failed to create in-memory chunk cache")
-		}
-		return inMemoryCache, nil
-	case CacheBackendMemcached:
-		var client cacheutil.MemcachedClient
-		client, err := cacheutil.NewMemcachedClientWithConfig(logger, cacheName, cacheBackend.Memcached.ToMemcachedClientConfig(), reg)
-		if err != nil {
-			return nil, errors.Wrapf(err, "failed to create memcached client")
-		}
-		return cache.NewMemcachedCache(cacheName, logger, client, reg), nil
+	}
 
-	case CacheBackendRedis:
-		redisCache, err := cacheutil.NewRedisClientWithConfig(logger, cacheName, cacheBackend.Redis.ToRedisClientConfig(), reg)
-		if err != nil {
-			return nil, errors.Wrapf(err, "failed to create redis client")
+	splitBackends := strings.Split(cacheBackend.Backend, ",")
+	var (
+		caches []cache.Cache
+	)
+
+	for _, backend := range splitBackends {
+		switch backend {
+		case CacheBackendInMemory:
+			inMemoryCache, err := cache.NewInMemoryCacheWithConfig(cacheName, logger, reg, cacheBackend.InMemory.toInMemoryChunkCacheConfig())
+			if err != nil {
+				return nil, errors.Wrapf(err, "failed to create in-memory chunk cache")
+			}
+			caches = append(caches, inMemoryCache)
+		case CacheBackendMemcached:
+			var client cacheutil.MemcachedClient
+			client, err := cacheutil.NewMemcachedClientWithConfig(logger, cacheName, cacheBackend.Memcached.ToMemcachedClientConfig(), reg)
+			if err != nil {
+				return nil, errors.Wrapf(err, "failed to create memcached client")
+			}
+			caches = append(caches, cache.NewMemcachedCache(cacheName, logger, client, reg))
+		case CacheBackendRedis:
+			redisCache, err := cacheutil.NewRedisClientWithConfig(logger, cacheName, cacheBackend.Redis.ToRedisClientConfig(), reg)
+			if err != nil {
+				return nil, errors.Wrapf(err, "failed to create redis client")
+			}
+			caches = append(caches, cache.NewRedisCache(cacheName, logger, redisCache, reg))
 		}
-		return cache.NewRedisCache(cacheName, logger, redisCache, reg), nil
-
-	default:
-		return nil, errors.Errorf("unsupported cache type for cache %s: %s", cacheName, cacheBackend.Backend)
 	}
+
+	return newMultiLevelChunkCache(cacheName, cacheBackend.MultiLevel, reg, caches...), nil
 }
 
 type Matchers struct {
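One detail of the `Validate` implementation above worth keeping in mind: the backend list is split verbatim, so whitespace around commas is not trimmed. A minimal sketch of the stdlib behavior it relies on:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// strings.Split does not trim whitespace, so a space after the comma
	// produces the backend name " memcached", which would fail the
	// supported-backend check in ChunkCacheBackend.Validate.
	fmt.Printf("%q\n", strings.Split("inmemory, memcached", ",")) // ["inmemory" " memcached"]
}
```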
diff --git a/pkg/storage/tsdb/caching_bucket_test.go b/pkg/storage/tsdb/caching_bucket_test.go
index 78ad1fb9b9..875134452e 100644
--- a/pkg/storage/tsdb/caching_bucket_test.go
+++ b/pkg/storage/tsdb/caching_bucket_test.go
@@ -49,6 +49,76 @@ func Test_ChunkCacheBackendValidation(t *testing.T) {
 			},
 			expectedErr: errUnsupportedChunkCacheBackend,
 		},
+		"valid multi-level chunk cache type": {
+			cfg: ChunkCacheBackend{
+				Backend: fmt.Sprintf("%s,%s,%s", CacheBackendInMemory, CacheBackendMemcached, CacheBackendRedis),
+				Memcached: MemcachedClientConfig{
+					Addresses: "dns+localhost:11211",
+				},
+				Redis: RedisClientConfig{
+					Addresses: "localhost:6379",
+				},
+				MultiLevel: MultiLevelChunkCacheConfig{
+					MaxAsyncConcurrency: 1,
+					MaxAsyncBufferSize:  1,
+					MaxBackfillItems:    1,
+				},
+			},
+			expectedErr: nil,
+		},
+		"duplicate multi-level chunk cache type": {
+			cfg: ChunkCacheBackend{
+				Backend: fmt.Sprintf("%s,%s", CacheBackendInMemory, CacheBackendInMemory),
+				MultiLevel: MultiLevelChunkCacheConfig{
+					MaxAsyncConcurrency: 1,
+					MaxAsyncBufferSize:  1,
+					MaxBackfillItems:    1,
+				},
+			},
+			expectedErr: errDuplicatedChunkCacheBackend,
+		},
+		"invalid max async concurrency": {
+			cfg: ChunkCacheBackend{
+				Backend: fmt.Sprintf("%s,%s", CacheBackendInMemory, CacheBackendMemcached),
+				Memcached: MemcachedClientConfig{
+					Addresses: "dns+localhost:11211",
+				},
+				MultiLevel: MultiLevelChunkCacheConfig{
+					MaxAsyncConcurrency: 0,
+					MaxAsyncBufferSize:  1,
+					MaxBackfillItems:    1,
+				},
+			},
+			expectedErr: errInvalidMaxAsyncConcurrency,
+		},
+		"invalid max async buffer size": {
+			cfg: ChunkCacheBackend{
+				Backend: fmt.Sprintf("%s,%s", CacheBackendInMemory, CacheBackendMemcached),
+				Memcached: MemcachedClientConfig{
+					Addresses: "dns+localhost:11211",
+				},
+				MultiLevel: MultiLevelChunkCacheConfig{
+					MaxAsyncConcurrency: 1,
+					MaxAsyncBufferSize:  0,
+					MaxBackfillItems:    1,
+				},
+			},
+			expectedErr: errInvalidMaxAsyncBufferSize,
+		},
+		"invalid max backfill items": {
+			cfg: ChunkCacheBackend{
+				Backend: fmt.Sprintf("%s,%s", CacheBackendInMemory, CacheBackendMemcached),
+				Memcached: MemcachedClientConfig{
+					Addresses: "dns+localhost:11211",
+				},
+				MultiLevel: MultiLevelChunkCacheConfig{
+					MaxAsyncConcurrency: 1,
+					MaxAsyncBufferSize:  1,
+					MaxBackfillItems:    0,
+				},
+			},
+			expectedErr: errInvalidMaxBackfillItems,
+		},
 	}
 
 	for name, tc := range tests {
cache type": { + cfg: ChunkCacheBackend{ + Backend: fmt.Sprintf("%s,%s,%s", CacheBackendInMemory, CacheBackendMemcached, CacheBackendRedis), + Memcached: MemcachedClientConfig{ + Addresses: "dns+localhost:11211", + }, + Redis: RedisClientConfig{ + Addresses: "localhost:6379", + }, + MultiLevel: MultiLevelChunkCacheConfig{ + MaxAsyncConcurrency: 1, + MaxAsyncBufferSize: 1, + MaxBackfillItems: 1, + }, + }, + expectedErr: nil, + }, + "duplicate multi chunk cache type": { + cfg: ChunkCacheBackend{ + Backend: fmt.Sprintf("%s,%s", CacheBackendInMemory, CacheBackendInMemory), + MultiLevel: MultiLevelChunkCacheConfig{ + MaxAsyncConcurrency: 1, + MaxAsyncBufferSize: 1, + MaxBackfillItems: 1, + }, + }, + expectedErr: errDuplicatedChunkCacheBackend, + }, + "invalid max async concurrency": { + cfg: ChunkCacheBackend{ + Backend: fmt.Sprintf("%s,%s", CacheBackendInMemory, CacheBackendMemcached), + Memcached: MemcachedClientConfig{ + Addresses: "dns+localhost:11211", + }, + MultiLevel: MultiLevelChunkCacheConfig{ + MaxAsyncConcurrency: 0, + MaxAsyncBufferSize: 1, + MaxBackfillItems: 1, + }, + }, + expectedErr: errInvalidMaxAsyncConcurrency, + }, + "invalid max async buffer size": { + cfg: ChunkCacheBackend{ + Backend: fmt.Sprintf("%s,%s", CacheBackendInMemory, CacheBackendMemcached), + Memcached: MemcachedClientConfig{ + Addresses: "dns+localhost:11211", + }, + MultiLevel: MultiLevelChunkCacheConfig{ + MaxAsyncConcurrency: 1, + MaxAsyncBufferSize: 0, + MaxBackfillItems: 1, + }, + }, + expectedErr: errInvalidMaxAsyncBufferSize, + }, + "invalid max back fill items": { + cfg: ChunkCacheBackend{ + Backend: fmt.Sprintf("%s,%s", CacheBackendInMemory, CacheBackendMemcached), + Memcached: MemcachedClientConfig{ + Addresses: "dns+localhost:11211", + }, + MultiLevel: MultiLevelChunkCacheConfig{ + MaxAsyncConcurrency: 1, + MaxAsyncBufferSize: 1, + MaxBackfillItems: 0, + }, + }, + expectedErr: errInvalidMaxBackfillItems, + }, } for name, tc := range tests { diff --git a/pkg/storage/tsdb/multilevel_chunk_cache.go b/pkg/storage/tsdb/multilevel_chunk_cache.go new file mode 100644 index 0000000000..e1b0f5bc20 --- /dev/null +++ b/pkg/storage/tsdb/multilevel_chunk_cache.go @@ -0,0 +1,149 @@ +package tsdb + +import ( + "context" + "errors" + "flag" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/thanos-io/thanos/pkg/cache" + "github.com/thanos-io/thanos/pkg/cacheutil" +) + +type multiLevelChunkCache struct { + name string + caches []cache.Cache + + backfillProcessor *cacheutil.AsyncOperationProcessor + fetchLatency *prometheus.HistogramVec + backFillLatency *prometheus.HistogramVec + storeDroppedItems prometheus.Counter + backfillDroppedItems prometheus.Counter + maxBackfillItems int + backfillTTL time.Duration +} + +type MultiLevelChunkCacheConfig struct { + MaxAsyncConcurrency int `yaml:"max_async_concurrency"` + MaxAsyncBufferSize int `yaml:"max_async_buffer_size"` + MaxBackfillItems int `yaml:"max_backfill_items"` + + BackFillTTL time.Duration `yaml:"-"` +} + +func (cfg *MultiLevelChunkCacheConfig) Validate() error { + if cfg.MaxAsyncBufferSize <= 0 { + return errInvalidMaxAsyncBufferSize + } + if cfg.MaxAsyncConcurrency <= 0 { + return errInvalidMaxAsyncConcurrency + } + if cfg.MaxBackfillItems <= 0 { + return errInvalidMaxBackfillItems + } + return nil +} + +func (cfg *MultiLevelChunkCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { + f.IntVar(&cfg.MaxAsyncConcurrency, prefix+"max-async-concurrency", 3, "The 
diff --git a/pkg/storage/tsdb/multilevel_chunk_cache_test.go b/pkg/storage/tsdb/multilevel_chunk_cache_test.go
new file mode 100644
index 0000000000..c72c1f3a55
--- /dev/null
+++ b/pkg/storage/tsdb/multilevel_chunk_cache_test.go
@@ -0,0 +1,199 @@
+package tsdb
+
+import (
+	"context"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/stretchr/testify/require"
+)
+
+func Test_MultiLevelChunkCacheStore(t *testing.T) {
+	ttl := time.Hour * 24
+	cfg := MultiLevelChunkCacheConfig{
+		MaxAsyncConcurrency: 10,
+		MaxAsyncBufferSize:  100000,
+		MaxBackfillItems:    10000,
+		BackFillTTL:         ttl,
+	}
+
+	data := map[string][]byte{
+		"key1": []byte("value1"),
+		"key2": []byte("value2"),
+		"key3": []byte("value3"),
+	}
+
+	testCases := map[string]struct {
+		m1InitData     map[string][]byte
+		m2InitData     map[string][]byte
+		expectedM1Data map[string][]byte
+		expectedM2Data map[string][]byte
+		storeData      map[string][]byte
+	}{
+		"should store data to both caches": {
+			m1InitData:     nil,
+			m2InitData:     nil,
+			expectedM1Data: data,
+			expectedM2Data: data,
+			storeData:      data,
+		},
+		"should store data to m1 cache": {
+			m1InitData:     nil,
+			m2InitData:     data,
+			expectedM1Data: data,
+			expectedM2Data: data,
+			storeData:      data,
+		},
+		"should store data to m2 cache": {
+			m1InitData:     data,
+			m2InitData:     nil,
+			expectedM1Data: data,
+			expectedM2Data: data,
+			storeData:      data,
+		},
+	}
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			m1 := newMockChunkCache("m1", tc.m1InitData)
+			m2 := newMockChunkCache("m2", tc.m2InitData)
+			reg := prometheus.NewRegistry()
+			c := newMultiLevelChunkCache("chunk-cache", cfg, reg, m1, m2)
+			c.Store(tc.storeData, ttl)
+
+			mlc := c.(*multiLevelChunkCache)
+			// Wait until async operation finishes.
+			mlc.backfillProcessor.Stop()
+
+			require.Equal(t, tc.expectedM1Data, m1.data)
+			require.Equal(t, tc.expectedM2Data, m2.data)
+		})
+	}
+}
+
+func Test_MultiLevelChunkCacheFetch(t *testing.T) {
+	cfg := MultiLevelChunkCacheConfig{
+		MaxAsyncConcurrency: 10,
+		MaxAsyncBufferSize:  100000,
+		MaxBackfillItems:    10000,
+		BackFillTTL:         time.Hour * 24,
+	}
+
+	testCases := map[string]struct {
+		m1ExistingData      map[string][]byte
+		m2ExistingData      map[string][]byte
+		expectedM1Data      map[string][]byte
+		expectedM2Data      map[string][]byte
+		expectedFetchedData map[string][]byte
+		fetchKeys           []string
+	}{
+		"fetched data should be the union of m1 and m2, and 'key2' and 'key3' should be backfilled to m1": {
+			m1ExistingData: map[string][]byte{
+				"key1": []byte("value1"),
+			},
+			m2ExistingData: map[string][]byte{
+				"key2": []byte("value2"),
+				"key3": []byte("value3"),
+			},
+			expectedM1Data: map[string][]byte{
+				"key1": []byte("value1"),
+				"key2": []byte("value2"),
+				"key3": []byte("value3"),
+			},
+			expectedM2Data: map[string][]byte{
+				"key2": []byte("value2"),
+				"key3": []byte("value3"),
+			},
+			expectedFetchedData: map[string][]byte{
+				"key1": []byte("value1"),
+				"key2": []byte("value2"),
+				"key3": []byte("value3"),
+			},
+			fetchKeys: []string{"key1", "key2", "key3"},
+		},
+		"should not fetch data that does not exist in either cache": {
+			m1ExistingData: map[string][]byte{
+				"key1": []byte("value1"),
+			},
+			m2ExistingData: map[string][]byte{
+				"key2": []byte("value2"),
+			},
+			expectedM1Data: map[string][]byte{
+				"key1": []byte("value1"),
+				"key2": []byte("value2"),
+			},
+			expectedM2Data: map[string][]byte{
+				"key2": []byte("value2"),
+			},
+			expectedFetchedData: map[string][]byte{
+				"key1": []byte("value1"),
+				"key2": []byte("value2"),
+			},
+			fetchKeys: []string{"key1", "key2", "key3"},
+		},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			m1 := newMockChunkCache("m1", tc.m1ExistingData)
+			m2 := newMockChunkCache("m2", tc.m2ExistingData)
+			reg := prometheus.NewRegistry()
+			c := newMultiLevelChunkCache("chunk-cache", cfg, reg, m1, m2)
+			fetchData := c.Fetch(context.Background(), tc.fetchKeys)
+
+			mlc := c.(*multiLevelChunkCache)
+			// Wait until async operation finishes.
+			mlc.backfillProcessor.Stop()
+
+			require.Equal(t, tc.expectedM1Data, m1.data)
+			require.Equal(t, tc.expectedM2Data, m2.data)
+			require.Equal(t, tc.expectedFetchedData, fetchData)
+		})
+	}
+}
+
+type mockChunkCache struct {
+	mu   sync.Mutex
+	name string
+	data map[string][]byte
+}
+
+func newMockChunkCache(name string, data map[string][]byte) *mockChunkCache {
+	if data == nil {
+		data = make(map[string][]byte)
+	}
+
+	return &mockChunkCache{
+		name: name,
+		data: data,
+	}
+}
+
+func (m *mockChunkCache) Store(data map[string][]byte, _ time.Duration) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	// Merge into the existing map rather than replacing it, so stored
+	// values behave like individual cache writes.
+	for k, v := range data {
+		m.data[k] = v
+	}
+}
+
+func (m *mockChunkCache) Fetch(_ context.Context, keys []string) map[string][]byte {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	h := map[string][]byte{}
+
+	for _, k := range keys {
+		if v, ok := m.data[k]; ok {
+			h[k] = v
+		}
+	}
+
+	return h
+}
+
+func (m *mockChunkCache) Name() string {
+	return m.name
+}
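The new unit tests are self-contained and should run with the standard toolchain, e.g.:

```
go test ./pkg/storage/tsdb/ -run 'Test_MultiLevelChunkCache' -v
```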