diff --git a/cache/manager.go b/cache/manager.go
index c1b3d52ec1d6..c818d6e53083 100644
--- a/cache/manager.go
+++ b/cache/manager.go
@@ -143,7 +143,7 @@ func (cm *cacheManager) GetByBlob(ctx context.Context, desc ocispecs.Descriptor,
 	descHandlers := descHandlersOf(opts...)
 	if desc.Digest != "" && (descHandlers == nil || descHandlers[desc.Digest] == nil) {
 		if _, err := cm.ContentStore.Info(ctx, desc.Digest); errors.Is(err, cerrdefs.ErrNotFound) {
-			return nil, NeedsRemoteProviderError([]digest.Digest{desc.Digest})
+			return nil, NeedsRemoteProviderError([]DigestDescriptionPair{{Digest: desc.Digest}})
 		} else if err != nil {
 			return nil, err
 		}
@@ -396,7 +396,7 @@ func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOpt
 		if isLazy, err := cr.isLazy(ctx); err != nil {
 			return err
 		} else if isLazy && dhs[blob] == nil {
-			missing = append(missing, blob)
+			missing = append(missing, DigestDescriptionPair{Digest: blob, Description: cr.GetDescription()})
 		}
 		return nil
 	}); err != nil {
diff --git a/cache/opts.go b/cache/opts.go
index 1f1db6ca6105..df94083357b7 100644
--- a/cache/opts.go
+++ b/cache/opts.go
@@ -30,10 +30,19 @@ func descHandlersOf(opts ...RefOption) DescHandlers {
 
 type DescHandlerKey digest.Digest
 
-type NeedsRemoteProviderError []digest.Digest //nolint:errname
+type NeedsRemoteProviderError []DigestDescriptionPair //nolint:errname
+
+type DigestDescriptionPair struct {
+	Digest      digest.Digest
+	Description string
+}
+
+func (d DigestDescriptionPair) String() string {
+	return fmt.Sprintf("%s: %s", d.Digest, d.Description)
+}
 
 func (m NeedsRemoteProviderError) Error() string {
-	return fmt.Sprintf("missing descriptor handlers for lazy blobs %+v", []digest.Digest(m))
+	return fmt.Sprintf("missing descriptor handlers for lazy blobs %+v", []DigestDescriptionPair(m))
 }
 
 type Unlazy session.Group
diff --git a/cache/refs.go b/cache/refs.go
index 0cf30737abc0..235fa0c66c44 100644
--- a/cache/refs.go
+++ b/cache/refs.go
@@ -49,6 +49,7 @@ type Ref interface {
 	RefMetadata
 	Release(context.Context) error
 	IdentityMapping() *idtools.IdentityMapping
+	DescHandlers() DescHandlers
 	DescHandler(digest.Digest) *DescHandler
 }
 
@@ -612,6 +613,13 @@ func (sr *immutableRef) LayerChain() RefList {
 	return l
 }
 
+func (sr *immutableRef) DescHandlers() DescHandlers {
+	// clone to prevent mutation of internal state
+	dhs := make(DescHandlers)
+	maps.Copy(dhs, sr.descHandlers)
+	return dhs
+}
+
 func (sr *immutableRef) DescHandler(dgst digest.Digest) *DescHandler {
 	return sr.descHandlers[dgst]
 }
@@ -640,6 +648,13 @@ func (sr *mutableRef) traceLogFields() logrus.Fields {
 	return m
 }
 
+func (sr *mutableRef) DescHandlers() DescHandlers {
+	// clone to prevent mutation of internal state
+	dhs := make(DescHandlers)
+	maps.Copy(dhs, sr.descHandlers)
+	return dhs
+}
+
 func (sr *mutableRef) DescHandler(dgst digest.Digest) *DescHandler {
 	return sr.descHandlers[dgst]
 }
diff --git a/cache/remotecache/azblob/importer.go b/cache/remotecache/azblob/importer.go
index 4e2e17bd933d..5bde1a8e109e 100644
--- a/cache/remotecache/azblob/importer.go
+++ b/cache/remotecache/azblob/importer.go
@@ -134,7 +134,7 @@ func (ci *importer) loadManifest(ctx context.Context, name string) (*v1.CacheCha
 	progress.OneOff(ctx, fmt.Sprintf("found %d layers in cache", len(allLayers)))(nil)
 
 	cc := v1.NewCacheChains()
-	if err := v1.ParseConfig(config, allLayers, cc); err != nil {
+	if err := v1.ParseConfig(ctx, config, allLayers, cc); err != nil {
 		return nil, err
 	}
 
diff --git a/cache/remotecache/gha/gha.go b/cache/remotecache/gha/gha.go
index 17c61ef3fecd..e55c9b04a656 100644
--- a/cache/remotecache/gha/gha.go
+++ b/cache/remotecache/gha/gha.go
@@ -356,7 +356,7 @@ func (ci *importer) loadScope(ctx context.Context, scope string) (*v1.CacheChain
 	}
 
 	cc := v1.NewCacheChains()
-	if err := v1.ParseConfig(config, allLayers, cc); err != nil {
+	if err := v1.ParseConfig(ctx, config, allLayers, cc); err != nil {
 		return nil, err
 	}
 	return cc, nil
diff --git a/cache/remotecache/import.go b/cache/remotecache/import.go
index 99b9695f866c..a6cb1fb593c4 100644
--- a/cache/remotecache/import.go
+++ b/cache/remotecache/import.go
@@ -117,7 +117,7 @@ func (ci *contentCacheImporter) Resolve(ctx context.Context, desc ocispecs.Descr
 	}
 
 	cc := v1.NewCacheChains()
-	if err := v1.Parse(dt, allLayers, cc); err != nil {
+	if err := v1.Parse(ctx, dt, allLayers, cc); err != nil {
 		return nil, err
 	}
 
@@ -238,7 +238,7 @@ func (ci *contentCacheImporter) importInlineCache(ctx context.Context, dt []byte
 			return errors.WithStack(err)
 		}
 		cc := v1.NewCacheChains()
-		if err := v1.ParseConfig(config, layers, cc); err != nil {
+		if err := v1.ParseConfig(ctx, config, layers, cc); err != nil {
 			return err
 		}
 		mu.Lock()
diff --git a/cache/remotecache/inline/inline.go b/cache/remotecache/inline/inline.go
index 039348022d31..a68a7c23f4e8 100644
--- a/cache/remotecache/inline/inline.go
+++ b/cache/remotecache/inline/inline.go
@@ -76,7 +76,7 @@ func (ce *exporter) ExportForLayers(ctx context.Context, layers []digest.Digest)
 	}
 
 	cc := v1.NewCacheChains()
-	if err := v1.ParseConfig(*config, descs2, cc); err != nil {
+	if err := v1.ParseConfig(ctx, *config, descs2, cc); err != nil {
 		return nil, err
 	}
 
diff --git a/cache/remotecache/s3/s3.go b/cache/remotecache/s3/s3.go
index ffa2ae6c3eca..6aa1a2abd75b 100644
--- a/cache/remotecache/s3/s3.go
+++ b/cache/remotecache/s3/s3.go
@@ -365,7 +365,7 @@ func (i *importer) load(ctx context.Context) (*v1.CacheChains, error) {
 	}
 
 	cc := v1.NewCacheChains()
-	if err := v1.ParseConfig(config, allLayers, cc); err != nil {
+	if err := v1.ParseConfig(ctx, config, allLayers, cc); err != nil {
 		return nil, err
 	}
 	return cc, nil
diff --git a/cache/remotecache/v1/chains.go b/cache/remotecache/v1/chains.go
index 87744d9ec775..ee8a67cb2b2f 100644
--- a/cache/remotecache/v1/chains.go
+++ b/cache/remotecache/v1/chains.go
@@ -7,8 +7,10 @@ import (
 	"time"
 
 	"github.com/containerd/containerd/content"
+	cerrdefs "github.com/containerd/errdefs"
 	"github.com/moby/buildkit/session"
 	"github.com/moby/buildkit/solver"
+	"github.com/moby/buildkit/util/bklog"
 	digest "github.com/opencontainers/go-digest"
 	ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
 	"github.com/pkg/errors"
@@ -215,7 +217,26 @@ func (c *item) removeLink(src *item) bool {
 	return found
 }
 
-func (c *item) AddResult(_ digest.Digest, _ int, createdAt time.Time, result *solver.Remote) {
+func (c *item) AddResult(ctx context.Context, _ digest.Digest, _ int, createdAt time.Time, result *solver.Remote) {
+	if result == nil {
+		return
+	}
+
+	if len(result.Descriptors) == 0 {
+		bklog.G(ctx).Warnf("no descriptors for item %s result, skipping", c.dgst)
+		return
+	}
+
+	if result.Provider != nil {
+		for _, desc := range result.Descriptors {
+			_, err := result.Provider.Info(ctx, desc.Digest)
+			if err != nil && !cerrdefs.IsNotImplemented(err) {
+				bklog.G(ctx).Warnf("failed to get info for item %s descriptor %s, skipping item result: %v", c.dgst, desc.Digest, err)
+				return
+			}
+		}
+	}
+
 	c.resultTime = createdAt
 	c.result = result
 }
@@ -305,7 +326,7 @@ func (c *item) walkAllResults(fn func(i *item) error, visited map[*item]struct{}
 type nopRecord struct {
 }
 
-func (c *nopRecord) AddResult(_ digest.Digest, _ int, createdAt time.Time, result *solver.Remote) {
+func (c *nopRecord) AddResult(_ context.Context, _ digest.Digest, _ int, createdAt time.Time, result *solver.Remote) {
 }
 
 func (c *nopRecord) LinkFrom(rec solver.CacheExporterRecord, index int, selector string) {
diff --git a/cache/remotecache/v1/chains_test.go b/cache/remotecache/v1/chains_test.go
index b9083d9f2f8d..165949b7229d 100644
--- a/cache/remotecache/v1/chains_test.go
+++ b/cache/remotecache/v1/chains_test.go
@@ -29,7 +29,7 @@ func TestSimpleMarshal(t *testing.T) {
 				Digest: dgst("d1"),
 			}},
 		}
-		baz.AddResult("", 0, time.Now(), r0)
+		baz.AddResult(context.TODO(), "", 0, time.Now(), r0)
 	}
 	addRecords()
 
@@ -76,7 +76,7 @@ func TestSimpleMarshal(t *testing.T) {
 	require.NoError(t, err)
 
 	newChains := NewCacheChains()
-	err = Parse(dt, descPairs, newChains)
+	err = Parse(context.TODO(), dt, descPairs, newChains)
 	require.NoError(t, err)
 
 	cfg3, _, err := cc.Marshal(context.TODO())
diff --git a/cache/remotecache/v1/parse.go b/cache/remotecache/v1/parse.go
index 44b1645b9640..13ff2d6ed6f1 100644
--- a/cache/remotecache/v1/parse.go
+++ b/cache/remotecache/v1/parse.go
@@ -1,6 +1,7 @@
 package cacheimport
 
 import (
+	"context"
 	"encoding/json"
 
 	"github.com/moby/buildkit/solver"
@@ -9,27 +10,27 @@ import (
 	"github.com/pkg/errors"
 )
 
-func Parse(configJSON []byte, provider DescriptorProvider, t solver.CacheExporterTarget) error {
+func Parse(ctx context.Context, configJSON []byte, provider DescriptorProvider, t solver.CacheExporterTarget) error {
 	var config CacheConfig
 	if err := json.Unmarshal(configJSON, &config); err != nil {
 		return errors.WithStack(err)
 	}
 
-	return ParseConfig(config, provider, t)
+	return ParseConfig(ctx, config, provider, t)
 }
 
-func ParseConfig(config CacheConfig, provider DescriptorProvider, t solver.CacheExporterTarget) error {
+func ParseConfig(ctx context.Context, config CacheConfig, provider DescriptorProvider, t solver.CacheExporterTarget) error {
 	cache := map[int]solver.CacheExporterRecord{}
 
 	for i := range config.Records {
-		if _, err := parseRecord(config, i, provider, t, cache); err != nil {
+		if _, err := parseRecord(ctx, config, i, provider, t, cache); err != nil {
 			return err
 		}
 	}
 	return nil
 }
 
-func parseRecord(cc CacheConfig, idx int, provider DescriptorProvider, t solver.CacheExporterTarget, cache map[int]solver.CacheExporterRecord) (solver.CacheExporterRecord, error) {
+func parseRecord(ctx context.Context, cc CacheConfig, idx int, provider DescriptorProvider, t solver.CacheExporterTarget, cache map[int]solver.CacheExporterRecord) (solver.CacheExporterRecord, error) {
 	if r, ok := cache[idx]; ok {
 		if r == nil {
 			return nil, errors.Errorf("invalid looping record")
@@ -46,7 +47,7 @@ func parseRecord(cc CacheConfig, idx int, provider DescriptorProvider, t solver.
 	cache[idx] = nil
 	for i, inputs := range rec.Inputs {
 		for _, inp := range inputs {
-			src, err := parseRecord(cc, inp.LinkIndex, provider, t, cache)
+			src, err := parseRecord(ctx, cc, inp.LinkIndex, provider, t, cache)
 			if err != nil {
 				return nil, err
 			}
@@ -61,7 +62,7 @@ func parseRecord(cc CacheConfig, idx int, provider DescriptorProvider, t solver.
 			return nil, err
 		}
 		if remote != nil {
-			r.AddResult("", 0, res.CreatedAt, remote)
+			r.AddResult(ctx, "", 0, res.CreatedAt, remote)
 		}
 	}
 
@@ -86,7 +87,7 @@ func parseRecord(cc CacheConfig, idx int, provider DescriptorProvider, t solver.
 		}
 		if remote != nil {
 			remote.Provider = mp
-			r.AddResult("", 0, res.CreatedAt, remote)
+			r.AddResult(ctx, "", 0, res.CreatedAt, remote)
 		}
 	}
 
diff --git a/cache/remotecache/v1/utils.go b/cache/remotecache/v1/utils.go
index ef0294d75f18..812c89b48e2c 100644
--- a/cache/remotecache/v1/utils.go
+++ b/cache/remotecache/v1/utils.go
@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"sort"
 
-	cerrdefs "github.com/containerd/errdefs"
 	"github.com/moby/buildkit/solver"
 	"github.com/moby/buildkit/util/bklog"
 	digest "github.com/opencontainers/go-digest"
@@ -275,20 +274,6 @@ type marshalState struct {
 }
 
 func marshalRemote(ctx context.Context, r *solver.Remote, state *marshalState) string {
-	if len(r.Descriptors) == 0 {
-		return ""
-	}
-
-	if r.Provider != nil {
-		for _, d := range r.Descriptors {
-			if _, err := r.Provider.Info(ctx, d.Digest); err != nil {
-				if !cerrdefs.IsNotImplemented(err) {
-					return ""
-				}
-			}
-		}
-	}
-
 	var parentID string
 	if len(r.Descriptors) > 1 {
 		r2 := &solver.Remote{
diff --git a/client/client_test.go b/client/client_test.go
index e7a5886f8eec..7abfca7ab808 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -222,6 +222,8 @@ var allTests = []func(t *testing.T, sb integration.Sandbox){
 	testLayerLimitOnMounts,
 	testFrontendVerifyPlatforms,
 	testRunValidExitCodes,
+	testSameChainIDWithLazyBlobsCacheExport,
+	testSameChainIDWithLazyBlobsCacheMountBase,
 }
 
 func TestIntegration(t *testing.T) {
@@ -10895,3 +10897,243 @@ func testRunValidExitCodes(t *testing.T, sb integration.Sandbox) {
 	require.Error(t, err)
 	require.ErrorContains(t, err, "exit code: 0")
 }
+
+func testSameChainIDWithLazyBlobsCacheExport(t *testing.T, sb integration.Sandbox) {
+	requiresLinux(t)
+	workers.CheckFeatureCompat(t, sb,
+		workers.FeatureCacheExport,
+		workers.FeatureCacheImport,
+		workers.FeatureCacheBackendRegistry,
+	)
+
+	c, err := New(sb.Context(), sb.Address())
+	require.NoError(t, err)
+	defer c.Close()
+
+	registry, err := sb.NewRegistry()
+	if errors.Is(err, integration.ErrRequirements) {
+		t.Skip(err.Error())
+	}
+	require.NoError(t, err)
+
+	// push the base busybox image, ensuring it uses gzip
+
+	def, err := llb.Image("busybox:latest").
+		Marshal(sb.Context())
+	require.NoError(t, err)
+	busyboxGzipRef := registry + "/buildkit/busyboxgzip:latest"
+	_, err = c.Solve(sb.Context(), def, SolveOpt{
+		Exports: []ExportEntry{
+			{
+				Type: ExporterImage,
+				Attrs: map[string]string{
+					"name":              busyboxGzipRef,
+					"push":              "true",
+					"compression":       "gzip",
+					"force-compression": "true",
+				},
+			},
+		},
+	}, nil)
+	require.NoError(t, err)
+
+	// push the base busybox image plus an extra layer, ensuring it uses zstd
+	// the extra layer allows us to avoid edge-merge/cache-load later
+	def, err = llb.Image("busybox:latest").
+		Run(llb.Shlex(`touch /foo`)).Root().
+		Marshal(sb.Context())
+	require.NoError(t, err)
+	busyboxZstdRef := registry + "/buildkit/busyboxzstd:latest"
+	_, err = c.Solve(sb.Context(), def, SolveOpt{
+		Exports: []ExportEntry{
+			{
+				Type: ExporterImage,
+				Attrs: map[string]string{
+					"name":              busyboxZstdRef,
+					"push":              "true",
+					"compression":       "zstd",
+					"force-compression": "true",
+				},
+			},
+		},
+	}, nil)
+	require.NoError(t, err)
+
+	ensurePruneAll(t, c, sb)
+
+	// create non-lazy cache refs for the zstd image
+	def, err = llb.Image(busyboxZstdRef).
+		Run(llb.Shlex(`true`)).Root().
+		Marshal(sb.Context())
+	require.NoError(t, err)
+	_, err = c.Solve(sb.Context(), def, SolveOpt{}, nil)
+	require.NoError(t, err)
+
+	// Create lazy cache refs for the gzip layers that will be deduped by chainID with
+	// the zstd layers made in the previous solve.
+	// Put a random file in the rootfs, run a cache invalidation step and then copy
+	// the random file to a r/w mnt.
+	def, err = llb.Image(busyboxGzipRef).
+		Run(llb.Shlex(`sh -c "cat /dev/urandom | head -c 100 | sha256sum > /rand"`)).Root().
+		Run(llb.Shlex(`echo `+identity.NewID())).Root().
+		Run(llb.Shlex(`cp /rand /mnt/rand`)).AddMount("/mnt", llb.Scratch()).
+		Marshal(sb.Context())
+	require.NoError(t, err)
+
+	outDir := t.TempDir()
+	_, err = c.Solve(sb.Context(), def, SolveOpt{
+		Exports: []ExportEntry{
+			{
+				Type:      ExporterLocal,
+				OutputDir: outDir,
+			},
+		},
+		CacheExports: []CacheOptionsEntry{
+			{
+				Type: "registry",
+				Attrs: map[string]string{
+					"ref":  registry + "/buildkit/idc:latest",
+					"mode": "max",
+				},
+			},
+		},
+	}, nil)
+	require.NoError(t, err)
+
+	rand1, err := os.ReadFile(filepath.Join(outDir, "rand"))
+	require.NoError(t, err)
+
+	ensurePruneAll(t, c, sb)
+
+	// Run the same steps as before but with a different cache invalidation step in the middle
+	// The random file should still be cached from earlier and thus the output should be the same
+	def, err = llb.Image(busyboxGzipRef).
+		Run(llb.Shlex(`sh -c "cat /dev/urandom | head -c 100 | sha256sum > /rand"`)).Root().
+		Run(llb.Shlex(`echo `+identity.NewID())).Root().
+		Run(llb.Shlex(`cp /rand /mnt/rand`)).AddMount("/mnt", llb.Scratch()).
+		Marshal(sb.Context())
+	require.NoError(t, err)
+
+	outDir = t.TempDir()
+	_, err = c.Solve(sb.Context(), def, SolveOpt{
+		Exports: []ExportEntry{
+			{
+				Type:      ExporterLocal,
+				OutputDir: outDir,
+			},
+		},
+		CacheImports: []CacheOptionsEntry{
+			{
+				Type: "registry",
+				Attrs: map[string]string{
+					"ref":  registry + "/buildkit/idc:latest",
+					"mode": "max",
+				},
+			},
+		},
+	}, nil)
+	require.NoError(t, err)
+
+	rand2, err := os.ReadFile(filepath.Join(outDir, "rand"))
+	require.NoError(t, err)
+
+	require.Equal(t, string(rand1), string(rand2))
+}
+
+func testSameChainIDWithLazyBlobsCacheMountBase(t *testing.T, sb integration.Sandbox) {
+	requiresLinux(t)
+	workers.CheckFeatureCompat(t, sb,
+		workers.FeatureCacheExport,
+		workers.FeatureCacheImport,
+		workers.FeatureCacheBackendRegistry,
+	)
+
+	c, err := New(sb.Context(), sb.Address())
+	require.NoError(t, err)
+	defer c.Close()
+
+	registry, err := sb.NewRegistry()
+	if errors.Is(err, integration.ErrRequirements) {
+		t.Skip(err.Error())
+	}
+	require.NoError(t, err)
+
+	// push the base busybox image, ensuring it uses gzip
+
+	def, err := llb.Image("busybox:latest").
+		Marshal(sb.Context())
+	require.NoError(t, err)
+	busyboxGzipRef := registry + "/buildkit/busyboxgzip:latest"
+	_, err = c.Solve(sb.Context(), def, SolveOpt{
+		Exports: []ExportEntry{
+			{
+				Type: ExporterImage,
+				Attrs: map[string]string{
+					"name":              busyboxGzipRef,
+					"push":              "true",
+					"compression":       "gzip",
+					"force-compression": "true",
+				},
+			},
+		},
+	}, nil)
+	require.NoError(t, err)
+
+	// push the base busybox image plus an extra layer, ensuring it uses zstd
+	// the extra layer allows us to avoid edge-merge/cache-load later
+	def, err = llb.Image("busybox:latest").
+		Run(llb.Shlex(`touch /foo`)).Root().
+		Marshal(sb.Context())
+	require.NoError(t, err)
+	busyboxZstdRef := registry + "/buildkit/busyboxzstd:latest"
+	_, err = c.Solve(sb.Context(), def, SolveOpt{
+		Exports: []ExportEntry{
+			{
+				Type: ExporterImage,
+				Attrs: map[string]string{
+					"name":              busyboxZstdRef,
+					"push":              "true",
+					"compression":       "zstd",
+					"force-compression": "true",
+				},
+			},
+		},
+	}, nil)
+	require.NoError(t, err)
+
+	ensurePruneAll(t, c, sb)
+
+	// create non-lazy cache refs for the zstd image
+	def, err = llb.Image(busyboxZstdRef).
+		Run(llb.Shlex(`true`)).Root().
+		Marshal(sb.Context())
+	require.NoError(t, err)
+	_, err = c.Solve(sb.Context(), def, SolveOpt{}, nil)
+	require.NoError(t, err)
+
+	// use the gzip image as a cache mount base, the cache ref will be deduped by
+	// chainID with the zstd layers made in the previous solve
+	def, err = llb.Image(busyboxZstdRef).Run(
+		llb.Shlex(`touch /mnt/bar`),
+		llb.AddMount("/mnt",
+			llb.Image(busyboxGzipRef),
+			llb.AsPersistentCacheDir("idc", llb.CacheMountShared),
+		),
+	).Root().Marshal(sb.Context())
+	require.NoError(t, err)
+	_, err = c.Solve(sb.Context(), def, SolveOpt{}, nil)
+	require.NoError(t, err)
+
+	// try to re-use the cache mount from before, ensure we successfully get
+	// the same one written to in previous step
+	def, err = llb.Image(busyboxZstdRef).Run(
+		llb.Shlex(`stat /mnt/bar`),
+		llb.AddMount("/mnt",
+			llb.Image(busyboxGzipRef),
+			llb.AsPersistentCacheDir("idc", llb.CacheMountShared),
+		),
+	).Root().Marshal(sb.Context())
+	require.NoError(t, err)
+	_, err = c.Solve(sb.Context(), def, SolveOpt{}, nil)
+	require.NoError(t, err)
+}
diff --git a/solver/cacheopts.go b/solver/cacheopts.go
index 4b661471ed82..8fd3c17d27c3 100644
--- a/solver/cacheopts.go
+++ b/solver/cacheopts.go
@@ -28,69 +28,85 @@ func WithCacheOptGetter(ctx context.Context, getter func(includeAncestors bool,
 	return context.WithValue(ctx, cacheOptGetterKey{}, getter)
 }
 
-func withAncestorCacheOpts(ctx context.Context, start *state) context.Context {
+func withAncestorCacheOpts(ctx context.Context, start *sharedOp) context.Context {
 	return WithCacheOptGetter(ctx, func(includeAncestors bool, keys ...interface{}) map[interface{}]interface{} {
 		keySet := make(map[interface{}]struct{})
 		for _, k := range keys {
 			keySet[k] = struct{}{}
 		}
 		values := make(map[interface{}]interface{})
-		walkAncestors(ctx, start, func(st *state) bool {
-			if st.clientVertex.Error != "" {
+		walkAncestors(ctx, start, func(op *sharedOp) bool {
+			if op.st.clientVertex.Error != "" {
 				// don't use values from cancelled or otherwise error'd vertexes
 				return false
 			}
-			for _, res := range st.op.cacheRes {
-				if res.Opts == nil {
-					continue
+
+			for k := range keySet {
+				var v any
+				var ok bool
+
+				// check opts set from CacheMap operation
+				for _, res := range op.cacheRes {
+					if res.Opts == nil {
+						continue
+					}
+					v, ok = res.Opts[k]
+					if ok {
+						break
+					}
 				}
-				for k := range keySet {
-					if v, ok := res.Opts[k]; ok {
-						values[k] = v
-						delete(keySet, k)
-						if len(keySet) == 0 {
-							return true
-						}
+
+				// check opts set during cache load
+				if !ok && op.loadCacheOpts != nil {
+					v, ok = op.loadCacheOpts[k]
+				}
+
+				if ok {
+					values[k] = v
+					delete(keySet, k)
+					if len(keySet) == 0 {
+						return true
 					}
 				}
 			}
-			return !includeAncestors // stop after the first state unless includeAncestors is true
+
+			return !includeAncestors // stop after the first op unless includeAncestors is true
 		})
 		return values
 	})
 }
 
-func walkAncestors(ctx context.Context, start *state, f func(*state) bool) {
-	stack := [][]*state{{start}}
+func walkAncestors(ctx context.Context, start *sharedOp, f func(*sharedOp) bool) {
+	stack := [][]*sharedOp{{start}}
 	cache := make(map[digest.Digest]struct{})
 	for len(stack) > 0 {
-		sts := stack[len(stack)-1]
-		if len(sts) == 0 {
+		ops := stack[len(stack)-1]
+		if len(ops) == 0 {
 			stack = stack[:len(stack)-1]
 			continue
 		}
-		st := sts[len(sts)-1]
-		stack[len(stack)-1] = sts[:len(sts)-1]
-		if st == nil {
+		op := ops[len(ops)-1]
+		stack[len(stack)-1] = ops[:len(ops)-1]
+		if op == nil {
 			continue
 		}
-		if _, ok := cache[st.origDigest]; ok {
+		if _, ok := cache[op.st.origDigest]; ok {
 			continue
 		}
-		cache[st.origDigest] = struct{}{}
-		if shouldStop := f(st); shouldStop {
+		cache[op.st.origDigest] = struct{}{}
+		if shouldStop := f(op); shouldStop {
 			return
 		}
-		stack = append(stack, []*state{})
-		for _, parentDgst := range st.clientVertex.Inputs {
-			st.solver.mu.RLock()
-			parent := st.solver.actives[parentDgst]
-			st.solver.mu.RUnlock()
+		stack = append(stack, []*sharedOp{})
+		for _, parentDgst := range op.st.clientVertex.Inputs {
+			op.st.solver.mu.RLock()
+			parent := op.st.solver.actives[parentDgst]
+			op.st.solver.mu.RUnlock()
 			if parent == nil {
 				bklog.G(ctx).Warnf("parent %q not found in active job list during cache opt search", parentDgst)
 				continue
 			}
-			stack[len(stack)-1] = append(stack[len(stack)-1], parent)
+			stack[len(stack)-1] = append(stack[len(stack)-1], parent.op)
 		}
 	}
 }
diff --git a/solver/edge.go b/solver/edge.go
index 27aa54ec0bda..da7de7d7858d 100644
--- a/solver/edge.go
+++ b/solver/edge.go
@@ -24,7 +24,7 @@ func (t edgeStatusType) String() string {
 	return []string{"initial", "cache-fast", "cache-slow", "complete"}[t]
 }
 
-func newEdge(ed Edge, op activeOp, index *edgeIndex) *edge {
+func newEdge(ed Edge, op *sharedOp, index *edgeIndex) *edge {
 	e := &edge{
 		edge:  ed,
 		op:    op,
@@ -40,7 +40,7 @@ func newEdge(ed Edge, op activeOp, index *edgeIndex) *edge {
 
 type edge struct {
 	edge Edge
-	op   activeOp
+	op   *sharedOp
 
 	edgeState
 	depRequests map[pipeReceiver]*dep
diff --git a/solver/exporter.go b/solver/exporter.go
index 5f040f75d03b..efaaa1a623b2 100644
--- a/solver/exporter.go
+++ b/solver/exporter.go
@@ -117,6 +117,12 @@ func (e *exporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt Cach
 			return nil, err
 		}
 
+		if e.edge != nil {
+			if op := e.edge.op; op != nil && op.st != nil {
+				ctx = withAncestorCacheOpts(ctx, op)
+			}
+		}
+
 		remotes, err := cm.results.LoadRemotes(ctx, res, opt.CompressionOpt, opt.Session)
 		if err != nil {
 			return nil, err
@@ -127,7 +133,7 @@ func (e *exporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt Cach
 			if opt.CompressionOpt != nil {
 				for _, r := range remotes { // record all remaining remotes as well
 					rec := t.Add(recKey)
-					rec.AddResult(k.vtx, int(k.output), v.CreatedAt, r)
+					rec.AddResult(ctx, k.vtx, int(k.output), v.CreatedAt, r)
 					variants = append(variants, rec)
 				}
 			}
@@ -148,7 +154,7 @@ func (e *exporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt Cach
 		if opt.CompressionOpt != nil {
 			for _, r := range remotes { // record all remaining remotes as well
 				rec := t.Add(recKey)
-				rec.AddResult(k.vtx, int(k.output), v.CreatedAt, r)
+				rec.AddResult(ctx, k.vtx, int(k.output), v.CreatedAt, r)
 				variants = append(variants, rec)
 			}
 		}
@@ -156,7 +162,7 @@ func (e *exporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt Cach
 
 	if remote != nil {
 		for _, rec := range allRec {
-			rec.AddResult(k.vtx, int(k.output), v.CreatedAt, remote)
+			rec.AddResult(ctx, k.vtx, int(k.output), v.CreatedAt, remote)
 		}
 	}
 	allRec = append(allRec, variants...)
@@ -172,7 +178,7 @@ func (e *exporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt Cach
 	for _, dep := range deps {
 		recs, err := dep.CacheKey.Exporter.ExportTo(ctx, t, opt)
 		if err != nil {
-			return nil, nil
+			return nil, err
 		}
 		for _, r := range recs {
 			srcs[i] = append(srcs[i], expr{r: r, selector: dep.Selector})
@@ -184,7 +190,7 @@ func (e *exporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt Cach
 		for _, de := range e.edge.secondaryExporters {
 			recs, err := de.cacheKey.CacheKey.Exporter.ExportTo(ctx, t, opt)
 			if err != nil {
-				return nil, nil
+				return nil, err
 			}
 			for _, r := range recs {
 				srcs[de.index] = append(srcs[de.index], expr{r: r, selector: de.cacheKey.Selector})
diff --git a/solver/jobs.go b/solver/jobs.go
index d8cce3abdcf7..5aa559c6371e 100644
--- a/solver/jobs.go
+++ b/solver/jobs.go
@@ -825,15 +825,6 @@ type cacheMapResp struct {
 	complete bool
 }
 
-type activeOp interface {
-	CacheMap(context.Context, int) (*cacheMapResp, error)
-	LoadCache(ctx context.Context, rec *CacheRecord) (Result, error)
-	Exec(ctx context.Context, inputs []Result) (outputs []Result, exporters []ExportableCacheKey, err error)
-	IgnoreCache() bool
-	Cache() CacheManager
-	CalcSlowCache(context.Context, Index, PreprocessFunc, ResultBasedCacheFunc, Result) (digest.Digest, error)
-}
-
 func newSharedOp(resolver ResolveOpFunc, st *state) *sharedOp {
 	so := &sharedOp{
 		resolver: resolver,
@@ -869,6 +860,8 @@ type sharedOp struct {
 	cacheDone bool
 	cacheErr  error
 
+	loadCacheOpts CacheOpts
+
 	slowMu       sync.Mutex
 	slowCacheRes map[Index]digest.Digest
 	slowCacheErr map[Index]error
@@ -879,18 +872,18 @@ func (s *sharedOp) IgnoreCache() bool {
 }
 
 func (s *sharedOp) Cache() CacheManager {
-	return &cacheWithCacheOpts{s.st.combinedCacheManager(), s.st}
+	return &cacheWithCacheOpts{s.st.combinedCacheManager(), s}
 }
 
 type cacheWithCacheOpts struct {
 	CacheManager
-	st *state
+	op *sharedOp
 }
 
 func (c cacheWithCacheOpts) Records(ctx context.Context, ck *CacheKey) ([]*CacheRecord, error) {
 	// Allow Records accessing to cache opts through ctx. This enable to use remote provider
 	// during checking the cache existence.
-	return c.CacheManager.Records(withAncestorCacheOpts(ctx, c.st), ck)
+	return c.CacheManager.Records(withAncestorCacheOpts(ctx, c.op), ck)
 }
 
 func (s *sharedOp) LoadCache(ctx context.Context, rec *CacheRecord) (Result, error) {
@@ -901,9 +894,12 @@ func (s *sharedOp) LoadCache(ctx context.Context, rec *CacheRecord) (Result, err
 	// no cache hit. start evaluating the node
 	span, ctx := tracing.StartSpan(ctx, "load cache: "+s.st.vtx.Name(), trace.WithAttributes(attribute.String("vertex", s.st.vtx.Digest().String())))
 	notifyCompleted := notifyStarted(ctx, &s.st.clientVertex, true)
-	res, err := s.Cache().Load(withAncestorCacheOpts(ctx, s.st), rec)
+	res, err := s.Cache().Load(withAncestorCacheOpts(ctx, s), rec)
 	tracing.FinishWithError(span, err)
 	notifyCompleted(err, true)
+	if err == nil {
+		s.loadCacheOpts = res.CacheOpts()
+	}
 	return res, err
 }
 
@@ -952,7 +948,7 @@ func (s *sharedOp) CalcSlowCache(ctx context.Context, index Index, p PreprocessF
 		if s.st.mspan.Span != nil {
 			ctx = trace.ContextWithSpan(ctx, s.st.mspan)
 		}
-		key, err = f(withAncestorCacheOpts(ctx, s.st), res, s.st)
+		key, err = f(withAncestorCacheOpts(ctx, s), res, s.st)
 	}
 	if err != nil {
 		select {
@@ -1008,7 +1004,7 @@ func (s *sharedOp) CacheMap(ctx context.Context, index int) (resp *cacheMapResp,
 	if s.st.mspan.Span != nil {
 		ctx = trace.ContextWithSpan(ctx, s.st.mspan)
 	}
-	ctx = withAncestorCacheOpts(ctx, s.st)
+	ctx = withAncestorCacheOpts(ctx, s)
 	if len(s.st.vtx.Inputs()) == 0 {
 		// no cache hit. start evaluating the node
 		span, ctx := tracing.StartSpan(ctx, "cache request: "+s.st.vtx.Name(), trace.WithAttributes(attribute.String("vertex", s.st.vtx.Digest().String())))
@@ -1087,7 +1083,7 @@ func (s *sharedOp) Exec(ctx context.Context, inputs []Result) (outputs []Result,
 	if s.st.mspan.Span != nil {
 		ctx = trace.ContextWithSpan(ctx, s.st.mspan)
 	}
-	ctx = withAncestorCacheOpts(ctx, s.st)
+	ctx = withAncestorCacheOpts(ctx, s)
 	// no cache hit. start evaluating the node
 	span, ctx := tracing.StartSpan(ctx, s.st.vtx.Name(), trace.WithAttributes(attribute.String("vertex", s.st.vtx.Digest().String())))
diff --git a/solver/llbsolver/mounts/mount.go b/solver/llbsolver/mounts/mount.go
index c95fe4966b61..b58073651a03 100644
--- a/solver/llbsolver/mounts/mount.go
+++ b/solver/llbsolver/mounts/mount.go
@@ -123,7 +123,12 @@ func (g *cacheRefGetter) getRefCacheDirNoCache(ctx context.Context, key string,
 	}
 	locked := false
 	for _, si := range sis {
-		if mRef, err := g.cm.GetMutable(ctx, si.ID()); err == nil {
+		var opts []cache.RefOption
+		if ref != nil {
+			opts = append(opts, ref.DescHandlers())
+		}
+		mRef, err := g.cm.GetMutable(ctx, si.ID(), opts...)
+		if err == nil {
 			bklog.G(ctx).Debugf("reusing ref for cache dir %q: %s", id, mRef.ID())
 			return mRef, nil
 		} else if errors.Is(err, cache.ErrLocked) {
diff --git a/solver/llbsolver/provenance.go b/solver/llbsolver/provenance.go
index ec4f437e4281..1e27767c48f5 100644
--- a/solver/llbsolver/provenance.go
+++ b/solver/llbsolver/provenance.go
@@ -398,18 +398,9 @@ func NewProvenanceCreator(ctx context.Context, cp *provenance.Capture, res solve
 			return nil, err
 		}
 
-		wref, ok := r.Sys().(*worker.WorkerRef)
-		if !ok {
-			return nil, errors.Errorf("invalid worker ref %T", r.Sys())
-		}
-
 		addLayers = func() error {
 			e := newCacheExporter()
 
-			if wref.ImmutableRef != nil {
-				ctx = withDescHandlerCacheOpts(ctx, wref.ImmutableRef)
-			}
-
 			if _, err := r.CacheKeys()[0].Exporter.ExportTo(ctx, e, solver.CacheExportOpt{
 				ResolveRemotes: resolveRemotes,
 				Mode:           solver.CacheExportModeRemoteOnly,
@@ -512,7 +503,7 @@ type cacheRecord struct {
 	ce *cacheExporter
 }
 
-func (c *cacheRecord) AddResult(dgst digest.Digest, idx int, createdAt time.Time, result *solver.Remote) {
+func (c *cacheRecord) AddResult(_ context.Context, dgst digest.Digest, idx int, createdAt time.Time, result *solver.Remote) {
 	if result == nil || dgst == "" {
 		return
 	}
diff --git a/solver/llbsolver/solver.go b/solver/llbsolver/solver.go
index 15a1f0911372..e8f3d8c4ff8a 100644
--- a/solver/llbsolver/solver.go
+++ b/solver/llbsolver/solver.go
@@ -691,8 +691,6 @@ func runCacheExporters(ctx context.Context, exporters []RemoteCacheExporter, j *
 		err = inBuilderContext(ctx, j, exp.Exporter.Name(), id, func(ctx context.Context, _ session.Group) error {
 			prepareDone := progress.OneOff(ctx, "preparing build cache for export")
 			if err := result.EachRef(cached, inp, func(res solver.CachedResult, ref cache.ImmutableRef) error {
-				ctx = withDescHandlerCacheOpts(ctx, ref)
-
 				// Configure compression
 				compressionConfig := exp.Config().Compression
 
@@ -998,7 +996,6 @@ func inlineCache(ctx context.Context, ie inlineCacheExporter, res solver.CachedR
 		digests = append(digests, desc.Digest)
 	}
 
-	ctx = withDescHandlerCacheOpts(ctx, workerRef.ImmutableRef)
 	refCfg := cacheconfig.RefConfig{Compression: compressionopt}
 	if _, err := res.CacheKeys()[0].Exporter.ExportTo(ctx, ie, solver.CacheExportOpt{
 		ResolveRemotes: workerRefResolver(refCfg, true, g), // load as many compression blobs as possible
@@ -1011,20 +1008,6 @@ func inlineCache(ctx context.Context, ie inlineCacheExporter, res solver.CachedR
 	return ie.ExportForLayers(ctx, digests)
 }
 
-func withDescHandlerCacheOpts(ctx context.Context, ref cache.ImmutableRef) context.Context {
-	return solver.WithCacheOptGetter(ctx, func(includeAncestors bool, keys ...interface{}) map[interface{}]interface{} {
-		vals := make(map[interface{}]interface{})
-		for _, k := range keys {
-			if key, ok := k.(cache.DescHandlerKey); ok {
-				if handler := ref.DescHandler(digest.Digest(key)); handler != nil {
-					vals[k] = handler
-				}
-			}
-		}
-		return vals
-	})
-}
-
 func (s *Solver) Status(ctx context.Context, id string, statusChan chan *client.SolveStatus) error {
 	if err := s.history.Status(ctx, id, statusChan); err != nil {
 		if !errors.Is(err, os.ErrNotExist) {
diff --git a/solver/scheduler_test.go b/solver/scheduler_test.go
index 2d6f4c6357f9..87bfe4e49c2b 100644
--- a/solver/scheduler_test.go
+++ b/solver/scheduler_test.go
@@ -3947,6 +3947,7 @@ func (r *dummyResult) ID() string                    { return r.id }
 func (r *dummyResult) Release(context.Context) error { return nil }
 func (r *dummyResult) Sys() interface{}              { return r }
 func (r *dummyResult) Clone() Result                 { return r }
+func (r *dummyResult) CacheOpts() CacheOpts          { return nil }
 
 func testOpResolver(v Vertex, b Builder) (Op, error) {
 	if op, ok := v.Sys().(Op); ok {
@@ -4077,7 +4078,7 @@ type testExporterRecord struct {
 	linkMap map[digest.Digest]struct{}
 }
 
-func (r *testExporterRecord) AddResult(_ digest.Digest, _ int, createdAt time.Time, result *Remote) {
+func (r *testExporterRecord) AddResult(_ context.Context, _ digest.Digest, _ int, createdAt time.Time, result *Remote) {
 	r.results++
 }
 
diff --git a/solver/types.go b/solver/types.go
index 56a53d0fd912..800c2c8a6ec4 100644
--- a/solver/types.go
+++ b/solver/types.go
@@ -64,6 +64,7 @@ type Result interface {
 	Release(context.Context) error
 	Sys() interface{}
 	Clone() Result
+	CacheOpts() CacheOpts
 }
 
 // CachedResult is a result connected with its cache key
@@ -136,7 +137,7 @@ type CacheExporterTarget interface {
 
 // CacheExporterRecord is a single object being exported
 type CacheExporterRecord interface {
-	AddResult(vtx digest.Digest, index int, createdAt time.Time, result *Remote)
+	AddResult(ctx context.Context, vtx digest.Digest, index int, createdAt time.Time, result *Remote)
 	LinkFrom(src CacheExporterRecord, index int, selector string)
 }
 
diff --git a/worker/base/worker.go b/worker/base/worker.go
index 3f3a77c8be71..2581b7c5c8f0 100644
--- a/worker/base/worker.go
+++ b/worker/base/worker.go
@@ -300,8 +300,8 @@ func (w *Worker) LoadRef(ctx context.Context, id string, hidden bool) (cache.Imm
 	if errors.As(err, &needsRemoteProviders) {
 		if optGetter := solver.CacheOptGetterOf(ctx); optGetter != nil {
 			var keys []interface{}
-			for _, dgst := range needsRemoteProviders {
-				keys = append(keys, cache.DescHandlerKey(dgst))
+			for _, dgstDescPair := range needsRemoteProviders {
+				keys = append(keys, cache.DescHandlerKey(dgstDescPair.Digest))
 			}
 			descHandlers := cache.DescHandlers(make(map[digest.Digest]*cache.DescHandler))
 			for k, v := range optGetter(true, keys...) {
diff --git a/worker/result.go b/worker/result.go
index 26054cf8c206..5242ba5df02f 100644
--- a/worker/result.go
+++ b/worker/result.go
@@ -33,6 +33,16 @@ func (wr *WorkerRef) Release(ctx context.Context) error {
 	return wr.ImmutableRef.Release(ctx)
 }
 
+func (wr *WorkerRef) CacheOpts() solver.CacheOpts {
+	opts := solver.CacheOpts{}
+	if wr.ImmutableRef != nil {
+		for k, v := range wr.ImmutableRef.DescHandlers() {
+			opts[cache.DescHandlerKey(k)] = v
+		}
+	}
+	return opts
+}
+
 // GetRemotes method abstracts ImmutableRef's GetRemotes to allow a Worker to override.
 // This is needed for moby integration.
 // Use this method instead of calling ImmutableRef.GetRemotes() directly.