Skip to content

Commit

Permalink
Misc lcache cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
raphaelvigee committed Aug 1, 2023
1 parent bcb3784 commit 5707085
Show file tree
Hide file tree
Showing 10 changed files with 97 additions and 94 deletions.
7 changes: 7 additions & 0 deletions artifacts/artifact.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"compress/gzip"
"errors"
"fmt"
"github.com/hephbuild/heph/utils/ads"
"go.uber.org/multierr"
"io"
"os"
Expand Down Expand Up @@ -59,6 +60,12 @@ type Artifact interface {
Compressible() bool
}

func ToSlice[T Artifact](as []T) []Artifact {
return ads.Map(as, func(t T) Artifact {
return t
})
}

type readerMultiCloser struct {
io.Reader
cs []io.Closer
Expand Down
15 changes: 14 additions & 1 deletion engine/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,20 @@ func (e *Engine) scheduleStoreExternalCacheArtifact(ctx context.Context, target
Name: fmt.Sprintf("cache %v %v %v", target.Addr, cache.Name, artifact.Name()),
Deps: deps,
Do: func(w *worker.Worker, ctx context.Context) error {
err := e.RemoteCache.StoreArtifact(ctx, target, cache, artifact)
exists, err := e.LocalCache.Exists(ctx, target, artifact)
if err != nil {
return err
}

if !exists {
if !artifact.GenRequired() {
return nil
}

return fmt.Errorf("%v: %v is supposed to exist but doesn't", target.Addr, artifact.Name())
}

err = e.RemoteCache.StoreArtifact(ctx, target, cache, artifact)
if err != nil {
return fmt.Errorf("store remote cache %v: %v %w", cache.Name, target.Addr, err)
}
Expand Down
11 changes: 4 additions & 7 deletions engine/target_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/hephbuild/heph/exprs"
"github.com/hephbuild/heph/graph"
"github.com/hephbuild/heph/hephprovider"
"github.com/hephbuild/heph/lcache"
"github.com/hephbuild/heph/log/log"
"github.com/hephbuild/heph/platform"
"github.com/hephbuild/heph/sandbox"
Expand Down Expand Up @@ -150,14 +149,12 @@ func (e *Engine) runPrepare(ctx context.Context, target *Target, mode string) (_

restoreSrcRec := &SrcRecorder{}
if target.RestoreCache {
latestDir := e.LocalCache.LatestCacheDir(target)

if xfs.PathExists(latestDir.Abs()) {
if e.LocalCache.LatestCacheDirExists(target) {
done := log.TraceTiming("Restoring cache")

for _, name := range target.OutWithSupport.Names() {
art := target.Artifacts.OutTar(name)
p, err := lcache.UncompressedPathFromArtifact(ctx, target, art, latestDir.Abs())
p, err := e.LocalCache.LatestUncompressedPathFromArtifact(ctx, target, art)
if err != nil {
log.Errorf("restore cache: out %v|%v: %v", target.Addr, art.Name(), err)
continue
Expand Down Expand Up @@ -775,9 +772,9 @@ func (e *Engine) Run(ctx context.Context, rr TargetRunRequest, iocfg sandbox.IOC
return fmt.Errorf("wcs: %w", err)
}

allArtifacts := e.orderedArtifactProducers(target, outRoot.Abs(), logFilePath)
artifactProducers := e.orderedArtifactProducers(target, outRoot.Abs(), logFilePath)

err = e.LocalCache.StoreCache(ctx, target, allArtifacts, len(writeableCaches) > 0)
err = e.LocalCache.StoreCache(ctx, target, artifactProducers, len(writeableCaches) > 0)
if err != nil {
return fmt.Errorf("cache: store: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion lcache/artifact.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func UncompressedPathFromArtifact(ctx context.Context, target graph.Targeter, ar
}
defer gr.Close()

grc, cancel := xio.ContextReader(ctx, gr)
grc, cancel := xio.ContextCloser(ctx, gr)
defer cancel()

_, err = io.Copy(tf, grc)
Expand Down
81 changes: 43 additions & 38 deletions lcache/artifacts_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,47 +6,42 @@ import (
"errors"
"fmt"
"github.com/hephbuild/heph/artifacts"
"github.com/hephbuild/heph/graph"
"github.com/hephbuild/heph/log/log"
"github.com/hephbuild/heph/specs"
"github.com/hephbuild/heph/utils/xfs"
"github.com/hephbuild/heph/utils/xio"
"io"
"os"
"path/filepath"
)

func (e *LocalCacheState) LockArtifact(ctx context.Context, starget specs.Specer, artifact artifacts.Artifact) (func() error, error) {
func (e *LocalCacheState) LockArtifact(ctx context.Context, starget specs.Specer, artifact artifacts.Artifact) (func(), error) {
target := starget.Spec()
l := e.TargetMetas.Find(target).cacheLocks[artifact.Name()]
err := l.Lock(ctx)
if err != nil {
return nil, fmt.Errorf("lock %v %v: %w", target.Addr, artifact.Name(), err)
}

return func() error {
return func() {
err := l.Unlock()
if err != nil {
return fmt.Errorf("unlock %v %v: %w", target.Addr, artifact.Name(), err)
log.Errorf("unlock %v %v: %v", target.Addr, artifact.Name(), err)
}

return nil
}, nil
}

func (e *LocalCacheState) LockArtifacts(ctx context.Context, starget specs.Specer, allArtifacts []ArtifactWithProducer) (func(), error) {
target := starget.Spec()

unlockers := make([]func() error, 0, len(allArtifacts))
func (e *LocalCacheState) LockArtifacts(ctx context.Context, target specs.Specer, artifacts []artifacts.Artifact) (func(), error) {
unlockers := make([]func(), 0, len(artifacts))

unlocker := func() {
for _, unlock := range unlockers {
err := unlock()
if err != nil {
log.Errorf("unlock %v: %v", target.Addr, err)
}
unlock()
}
}

for _, artifact := range allArtifacts {
for _, artifact := range artifacts {
unlock, err := e.LockArtifact(ctx, target, artifact)
if err != nil {
unlocker()
Expand All @@ -58,15 +53,34 @@ func (e *LocalCacheState) LockArtifacts(ctx context.Context, starget specs.Spece
return unlocker, nil
}

func (e *LocalCacheState) GenArtifacts(ctx context.Context, dir string, target specs.Specer, allArtifacts []ArtifactWithProducer, compress bool) error {
unlock, err := e.LockArtifacts(ctx, target, allArtifacts)
func (e *LocalCacheState) GenArtifacts(ctx context.Context, target graph.Targeter, arts []ArtifactWithProducer, compress bool) error {
unlock, err := e.LockArtifacts(ctx, target, artifacts.ToSlice(arts))
if err != nil {
return err
}
defer unlock()

for _, artifact := range allArtifacts {
_, err := GenArtifact(ctx, dir, artifact, compress)
dir := e.cacheDir(target).Abs()

err = os.RemoveAll(dir)
if err != nil {
return err
}

err = os.MkdirAll(dir, os.ModePerm)
if err != nil {
return err
}

for _, artifact := range arts {
shouldCompress := artifact.Compressible() && compress

p := filepath.Join(dir, artifact.FileName())
if shouldCompress {
p = filepath.Join(dir, artifact.GzFileName())
}

err := GenArtifact(ctx, p, artifact, shouldCompress)
if err != nil {
return fmt.Errorf("genartifact %v: %w", artifact.Name(), err)
}
Expand All @@ -87,32 +101,23 @@ func (g *ArtifactGenContext) Writer() io.Writer {

var ArtifactSkip = errors.New("skip artifact")

func GenArtifact(ctx context.Context, dir string, a ArtifactWithProducer, compress bool) (string, error) {
shouldCompress := a.Compressible() && compress

p := filepath.Join(dir, a.FileName())
if shouldCompress {
p = filepath.Join(dir, a.GzFileName())
}

func GenArtifact(ctx context.Context, p string, a ArtifactWithProducer, compress bool) error {
tmpp := xfs.ProcessUniquePath(p)
defer os.Remove(tmpp)

f, err := os.Create(tmpp)
if err != nil {
return "", err
return err
}
defer f.Close()

go func() {
<-ctx.Done()
_ = f.Close()
}()
f, done := xio.ContextCloser(ctx, f)
defer done()

gctx := ArtifactGenContext{}
gctx.w = f

if shouldCompress {
if compress {
gw := gzip.NewWriter(f)
defer gw.Close()

Expand All @@ -123,24 +128,24 @@ func GenArtifact(ctx context.Context, dir string, a ArtifactWithProducer, compre
if err != nil {
if errors.Is(err, ArtifactSkip) {
if a.GenRequired() {
return "", fmt.Errorf("%v is required, but returned: %w", a.Name(), err)
return fmt.Errorf("is required, but returned: %w", err)
}
return "", nil
return nil
}
return "", fmt.Errorf("%v: %w", a.Name(), err)
return fmt.Errorf("%v: %w", a.Name(), err)
}

if !gctx.accessedWriter {
return "", fmt.Errorf("%v did not produce output", a.Name())
return fmt.Errorf("did not produce output")
}

_ = gctx.w.Close()
_ = f.Close()

err = os.Rename(tmpp, p)
if err != nil {
return "", err
return err
}

return p, nil
return nil
}
4 changes: 2 additions & 2 deletions lcache/cache_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,14 @@ func (e *LocalCacheState) runGc(targets []*graph.Target, targetDirs []string, fl

var latestTarget string
for _, entry := range dirEntries {
if entry.Name() == "latest" {
if entry.Name() == LatestDir {
latestTarget, _ = os.Readlink(filepath.Join(dir, entry.Name()))
}
}

entries := make([]gcEntry, 0)
for _, entry := range dirEntries {
if entry.Name() == "latest" {
if entry.Name() == LatestDir {
continue
}

Expand Down
42 changes: 18 additions & 24 deletions lcache/lcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ type LocalCacheState struct {
cacheHashInputPathsModtime *maps.Map[targetCacheKey, map[string]time.Time]
}

const LatestDir = "latest"

func NewState(root *hroot.State, g *graph.State, obs *observability.Observability, finalizers *finalizers.Finalizers) (*LocalCacheState, error) {
cachePath := root.Home.Join("cache")
loc, err := vfssimple.NewLocation("file://" + cachePath.Abs() + "/")
Expand Down Expand Up @@ -113,7 +115,7 @@ func NewState(root *hroot.State, g *graph.State, obs *observability.Observabilit
return s, nil
}

func (e *LocalCacheState) StoreCache(ctx context.Context, ttarget graph.Targeter, allArtifacts []ArtifactWithProducer, compress bool) (rerr error) {
func (e *LocalCacheState) StoreCache(ctx context.Context, ttarget graph.Targeter, artifacts []ArtifactWithProducer, compress bool) (rerr error) {
target := ttarget.GraphTarget()

if target.ConcurrentExecution {
Expand All @@ -135,24 +137,7 @@ func (e *LocalCacheState) StoreCache(ctx context.Context, ttarget graph.Targeter
return err
}

dir := e.cacheDir(target).Abs()

err = os.RemoveAll(dir)
if err != nil {
return err
}

err = os.MkdirAll(dir, os.ModePerm)
if err != nil {
return err
}

err = e.GenArtifacts(ctx, dir, target, allArtifacts, compress)
if err != nil {
return err
}

err = xfs.CreateParentDir(dir)
err = e.GenArtifacts(ctx, target, artifacts, compress)
if err != nil {
return err
}
Expand All @@ -161,14 +146,19 @@ func (e *LocalCacheState) StoreCache(ctx context.Context, ttarget graph.Targeter
}

func (e *LocalCacheState) LinkLatestCache(target specs.Specer, hash string) error {
latestDir := e.cacheDirForHash(target, "latest").Abs()
latestDir := e.cacheDirForHash(target, LatestDir).Abs()
fromDir := e.cacheDirForHash(target, hash).Abs()

err := os.RemoveAll(latestDir)
if err != nil {
return err
}

err = xfs.CreateParentDir(latestDir)
if err != nil {
return err
}

err = os.Symlink(fromDir, latestDir)
if err != nil && !errors.Is(err, os.ErrExist) {
return err
Expand Down Expand Up @@ -262,6 +252,14 @@ func (e *LocalCacheState) UncompressedPathFromArtifact(ctx context.Context, targ
return UncompressedPathFromArtifact(ctx, target, artifact, e.cacheDir(target).Abs())
}

func (e *LocalCacheState) LatestCacheDirExists(target specs.Specer) bool {
return xfs.PathExists(e.cacheDirForHash(target, LatestDir).Abs())
}

func (e *LocalCacheState) LatestUncompressedPathFromArtifact(ctx context.Context, target graph.Targeter, artifact artifacts.Artifact) (string, error) {
return UncompressedPathFromArtifact(ctx, target, artifact, e.cacheDirForHash(target, LatestDir).Abs())
}

func (e *LocalCacheState) tarListPath(artifact artifacts.Artifact, target graph.Targeter) string {
return e.cacheDir(target).Join(artifact.Name() + ".list").Abs()
}
Expand Down Expand Up @@ -384,10 +382,6 @@ func (e *LocalCacheState) CleanTarget(target specs.Specer, async bool) error {
return nil
}

func (e *LocalCacheState) LatestCacheDir(target specs.Specer) xfs.Path {
return e.cacheDirForHash(target, "latest")
}

func (e *LocalCacheState) VFSLocation(target graph.Targeter) (vfs.Location, error) {
rel, err := filepath.Rel(e.Path.Abs(), e.cacheDir(target).Abs())
if err != nil {
Expand Down
17 changes: 2 additions & 15 deletions rcache/rcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,26 +113,13 @@ func (e *RemoteCache) DownloadArtifact(ctx context.Context, target graph.Targete
func (e *RemoteCache) StoreArtifact(ctx context.Context, ttarget graph.Targeter, cache graph.CacheConfig, artifact artifacts.Artifact) (rerr error) {
target := ttarget.GraphTarget()

localRoot, err := e.LocalCache.VFSLocation(target)
if err != nil {
return err
}
status.Emit(ctx, tgt.TargetOutputStatus(target, artifact.DisplayName(), fmt.Sprintf("Uploading to %v...", cache.Name)))

exists, err := e.LocalCache.Exists(ctx, target, artifact)
localRoot, err := e.LocalCache.VFSLocation(target)
if err != nil {
return err
}

if !exists {
if !artifact.GenRequired() {
return nil
}

return fmt.Errorf("%v: %v is supposed to exist but doesn't", target.Addr, artifact.Name())
}

status.Emit(ctx, tgt.TargetOutputStatus(target, artifact.DisplayName(), fmt.Sprintf("Uploading to %v...", cache.Name)))

ctx, span := e.Observability.SpanCacheUpload(ctx, target, cache.Name, artifact)
defer span.EndError(rerr)

Expand Down
Loading

0 comments on commit 5707085

Please sign in to comment.