Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/hashicorp/go-cleanhttp v0.5.2
github.com/hashicorp/go-retryablehttp v0.7.8
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/invopop/jsonschema v0.13.0
github.com/klauspost/compress v1.18.0
github.com/klauspost/pgzip v1.2.6
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB1
github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M=
github.com/hashicorp/go-retryablehttp v0.7.8 h1:ylXZWnqa7Lhqpk0L1P1LzDtGcCR0rPVUrx/c8Unxc48=
github.com/hashicorp/go-retryablehttp v0.7.8/go.mod h1:rjiScheydd+CxvumBsIrFKlx3iS0jrZ7LvzFGFmuKbw=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/invopop/jsonschema v0.13.0 h1:KvpoAJWEjR3uD9Kbm2HWJmqsEaHt8lBUpd0qHcIi21E=
Expand Down
131 changes: 69 additions & 62 deletions pkg/apk/apk/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@ import (
"os"
"regexp"
"slices"
"strconv"
"strings"
"sync"
"time"

"github.com/chainguard-dev/clog"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/klauspost/compress/gzip"
"go.lsp.dev/uri"
"go.opentelemetry.io/otel"
Expand All @@ -55,49 +57,35 @@ type Signature struct {
// We just hold the parsed index in memory rather than re-parsing it every time,
// which requires gunzipping, which is (somewhat) expensive.
var globalIndexCache = &indexCache{
modtimes: map[string]time.Time{},
urlToEtag: map[string]string{},
modtimes: map[string]time.Time{},
indexes: func() *lru.Cache[cacheKey, func() (NamedIndex, error)] {
indexCacheSize := 100 // Unscientific default size.
if v := os.Getenv("APKO_INDEX_CACHE_SIZE"); v != "" {
size, err := strconv.Atoi(v)
if err != nil {
panic(fmt.Sprintf("invalid APKO_INDEX_CACHE_SIZE %q: %v", v, err))
}
indexCacheSize = size
}

// This only fails for negative cache sizes, so we can ignore the error.
c, _ := lru.New[cacheKey, func() (NamedIndex, error)](indexCacheSize)
return c
}(),
}

type indexResult struct {
idx NamedIndex
err error
type cacheKey struct {
url string
etag string // Only used for remote indexes.
}

type indexCache struct {
// For remote indexes.
onces sync.Map
urlToEtag map[string]string
etagMu sync.Mutex // guards urlToEtag
indexesMux sync.Mutex // To make up for the lack of atomic GetOrAdd in lru.Cache.
indexes *lru.Cache[cacheKey, func() (NamedIndex, error)]

// For local indexes.
sync.Mutex
modtimes map[string]time.Time

// etag|filename -> indexResult
indexes sync.Map
}

func (i *indexCache) forget(key string) {
i.onces.Delete(key)
i.indexes.Delete(key)
}

func (i *indexCache) store(key string, idx NamedIndex, err error) {
i.indexes.Store(key, indexResult{
idx: idx,
err: err,
})
}

func (i *indexCache) load(key string) (NamedIndex, error) {
v, ok := i.indexes.Load(key)
if !ok {
return nil, fmt.Errorf("indexCache did not see key %q after writing it", key)
}
result := v.(indexResult)

return result.idx, result.err
}

func (i *indexCache) get(ctx context.Context, repoName, repoURL string, keys map[string][]byte, arch string, opts *indexOpts) (NamedIndex, error) {
Expand Down Expand Up @@ -158,30 +146,43 @@ func (i *indexCache) get(ctx context.Context, repoName, repoURL string, keys map
return fetchAndParse(etag)
}

key := fmt.Sprintf("%s@%s", u, etag)

once, _ := i.onces.LoadOrStore(key, &sync.Once{})
once.(*sync.Once).Do(func() {
// If we've seen this URL before, delete any references to old indexes so we can GC them.
// Lock reads/writes to the map, without blocking the fetchAndParse goroutine.
i.etagMu.Lock()
prev, ok := i.urlToEtag[u]
key := cacheKey{url: u, etag: etag}
entry, ok := i.indexes.Get(key)
if !ok {
i.indexesMux.Lock()
// There's no atomic GetOrAdd in the lru.Cache, so we need to check again under the lock.
entry, ok = i.indexes.Get(key)
if ok {
prevKey := fmt.Sprintf("%s@%s", u, prev)
i.forget(prevKey)
// If there is an entry now, there was a race to the Get above. We can just return now.
i.indexesMux.Unlock()
return entry()
}
i.etagMu.Unlock()

idx, err := fetchAndParse(etag)
i.store(key, idx, err)
// If there is no entry yet, we create the sync.OnceValues instance to do the fetch and
// immediately add it to the cache. Others will see it and attach to it. It functions
// kind of like a cached future.
entry = sync.OnceValues(func() (NamedIndex, error) {
idx, err := fetchAndParse(etag)
if err != nil {
// We don't want to cache errors, so we remove the entry.
i.indexes.Remove(key)
}
return idx, err
})
i.indexes.Add(key, entry)

// Record the current etag for this URL so we can GC it later.
i.etagMu.Lock()
i.urlToEtag[u] = etag
i.etagMu.Unlock()
})
// Unlock after adding the entry to the cache. Note that we've not actually executed
// the function yet, so this will not block over network calls.
i.indexesMux.Unlock()

return i.load(key)
// Remove any stale entries with the same URL but different etag.
for _, key := range i.indexes.Keys() {
if key.url == u && key.etag != etag {
i.indexes.Remove(key)
}
}
}
return entry()
} else {
i.Lock()
defer i.Unlock()
Expand All @@ -192,24 +193,30 @@ func (i *indexCache) get(ctx context.Context, repoName, repoURL string, keys map
return nil, fmt.Errorf("stat: %w", err)
}

key := cacheKey{url: u}
entry, hasCachedValue := i.indexes.Get(key)

mod := stat.ModTime()
before, ok := i.modtimes[u]
if !ok || mod.After(before) {
if !hasCachedValue || !ok || mod.After(before) {
b, err := os.ReadFile(u)
if err != nil {
return nil, fmt.Errorf("reading file: %w", err)
}
// If this is the first time or it has changed since the last time...
idx, err := parseRepositoryIndex(ctx, u, keys, arch, b, opts)
if err != nil {
i.store(u, nil, err)
} else {
i.store(u, NewNamedRepositoryWithIndex(repoName, repoRef.WithIndex(idx)), nil)
}

entry = sync.OnceValues(func() (NamedIndex, error) {
idx, err := parseRepositoryIndex(ctx, u, keys, arch, b, opts)
if err != nil {
return nil, err
}
return NewNamedRepositoryWithIndex(repoName, repoRef.WithIndex(idx)), nil
})

i.indexes.Add(key, entry)
i.modtimes[u] = mod
}

return i.load(u)
return entry()
}
}

Expand Down
Loading