Skip to content

Commit

Permalink
Make disk cache error values more consistent
Browse files Browse the repository at this point in the history
By returning a cache.Error we can provide more specific http
and gRPC error codes.
  • Loading branch information
mostynb committed Jun 10, 2021
1 parent 70dc87b commit 8292abf
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 63 deletions.
75 changes: 45 additions & 30 deletions cache/disk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package disk
import (
"bytes"
"crypto/sha256"
"errors"
"fmt"
"io"
"io/ioutil"
Expand Down Expand Up @@ -84,6 +83,20 @@ type nameAndInfo struct {
const sha256HashStrSize = sha256.Size * 2 // Two hex characters per byte.
const emptySha256 = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"

func internalErr(err error) *cache.Error {
return &cache.Error{
Code: http.StatusInternalServerError,
Text: err.Error(),
}
}

func badReqErr(format string, a ...interface{}) *cache.Error {
return &cache.Error{
Code: http.StatusBadRequest,
Text: fmt.Sprintf(format, a...),
}
}

// New returns a new instance of a filesystem-based cache rooted at `dir`,
// with a maximum size of `maxSizeBytes` bytes, maximum logical blob size
// `maxBlobSize` and an optional backend `proxy`.
Expand Down Expand Up @@ -475,18 +488,17 @@ func (c *Cache) loadExistingFiles() error {
// a non-nil error is returned.
func (c *Cache) Put(kind cache.EntryKind, hash string, size int64, r io.Reader) (rErr error) {
if size < 0 {
return fmt.Errorf("Invalid (negative) size: %d", size)
return badReqErr("Invalid (negative) size: %d", size)
}

if size > c.maxBlobSize {
return fmt.Errorf("Blob size %d too large, max blob size is %d", size, c.maxBlobSize)
return badReqErr("Blob size %d too large, max blob size is %d", size, c.maxBlobSize)
}

// The hash format is checked properly in the http/grpc code.
// Just perform a simple/fast check here, to catch bad tests.
if len(hash) != sha256HashStrSize {
return fmt.Errorf("Invalid hash size: %d, expected: %d",
len(hash), sha256.Size)
return badReqErr("Invalid hash size: %d, expected: %d", len(hash), sha256.Size)
}

if kind == cache.CAS && size == 0 && hash == emptySha256 {
Expand Down Expand Up @@ -517,7 +529,7 @@ func (c *Cache) Put(kind cache.EntryKind, hash string, size int64, r io.Reader)
err := c.lru.Unreserve(size)
if err != nil {
// Set named return value.
rErr = err
rErr = internalErr(err)
log.Printf(rErr.Error())
}
c.mu.Unlock()
Expand All @@ -529,10 +541,7 @@ func (c *Cache) Put(kind cache.EntryKind, hash string, size int64, r io.Reader)
ok, err := c.lru.Reserve(size)
if err != nil {
c.mu.Unlock()
return &cache.Error{
Code: http.StatusInternalServerError,
Text: err.Error(),
}
return internalErr(err)
}
if !ok {
c.mu.Unlock()
Expand All @@ -555,14 +564,14 @@ func (c *Cache) Put(kind cache.EntryKind, hash string, size int64, r io.Reader)
tf, random, err := tfc.Create(filePath, legacy)
blobFile = tf.Name()
if err != nil {
return err
return internalErr(err)
}
removeTempfile = true

var sizeOnDisk int64
sizeOnDisk, err = c.writeAndCloseFile(r, kind, hash, size, tf)
if err != nil {
return err
return internalErr(err)
}

if c.proxy != nil {
Expand All @@ -576,9 +585,11 @@ func (c *Cache) Put(kind cache.EntryKind, hash string, size int64, r io.Reader)
}

unreserve, removeTempfile, err = c.commit(key, legacy, blobFile, size, size, sizeOnDisk, random)
if err != nil {
return internalErr(err)
}

// Might be nil.
return err
return nil
}

func (c *Cache) writeAndCloseFile(r io.Reader, kind cache.EntryKind, hash string, size int64, f *os.File) (int64, error) {
Expand Down Expand Up @@ -760,7 +771,10 @@ func (c *Cache) availableOrTryProxy(kind cache.EntryKind, hash string, size int6
return nil, -1, tryProxy, err
}

var errOnlyCompressedCAS = errors.New("Only CAS blobs are available in compressed form")
var errOnlyCompressedCAS = &cache.Error{
Code: http.StatusBadRequest,
Text: "Only CAS blobs are available in compressed form",
}

// Get returns an io.ReadCloser with the content of the cache item stored
// under `hash` and the number of bytes that can be read from it. If the
Expand All @@ -782,8 +796,7 @@ func (c *Cache) get(kind cache.EntryKind, hash string, size int64, offset int64,
// The hash format is checked properly in the http/grpc code.
// Just perform a simple/fast check here, to catch bad tests.
if len(hash) != sha256HashStrSize {
return nil, -1, fmt.Errorf("Invalid hash size: %d, expected: %d",
len(hash), sha256.Size)
return nil, -1, badReqErr("Invalid hash size: %d, expected: %d", len(hash), sha256.Size)
}

if kind == cache.CAS && size <= 0 && hash == emptySha256 {
Expand All @@ -801,10 +814,10 @@ func (c *Cache) get(kind cache.EntryKind, hash string, size int64, offset int64,
}

if offset < 0 {
return nil, -1, fmt.Errorf("Invalid offset: %d", offset)
return nil, -1, badReqErr("Invalid offset: %d", offset)
}
if size > 0 && offset >= size {
return nil, -1, fmt.Errorf("Invalid offset: %d for size %d", offset, size)
return nil, -1, badReqErr("Invalid offset: %d for size %d", offset, size)
}

var err error
Expand All @@ -831,7 +844,7 @@ func (c *Cache) get(kind cache.EntryKind, hash string, size int64, offset int64,
err := c.lru.Unreserve(size)
if err != nil {
// Set named return value.
rErr = err
rErr = internalErr(err)
log.Printf(rErr.Error())
}
c.mu.Unlock()
Expand All @@ -840,7 +853,7 @@ func (c *Cache) get(kind cache.EntryKind, hash string, size int64, offset int64,

f, foundSize, tryProxy, err := c.availableOrTryProxy(kind, hash, size, offset, zstd)
if err != nil {
return nil, -1, err
return nil, -1, internalErr(err)
}
if tryProxy && size > 0 {
unreserve = true
Expand All @@ -860,8 +873,11 @@ func (c *Cache) get(kind cache.EntryKind, hash string, size int64, offset int64,
if r != nil {
defer r.Close()
}
if err != nil || r == nil {
return nil, -1, err
if err != nil {
return nil, -1, internalErr(err)
}
if r == nil {
return nil, -1, nil
}

if isSizeMismatch(size, foundSize) || foundSize < 0 {
Expand All @@ -873,7 +889,7 @@ func (c *Cache) get(kind cache.EntryKind, hash string, size int64, offset int64,
blobPathBase := path.Join(c.dir, c.FileLocationBase(kind, legacy, hash, foundSize))
tf, random, err := tfc.Create(blobPathBase, legacy)
if err != nil {
return nil, -1, err
return nil, -1, internalErr(err)
}
removeTempfile = true

Expand All @@ -883,12 +899,12 @@ func (c *Cache) get(kind cache.EntryKind, hash string, size int64, offset int64,
sizeOnDisk, err = io.Copy(tf, r)
tf.Close()
if err != nil {
return nil, -1, err
return nil, -1, internalErr(err)
}

rcf, err := os.Open(blobFile)
if err != nil {
return nil, -1, err
return nil, -1, internalErr(err)
}

uncompressedOnDisk := (kind != cache.CAS) || (c.storageMode == casblob.Identity)
Expand All @@ -910,17 +926,16 @@ func (c *Cache) get(kind cache.EntryKind, hash string, size int64, offset int64,
}
}
if err != nil {
return nil, -1, err
return nil, -1, internalErr(err)
}

unreserve, removeTempfile, err = c.commit(key, legacy, blobFile, size, foundSize, sizeOnDisk, random)
if err != nil {
rc.Close()
rc = nil
foundSize = -1
return nil, -1, internalErr(err)
}

return rc, foundSize, err
return rc, foundSize, nil
}

// Contains returns true if the `hash` key exists in the cache, and
Expand Down
16 changes: 16 additions & 0 deletions server/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"net/http"
"regexp"

"google.golang.org/genproto/googleapis/bytestream"
Expand Down Expand Up @@ -170,3 +171,18 @@ func checkGRPCClientCert(ctx context.Context) error {

return nil
}

// Return a grpc code based on err, or fall back to returning
// a default Code.
func gRPCErrCode(err error, dflt codes.Code) codes.Code {
if err == nil {
return codes.OK
}

cerr, ok := err.(*cache.Error)
if ok && cerr.Code == http.StatusBadRequest {
return codes.InvalidArgument
}

return dflt
}
32 changes: 17 additions & 15 deletions server/grpc_ac.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"io/ioutil"
"net"
"strings"
Expand Down Expand Up @@ -159,7 +160,7 @@ func (s *grpcServer) maybeInline(inline bool, slice *[]byte, digest **pb.Digest,
if !found {
err := s.cache.Put(cache.CAS, (*digest).Hash, (*digest).SizeBytes,
bytes.NewReader(*slice))
if err != nil {
if err != nil && err != io.EOF {
return err
}
s.accessLogger.Printf("GRPC CAS PUT %s OK", (*digest).Hash)
Expand Down Expand Up @@ -224,9 +225,10 @@ func (s *grpcServer) UpdateActionResult(ctx context.Context,

err = s.cache.Put(cache.AC, req.ActionDigest.Hash,
int64(len(data)), bytes.NewReader(data))
if err != nil {
if err != nil && err != io.EOF {
s.accessLogger.Printf("%s %s %s", logPrefix, req.ActionDigest.Hash, err)
return nil, status.Error(codes.Internal, err.Error())
code := gRPCErrCode(err, codes.Internal)
return nil, status.Error(code, err.Error())
}

// Also cache any inlined blobs, separately in the CAS.
Expand All @@ -247,10 +249,10 @@ func (s *grpcServer) UpdateActionResult(ctx context.Context,

err = s.cache.Put(cache.CAS, f.Digest.Hash,
f.Digest.SizeBytes, bytes.NewReader(f.Contents))
if err != nil {
s.accessLogger.Printf("%s %s %s", logPrefix,
req.ActionDigest.Hash, err)
return nil, status.Error(codes.Internal, err.Error())
if err != nil && err != io.EOF {
s.accessLogger.Printf("%s %s %s", logPrefix, req.ActionDigest.Hash, err)
code := gRPCErrCode(err, codes.Internal)
return nil, status.Error(code, err.Error())
}
s.accessLogger.Printf("GRPC CAS PUT %s OK", f.Digest.Hash)
}
Expand All @@ -270,10 +272,10 @@ func (s *grpcServer) UpdateActionResult(ctx context.Context,

err = s.cache.Put(cache.CAS, hash, sizeBytes,
bytes.NewReader(req.ActionResult.StdoutRaw))
if err != nil {
s.accessLogger.Printf("%s %s %s", logPrefix,
req.ActionDigest.Hash, err)
return nil, status.Error(codes.Internal, err.Error())
if err != nil && err != io.EOF {
s.accessLogger.Printf("%s %s %s", logPrefix, req.ActionDigest.Hash, err)
code := gRPCErrCode(err, codes.Internal)
return nil, status.Error(code, err.Error())
}
s.accessLogger.Printf("GRPC CAS PUT %s OK", hash)
}
Expand All @@ -292,10 +294,10 @@ func (s *grpcServer) UpdateActionResult(ctx context.Context,

err = s.cache.Put(cache.CAS, hash, sizeBytes,
bytes.NewReader(req.ActionResult.StderrRaw))
if err != nil {
s.accessLogger.Printf("%s %s %s", logPrefix,
req.ActionDigest.Hash, err)
return nil, status.Error(codes.Internal, err.Error())
if err != nil && err != io.EOF {
s.accessLogger.Printf("%s %s %s", logPrefix, req.ActionDigest.Hash, err)
code := gRPCErrCode(err, codes.Internal)
return nil, status.Error(code, err.Error())
}
s.accessLogger.Printf("GRPC CAS PUT %s OK", hash)
}
Expand Down
3 changes: 2 additions & 1 deletion server/grpc_asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/sha256"
"encoding/base64"
"encoding/hex"
"io"
"io/ioutil"
"net/http"
"net/url"
Expand Down Expand Up @@ -176,7 +177,7 @@ func (s *grpcServer) fetchItem(uri string, expectedHash string) (bool, string, i
}

err = s.cache.Put(cache.CAS, expectedHash, expectedSize, rc)
if err != nil {
if err != nil && err != io.EOF {
s.errorLogger.Printf("failed to Put %s: %v", expectedHash, err)
return false, "", int64(-1)
}
Expand Down
32 changes: 17 additions & 15 deletions server/grpc_bytestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ func (s *grpcServer) Read(req *bytestream.ReadRequest,
if err != nil {
msg := fmt.Sprintf("GRPC BYTESTREAM READ FAILED: %s %v", hash, err)
s.accessLogger.Printf(msg)
return status.Error(codes.Unknown, msg)
code := gRPCErrCode(err, codes.Internal)
return status.Error(code, msg)
}
if rc == nil {
msg := fmt.Sprintf("GRPC BYTESTREAM READ BLOB NOT FOUND: %s", hash)
Expand Down Expand Up @@ -510,18 +511,12 @@ func (s *grpcServer) Write(srv bytestream.ByteStream_WriteServer) error {
return err
}

case err, ok := <-putResult:
case err := <-putResult:
select {
case resourceName = <-resourceNameChan:
default:
}

if !ok {

msg := fmt.Sprintf("GRPC BYTESTREAM WRITE FAILED: %s Cache Put closed unexpectedly", resourceName)
s.accessLogger.Printf(msg)
return status.Error(codes.Internal, msg)
}
if err == io.EOF {
s.accessLogger.Printf("GRPC BYTESTREAM SKIPPED WRITE: %s", resourceName)

Expand Down Expand Up @@ -550,16 +545,23 @@ func (s *grpcServer) Write(srv bytestream.ByteStream_WriteServer) error {
default:
}

err, ok := <-putResult
if !ok {
msg := fmt.Sprintf("GRPC BYTESTREAM WRITE FAILED: %s cache Put closed unexpectedly", resourceName)
s.accessLogger.Printf(msg)
return status.Error(codes.Internal, msg)
err := <-putResult
if err == io.EOF {
s.accessLogger.Printf("GRPC BYTESTREAM SKIPPED WRITE: %s", resourceName)

err = srv.SendAndClose(&resp)
if err != nil {
msg := fmt.Sprintf("GRPC BYTESTREAM SKIPPED WRITE FAILED: %s %v", resourceName, err)
s.accessLogger.Printf(msg)
return status.Error(codes.Internal, msg)
}
return nil
}
if err != nil {
msg := fmt.Sprintf("GRPC BYTESTREAM WRITE FAILED: %s %v", resourceName, err)
msg := fmt.Sprintf("GRPC BYTESTREAM WRITE FAILED: %s Cache Put failed: %v", resourceName, err)
s.accessLogger.Printf(msg)
return status.Error(codes.Unknown, msg)
code := gRPCErrCode(err, codes.Internal)
return status.Error(code, msg)
}

err = srv.SendAndClose(&resp)
Expand Down
Loading

0 comments on commit 8292abf

Please sign in to comment.