From 9715877002f21945650a97390252cf343acee349 Mon Sep 17 00:00:00 2001 From: John Gresty Date: Wed, 24 Apr 2024 12:53:32 +0100 Subject: [PATCH 1/7] feat: add disk based storage interface We plan to split the api and the scraper into separate processes, as such we cannot share memory between the two. This makes the default "memory" storage unusable so will be removed, instead in its place we can use this new one which uses the local filesystem as a backing. The implementation is based on the s3 storage implementation, albeit a bit simplified. --- vervet-underground/config/config.go | 7 + vervet-underground/config/config_test.go | 28 ++ .../internal/scraper/scraper_test.go | 127 +++----- .../internal/storage/disk/disk.go | 291 ++++++++++++++++++ .../internal/storage/disk/disk_test.go | 132 ++++++++ vervet-underground/server.go | 3 + 6 files changed, 499 insertions(+), 89 deletions(-) create mode 100644 vervet-underground/internal/storage/disk/disk.go create mode 100644 vervet-underground/internal/storage/disk/disk_test.go diff --git a/vervet-underground/config/config.go b/vervet-underground/config/config.go index de004cc3..73dfeeda 100644 --- a/vervet-underground/config/config.go +++ b/vervet-underground/config/config.go @@ -14,6 +14,7 @@ type StorageType string const ( StorageTypeMemory StorageType = "memory" + StorageTypeDisk StorageType = "disk" StorageTypeS3 StorageType = "s3" StorageTypeGCS StorageType = "gcs" ) @@ -70,6 +71,12 @@ type StorageConfig struct { IamRoleEnabled bool S3 S3Config GCS GcsConfig + Disk DiskConfig +} + +// DiskConfig defines configuration options for local disk storage. +type DiskConfig struct { + Path string } // S3Config defines configuration options for AWS S3 storage. 
diff --git a/vervet-underground/config/config_test.go b/vervet-underground/config/config_test.go index b45e8c05..9f09e107 100644 --- a/vervet-underground/config/config_test.go +++ b/vervet-underground/config/config_test.go @@ -87,6 +87,34 @@ func TestLoad(t *testing.T) { c.Assert(*conf, qt.DeepEquals, expected) }) + c.Run("disk config", func(c *qt.C) { + f := createTestFile(c, []byte(`{ + "host": "0.0.0.0", + "services": [{"url":"localhost","name":"localhost"}], + "storage": { + "type": "disk", + "disk": { + "path": "/tmp/foobar" + } + } + }`)) + + conf, err := config.Load(f.Name()) + c.Assert(err, qt.IsNil) + + expected := config.ServerConfig{ + Host: "0.0.0.0", + Services: []config.ServiceConfig{{URL: "localhost", Name: "localhost"}}, + Storage: config.StorageConfig{ + Type: config.StorageTypeDisk, + Disk: config.DiskConfig{ + Path: "/tmp/foobar", + }, + }, + } + c.Assert(*conf, qt.DeepEquals, expected) + }) + c.Run("s3 config", func(c *qt.C) { f := createTestFile(c, []byte(`{ "host": "0.0.0.0", diff --git a/vervet-underground/internal/scraper/scraper_test.go b/vervet-underground/internal/scraper/scraper_test.go index 8aab2089..6fdcb53d 100644 --- a/vervet-underground/internal/scraper/scraper_test.go +++ b/vervet-underground/internal/scraper/scraper_test.go @@ -6,20 +6,17 @@ import ( "fmt" "net/http" "net/http/httptest" - "strings" "testing" "time" qt "github.com/frankban/quicktest" "github.com/getkin/kin-openapi/openapi3" "github.com/gorilla/mux" - "github.com/prometheus/client_golang/prometheus" "github.com/rs/zerolog/log" "vervet-underground/config" "vervet-underground/internal/scraper" - "vervet-underground/internal/storage/mem" - "vervet-underground/internal/testutil" + "vervet-underground/internal/storage/disk" ) var ( @@ -99,7 +96,16 @@ func TestScraper(t *testing.T) { Name: "animals", URL: animalsService.URL, }}, } - st := mem.New() + st := disk.New("/tmp/specs") + c.Cleanup(func() { + ds, ok := st.(*disk.Storage) + if !ok { + return + } + err := 
ds.Cleanup() + c.Assert(err, qt.IsNil) + }) + sc, err := scraper.New(cfg, st, scraper.Clock(func() time.Time { return t0 })) c.Assert(err, qt.IsNil) @@ -155,7 +161,15 @@ func TestScraperWithLegacy(t *testing.T) { }, }, } - st := mem.New() + st := disk.New("/tmp/specs") + c.Cleanup(func() { + ds, ok := st.(*disk.Storage) + if !ok { + return + } + err := ds.Cleanup() + c.Assert(err, qt.IsNil) + }) sc, err := scraper.New(cfg, st, scraper.Clock(func() time.Time { return t0 })) c.Assert(err, qt.IsNil) @@ -180,7 +194,15 @@ func TestEmptyScrape(t *testing.T) { cfg := &config.ServerConfig{ Services: nil, } - st := mem.New() + st := disk.New("/tmp/specs") + c.Cleanup(func() { + ds, ok := st.(*disk.Storage) + if !ok { + return + } + err := ds.Cleanup() + c.Assert(err, qt.IsNil) + }) sc, err := scraper.New(cfg, st, scraper.Clock(func() time.Time { return t0 })) c.Assert(err, qt.IsNil) @@ -198,7 +220,15 @@ func TestScrapeClientError(t *testing.T) { cfg := &config.ServerConfig{ Services: []config.ServiceConfig{{Name: "nope", URL: "http://example.com/nope"}}, } - st := mem.New() + st := disk.New("/tmp/specs") + c.Cleanup(func() { + ds, ok := st.(*disk.Storage) + if !ok { + return + } + err := ds.Cleanup() + c.Assert(err, qt.IsNil) + }) sc, err := scraper.New(cfg, st, scraper.Clock(func() time.Time { return t0 }), scraper.HTTPClient(&http.Client{ @@ -221,84 +251,3 @@ type errorTransport struct{} func (*errorTransport) RoundTrip(*http.Request) (*http.Response, error) { return nil, fmt.Errorf("bad wolf") } - -func TestScraperCollation(t *testing.T) { - c := qt.New(t) - - petfoodService, animalsService := setupHttpServers(c) - tests := []struct { - name, version, digest string - }{{ - "petfood", "2021-09-01", "sha256:I20cAQ3VEjDrY7O0B678yq+0pYN2h3sxQy7vmdlo4+w=", - }, { - "animals", "2021-10-16", "sha256:P1FEFvnhtxJSqXr/p6fMNKE+HYwN6iwKccBGHIVZbyg=", - }} - - cfg := &config.ServerConfig{ - Services: []config.ServiceConfig{{ - Name: "petfood", URL: petfoodService.URL, - }, { - Name: 
"animals", URL: animalsService.URL, - }}, - } - memSt := mem.New() - st, ok := memSt.(*mem.Storage) - c.Assert(ok, qt.IsTrue) - sc, err := scraper.New(cfg, st, scraper.Clock(func() time.Time { return t0 })) - c.Assert(err, qt.IsNil) - - before, err := prometheus.DefaultGatherer.Gather() - c.Assert(err, qt.IsNil) - - // Cancel the scrape context after a timeout so we don't hang the test - ctx, cancel := context.WithTimeout(context.Background(), time.Second*60) - c.Cleanup(cancel) - - // Run the scrape - err = sc.Run(ctx) - c.Assert(err, qt.IsNil) - - // Version digests now known to storage - for _, test := range tests { - ok, err := st.HasVersion(ctx, test.name, test.version, test.digest) - c.Assert(err, qt.IsNil) - c.Assert(ok, qt.IsTrue) - } - - collated, err := st.GetCollatedVersionSpecs() - c.Assert(err, qt.IsNil) - c.Assert(len(collated), qt.Equals, 4) - - vi := st.VersionIndex() - c.Assert(len(vi.Versions()), qt.Equals, 4) - for _, version := range vi.Versions() { - specData, err := st.Version(ctx, version.String()) - c.Assert(err, qt.IsNil) - l := openapi3.NewLoader() - spec, err := l.LoadFromData(specData) - c.Assert(err, qt.IsNil) - c.Assert(spec, qt.IsNotNil) - c.Assert(len(spec.Paths), qt.Equals, collatedPaths[version.String()]) - } - - // Assert metrics - after, err := prometheus.DefaultGatherer.Gather() - c.Assert(err, qt.IsNil) - - c.Assert(testutil.SampleDelta("vu_scraper_run_duration_seconds", map[string]string{}, before, after), - qt.Equals, uint64(1)) - c.Assert(testutil.SampleDelta("vu_scraper_run_error_total", map[string]string{}, before, after), - qt.Equals, uint64(0)) - c.Assert(testutil.SampleDelta("vu_scraper_service_scrape_duration_seconds", - map[string]string{ - "service": strings.Replace(petfoodService.URL, "http://", "", 1), - }, - before, after, - ), qt.Equals, uint64(1)) - c.Assert(testutil.SampleDelta("vu_scraper_service_scrape_duration_seconds", - map[string]string{ - "service": strings.Replace(animalsService.URL, "http://", "", 1), 
- }, - before, after, - ), qt.Equals, uint64(1)) -} diff --git a/vervet-underground/internal/storage/disk/disk.go b/vervet-underground/internal/storage/disk/disk.go new file mode 100644 index 00000000..20646aea --- /dev/null +++ b/vervet-underground/internal/storage/disk/disk.go @@ -0,0 +1,291 @@ +// Package disk provides an implementation of the storage used in Vervet +// Underground that uses a local filesystem. It's not intended for production +// use, but as a functionally complete reference implementation that can be +// used to validate the other parts of the VU system. +package disk + +import ( + "context" + "encoding/base64" + "errors" + "fmt" + "os" + "path" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/getkin/kin-openapi/openapi3" + "github.com/snyk/vervet/v6" + + "vervet-underground/internal/storage" +) + +type Storage struct { + path string + mu sync.RWMutex + collatedVersions vervet.VersionIndex + newCollator func() (*storage.Collator, error) +} + +// Option defines a Storage constructor option. +type Option func(*Storage) + +type objectMeta struct { + blob []byte + lastMod time.Time +} + +func New(path string, options ...Option) storage.Storage { + s := &Storage{ + path: path, + newCollator: func() (*storage.Collator, error) { return storage.NewCollator() }, + } + for _, option := range options { + option(s) + } + return s +} + +func (s *Storage) Cleanup() error { + if s.path == "" { + return fmt.Errorf("not cleaning up invalid path") + } + return os.RemoveAll(s.path) +} + +// NewCollator configures the Storage instance to use the given constructor +// function for creating collator instances. +func NewCollator(newCollator func() (*storage.Collator, error)) Option { + return func(s *Storage) { + s.newCollator = newCollator + } +} + +// NotifyVersions implements scraper.Storage. 
+func (s *Storage) NotifyVersions(ctx context.Context, name string, versions []string, scrapeTime time.Time) error { + for _, version := range versions { + // TODO: Add method to fetch contents here + // TODO: implement notify versions; update sunset when versions are removed + err := s.NotifyVersion(ctx, name, version, []byte{}, scrapeTime) + if err != nil { + return err + } + } + return nil +} + +// CollateVersions aggregates versions and revisions from all the services, and +// produces unified versions and merged specs for all APIs. +func (s *Storage) CollateVersions(ctx context.Context, serviceFilter map[string]bool) error { + // create an aggregate to process collated data from storage data + aggregate, err := s.newCollator() + if err != nil { + return err + } + serviceRevisions, err := s.ListObjects(ctx, storage.ServiceVersionsFolder) + if err != nil { + return err + } + + // all specs are stored as: "service-versions/{service_name}/{version}/{digest}.json" + for _, revKey := range serviceRevisions { + service, version, digest, err := parseServiceVersionRevisionKey(revKey) + if err != nil { + return err + } + if _, ok := serviceFilter[service]; !ok { + continue + } + rev, err := s.GetObjectWithMetadata(revKey) + if err != nil { + return err + } + + // Assuming version is valid in path uploads + parsedVersion, err := vervet.ParseVersion(version) + if err != nil { + return err + } + + revision := storage.ContentRevision{ + Service: service, + Version: parsedVersion, + Timestamp: rev.lastMod, + Digest: storage.Digest(digest), + Blob: rev.blob, + } + aggregate.Add(service, revision) + } + versions, specs, err := aggregate.Collate() + if err != nil { + return err + } + + s.mu.Lock() + s.collatedVersions = vervet.NewVersionIndex(versions) + s.mu.Unlock() + + return s.putCollatedSpecs(specs) +} + +// HasVersion implements scraper.Storage. 
+func (s *Storage) HasVersion(ctx context.Context, name string, version string, digest string) (bool, error) { + key := s.getServiceVersionRevisionKey(name, version, digest) + path := path.Join(s.path, key) + _, err := os.Stat(path) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return false, nil + } + return false, err + } + return true, nil +} + +// NotifyVersion implements scraper.Storage. +func (s *Storage) NotifyVersion(ctx context.Context, name string, version string, contents []byte, scrapeTime time.Time) error { + digest := storage.NewDigest(contents) + key := s.getServiceVersionRevisionKey(name, version, string(digest)) + parsedVersion, err := vervet.ParseVersion(version) + if err != nil { + return err + } + + currentRevision := storage.ContentRevision{ + Service: name, + Timestamp: scrapeTime, + Digest: digest, + Blob: contents, + Version: parsedVersion, + } + + _, err = s.GetObject(key) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + // Since the digest doesn't exist, add the whole key path + return s.PutObject(key, currentRevision.Blob) + } + return err + } + // digest already exists, nothing to do + return nil +} + +// VersionIndex implements scraper.Storage. +func (s *Storage) VersionIndex() vervet.VersionIndex { + s.mu.RLock() + defer s.mu.RUnlock() + + return s.collatedVersions +} + +// Version implements scraper.Storage. 
+func (s *Storage) Version(ctx context.Context, version string) ([]byte, error) { + parsedVersion, err := vervet.ParseVersion(version) + if err != nil { + return nil, err + } + + blob, err := s.GetCollatedVersionSpec(version) + if err != nil { + s.mu.RLock() + resolved, err := s.collatedVersions.Resolve(parsedVersion) + s.mu.RUnlock() + if err != nil { + return nil, err + } + return s.GetCollatedVersionSpec(resolved.String()) + } + return blob, nil +} + +func (s *Storage) getServiceVersionRevisionKey(name string, version string, digest string) string { + // digest could contain slashes + b64 := base64.StdEncoding.EncodeToString([]byte(digest)) + return path.Join(storage.ServiceVersionsFolder, name, version, b64) + ".json" +} + +func (s *Storage) GetObject(key string) ([]byte, error) { + path := path.Join(s.path, key) + return os.ReadFile(path) +} + +func (s *Storage) PutObject(key string, body []byte) error { + path := path.Join(s.path, key) + err := os.MkdirAll(filepath.Dir(path), os.ModePerm) + if err != nil { + return err + } + return os.WriteFile(path, body, 0600) +} + +// GetCollatedVersionSpec retrieves a single collated vervet.Version +// and returns the JSON blob. +func (s *Storage) GetCollatedVersionSpec(version string) ([]byte, error) { + path := path.Join(storage.CollatedVersionsFolder, version, "spec.json") + return s.GetObject(path) +} + +// ListObjects gets all objects under a given directory. 
+func (s *Storage) ListObjects(ctx context.Context, key string) ([]string, error) { + path := path.Join(s.path, key) + objects := make([]string, 0) + err := filepath.Walk(path, func(obj string, info os.FileInfo, err error) error { + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return nil + } + return err + } + if !info.IsDir() { + objects = append(objects, obj) + } + return nil + }) + return objects, err +} + +func parseServiceVersionRevisionKey(key string) (string, string, string, error) { + digestB64 := filepath.Base(key) + digestB64 = strings.TrimSuffix(digestB64, ".json") + digest, err := base64.StdEncoding.DecodeString(digestB64) + if err != nil { + return "", "", "", err + } + rest := filepath.Dir(key) + version := filepath.Base(rest) + rest = filepath.Dir(rest) + service := filepath.Base(rest) + + return service, version, string(digest), nil +} + +func (s *Storage) GetObjectWithMetadata(key string) (*objectMeta, error) { + info, err := os.Stat(key) + if err != nil { + return nil, err + } + lastMod := info.ModTime() + body, err := os.ReadFile(key) + return &objectMeta{ + lastMod: lastMod, + blob: body, + }, err +} + +// putCollatedSpecs stores the given collated OpenAPI document objects. 
+func (s *Storage) putCollatedSpecs(objects map[vervet.Version]openapi3.T) error { + for key, file := range objects { + jsonBlob, err := file.MarshalJSON() + if err != nil { + return fmt.Errorf("failed to marshal json for collation upload: %w", err) + } + err = s.PutObject(storage.CollatedVersionsFolder+key.String()+"/spec.json", jsonBlob) + if err != nil { + return err + } + } + return nil +} diff --git a/vervet-underground/internal/storage/disk/disk_test.go b/vervet-underground/internal/storage/disk/disk_test.go new file mode 100644 index 00000000..b3ea41b0 --- /dev/null +++ b/vervet-underground/internal/storage/disk/disk_test.go @@ -0,0 +1,132 @@ +package disk + +import ( + "context" + "fmt" + "testing" + "time" + + qt "github.com/frankban/quicktest" + + "vervet-underground/internal/storage" +) + +var t0 = time.Date(2021, time.December, 3, 20, 49, 51, 0, time.UTC) + +func TestNotifyVersions(t *testing.T) { + c := qt.New(t) + s := New("/tmp/specs") + c.Cleanup(func() { + ds, ok := s.(*Storage) + if !ok { + return + } + err := ds.Cleanup() + c.Assert(err, qt.IsNil) + }) + ctx := context.Background() + err := s.NotifyVersions(ctx, "petfood", []string{"2021-09-01", "2021-09-16"}, t0) + c.Assert(err, qt.IsNil) + // TODO: verify side-effects when there are some... 
+} + +func TestHasVersion(t *testing.T) { + c := qt.New(t) + s := New("/tmp/specs") + c.Cleanup(func() { + ds, ok := s.(*Storage) + if !ok { + return + } + err := ds.Cleanup() + c.Assert(err, qt.IsNil) + }) + ctx := context.Background() + const cricketsDigest = "sha256:mWpHX0/hIZS9mVd8eobfHWm6OkUsKZLiqd6ShRnNzA4=" + const geckosDigest = "sha256:c5JD7m0g4DVhoaX4z8HFcTP8S/yUOEsjgP8ECkuEHqM=" + for _, digest := range []string{cricketsDigest, geckosDigest} { + ok, err := s.HasVersion(ctx, "petfood", "2021-09-16", digest) + c.Assert(err, qt.IsNil) + c.Assert(ok, qt.IsFalse) + } + err := s.NotifyVersion(ctx, "petfood", "2021-09-16", []byte("crickets"), t0) + c.Assert(err, qt.IsNil) + err = s.NotifyVersion(ctx, "animals", "2021-09-16", []byte("geckos"), t0) + c.Assert(err, qt.IsNil) + + tests := []struct { + service, version, digest string + shouldHave bool + }{ + {"petfood", "2021-09-16", cricketsDigest, true}, + {"animals", "2021-09-16", geckosDigest, true}, + {"petfood", "2021-09-16", geckosDigest, false}, + {"animals", "2021-09-16", cricketsDigest, false}, + {"petfood", "2021-10-16", cricketsDigest, false}, + {"animals", "2021-09-17", geckosDigest, false}, + } + for i, t := range tests { + c.Logf("test#%d: %v", i, t) + ok, err := s.HasVersion(ctx, t.service, t.version, t.digest) + c.Assert(err, qt.IsNil) + c.Assert(ok, qt.Equals, t.shouldHave) + } +} + +const spec = `{"components":{},"info":{"title":"ServiceA API","version":"0.0.0"},` + + `"openapi":"3.0.0","paths":{"/test":{"get":{"operation":"getTest",` + + `"responses":{"204":{"description":"An empty response"}},"summary":"Test endpoint"}}}}` + +const emptySpec = `{"components":{},"info":{"title":"","version":""},"openapi":"","paths":null}` + +func TestCollateVersions(t *testing.T) { + c := qt.New(t) + s := New("/tmp/specs") + c.Cleanup(func() { + ds, ok := s.(*Storage) + if !ok { + return + } + err := ds.Cleanup() + c.Assert(err, qt.IsNil) + }) + + ctx := context.Background() + err := s.NotifyVersion(ctx, 
"petfood", "2021-09-16", []byte(emptySpec), t0) + c.Assert(err, qt.IsNil) + + serviceFilter := map[string]bool{"petfood": true} + err = s.CollateVersions(ctx, serviceFilter) + c.Assert(err, qt.IsNil) + before, err := s.Version(ctx, "2021-09-16") + c.Assert(err, qt.IsNil) + c.Assert(string(before), qt.Equals, emptySpec) + + content, err := s.Version(ctx, "2021-01-01") + c.Assert(err.Error(), qt.Equals, fmt.Errorf("no matching version").Error()) + c.Assert(content, qt.IsNil) + + err = s.NotifyVersion(ctx, "petfood", "2021-09-16", []byte(spec), t0.Add(time.Second)) + c.Assert(err, qt.IsNil) + err = s.CollateVersions(ctx, serviceFilter) + c.Assert(err, qt.IsNil) + + after, err := s.Version(ctx, "2021-09-16") + c.Assert(err, qt.IsNil) + c.Assert(string(after), qt.Equals, spec) +} + +func TestDiskStorageCollateVersion(t *testing.T) { + c := qt.New(t) + s := New("/tmp/specs") + c.Cleanup(func() { + ds, ok := s.(*Storage) + if !ok { + return + } + err := ds.Cleanup() + c.Assert(err, qt.IsNil) + }) + + storage.AssertCollateVersion(c, s) +} diff --git a/vervet-underground/server.go b/vervet-underground/server.go index 7b61d43d..fcfa2432 100644 --- a/vervet-underground/server.go +++ b/vervet-underground/server.go @@ -17,6 +17,7 @@ import ( "vervet-underground/internal/handler" "vervet-underground/internal/scraper" "vervet-underground/internal/storage" + "vervet-underground/internal/storage/disk" "vervet-underground/internal/storage/gcs" "vervet-underground/internal/storage/mem" "vervet-underground/internal/storage/s3" @@ -177,6 +178,8 @@ func initializeStorage(ctx context.Context, cfg *config.ServerConfig, overlayCon switch cfg.Storage.Type { case config.StorageTypeMemory: return mem.New(mem.NewCollator(newCollator)), nil + case config.StorageTypeDisk: + return disk.New(cfg.Storage.Disk.Path, disk.NewCollator(newCollator)), nil case config.StorageTypeS3: return s3.New(ctx, &s3.Config{ AwsRegion: cfg.Storage.S3.Region, From 7382207448799890bd48fb0c83797eaa64baceac Mon Sep 17 
00:00:00 2001 From: John Gresty Date: Wed, 24 Apr 2024 13:16:34 +0100 Subject: [PATCH 2/7] feat: remove memory storage from vervet underground We plan to split the vervet underground collator and api into separate processes to help with startup time and availability. Because of this we cannot offer a memory backed storage any more as the processes will not share memory. The memory storage was really only used for testing, this switches that to the new disk storage which should operate the same. BREAKING CHANGE: removed memory storage back end --- vervet-underground/config.default.json | 5 +- vervet-underground/config/config.go | 9 +- vervet-underground/config/config_test.go | 14 +- .../internal/storage/mem/mem.go | 223 ------------------ .../internal/storage/mem/mem_test.go | 100 -------- vervet-underground/server.go | 3 - 6 files changed, 15 insertions(+), 339 deletions(-) delete mode 100644 vervet-underground/internal/storage/mem/mem.go delete mode 100644 vervet-underground/internal/storage/mem/mem_test.go diff --git a/vervet-underground/config.default.json b/vervet-underground/config.default.json index 00425a11..1496039e 100644 --- a/vervet-underground/config.default.json +++ b/vervet-underground/config.default.json @@ -8,6 +8,9 @@ ], "host": "0.0.0.0", "storage": { - "type": "memory" + "type": "disk", + "disk": { + "path": "/tmp/specs" + } } } diff --git a/vervet-underground/config/config.go b/vervet-underground/config/config.go index 73dfeeda..c6a32868 100644 --- a/vervet-underground/config/config.go +++ b/vervet-underground/config/config.go @@ -13,10 +13,9 @@ import ( type StorageType string const ( - StorageTypeMemory StorageType = "memory" - StorageTypeDisk StorageType = "disk" - StorageTypeS3 StorageType = "s3" - StorageTypeGCS StorageType = "gcs" + StorageTypeDisk StorageType = "disk" + StorageTypeS3 StorageType = "s3" + StorageTypeGCS StorageType = "gcs" ) // ServerConfig defines the configuration options for the Vervet Underground service. 
@@ -99,7 +98,7 @@ type GcsConfig struct { // setDefaults sets default values for the ServerConfig. func setDefaults() { viper.SetDefault("host", "localhost") - viper.SetDefault("storage.type", StorageTypeMemory) + viper.SetDefault("storage.type", StorageTypeDisk) } // loadEnv sets up the config store to load values from environment variables, diff --git a/vervet-underground/config/config_test.go b/vervet-underground/config/config_test.go index 9f09e107..77f33991 100644 --- a/vervet-underground/config/config_test.go +++ b/vervet-underground/config/config_test.go @@ -33,7 +33,7 @@ func TestLoad(t *testing.T) { Host: "localhost", Services: nil, Storage: config.StorageConfig{ - Type: config.StorageTypeMemory, + Type: config.StorageTypeDisk, }, } c.Assert(*conf, qt.DeepEquals, expected) @@ -52,7 +52,7 @@ func TestLoad(t *testing.T) { Host: "0.0.0.0", Services: []config.ServiceConfig{{URL: "localhost", Name: "localhost"}}, Storage: config.StorageConfig{ - Type: config.StorageTypeMemory, + Type: config.StorageTypeDisk, }, } c.Assert(*conf, qt.DeepEquals, expected) @@ -81,7 +81,7 @@ func TestLoad(t *testing.T) { }, }, Storage: config.StorageConfig{ - Type: config.StorageTypeMemory, + Type: config.StorageTypeDisk, }, } c.Assert(*conf, qt.DeepEquals, expected) @@ -195,7 +195,7 @@ func TestLoad(t *testing.T) { secretConfig := createTestFile(c, []byte(`{ "services": [{"url":"http://user:password@localhost","name":"localhost"}], "storage": { - "type": "memory" + "type": "disk" } }`)) @@ -206,7 +206,7 @@ func TestLoad(t *testing.T) { Host: "0.0.0.0", Services: []config.ServiceConfig{{URL: "http://user:password@localhost", Name: "localhost"}}, Storage: config.StorageConfig{ - Type: config.StorageTypeMemory, + Type: config.StorageTypeDisk, }, } c.Assert(*conf, qt.DeepEquals, expected) @@ -217,7 +217,7 @@ func TestLoad(t *testing.T) { "host": "0.0.0.0", "services": [{"url":"http://user:password@localhost"}], "storage": { - "type": "memory" + "type": "disk" } }`)) _, err := 
config.Load(cfg.Name()) @@ -229,7 +229,7 @@ func TestLoad(t *testing.T) { "host": "0.0.0.0", "services": [{"url":"http://service-a","name":"service-a"},{"url":"http://service-a","name":"service-a"}], "storage": { - "type": "memory" + "type": "disk" } }`)) _, err := config.Load(cfg.Name()) diff --git a/vervet-underground/internal/storage/mem/mem.go b/vervet-underground/internal/storage/mem/mem.go deleted file mode 100644 index f44ea5f3..00000000 --- a/vervet-underground/internal/storage/mem/mem.go +++ /dev/null @@ -1,223 +0,0 @@ -// Package mem provides an in-memory implementation of the storage used in -// Vervet Underground. It's not intended for production use, but as a -// functionally complete reference implementation that can be used to validate -// the other parts of the VU system. -package mem - -import ( - "context" - "sort" - "sync" - "time" - - "github.com/rs/zerolog/log" - "github.com/snyk/vervet/v6" - - "vervet-underground/internal/storage" -) - -// versionedResourceMap map [service-name] Vervet Version slice array. -type versionedResourceMap map[string]vervet.VersionSlice - -// mappedRevisionSpecs map [Sha digest of contents string] --> spec contents and metadata. -type mappedRevisionSpecs map[storage.Digest]storage.ContentRevision - -// versionMappedRevisionSpecs map[version-name][digest] --> spec contents and metadata. -type versionMappedRevisionSpecs map[string]mappedRevisionSpecs - -// serviceVersionMappedRevisionSpecs map[service-name][version-name][digest] --> spec contents and metadata. -type serviceVersionMappedRevisionSpecs map[string]versionMappedRevisionSpecs - -// Storage provides an in-memory implementation of Vervet Underground storage. 
-type Storage struct { - mu sync.RWMutex - - serviceVersions versionedResourceMap - serviceVersionMappedRevisionSpecs serviceVersionMappedRevisionSpecs - - collatedVersions vervet.VersionIndex - collatedVersionedSpecs storage.CollatedVersionMappedSpecs - - newCollator func() (*storage.Collator, error) -} - -// New returns a new Storage instance. -func New(options ...Option) storage.Storage { - s := &Storage{ - serviceVersions: versionedResourceMap{}, - serviceVersionMappedRevisionSpecs: serviceVersionMappedRevisionSpecs{}, - - collatedVersionedSpecs: storage.CollatedVersionMappedSpecs{}, - - newCollator: func() (*storage.Collator, error) { return storage.NewCollator() }, - } - for _, option := range options { - option(s) - } - return s -} - -// Option defines a Storage constructor option. -type Option func(*Storage) - -// NewCollator configures the Storage instance to use the given constructor -// function for creating collator instances. -func NewCollator(newCollator func() (*storage.Collator, error)) Option { - return func(s *Storage) { - s.newCollator = newCollator - } -} - -// NotifyVersions implements scraper.Storage. -func (s *Storage) NotifyVersions(ctx context.Context, name string, versions []string, scrapeTime time.Time) error { - for _, version := range versions { - // TODO: Add method to fetch contents here - // TODO: implement notify versions; update sunset when versions are removed - err := s.NotifyVersion(ctx, name, version, []byte{}, scrapeTime) - if err != nil { - return err - } - } - return nil -} - -// HasVersion implements scraper.Storage. -func (s *Storage) HasVersion(ctx context.Context, name string, version string, digest string) (bool, error) { - s.mu.RLock() - defer s.mu.RUnlock() - name = storage.GetSantizedHost(name) - revisions, ok := s.serviceVersionMappedRevisionSpecs[name][version] - - if !ok { - return false, nil - } - _, ok = revisions[storage.Digest(digest)] - return ok, nil -} - -// NotifyVersion implements scraper.Storage. 
-func (s *Storage) NotifyVersion(ctx context.Context, name string, version string, contents []byte, scrapeTime time.Time) error { - s.mu.Lock() - defer s.mu.Unlock() - name = storage.GetSantizedHost(name) - digest := storage.NewDigest(contents) - - parsedVersion, err := vervet.ParseVersion(version) - if err != nil { - log.Error().Err(err).Msgf("Failed to resolve Vervet version for %s : %s", name, version) - return err - } - - // Check if service and version structures are initialized - if _, ok := s.serviceVersionMappedRevisionSpecs[name]; !ok { - s.serviceVersionMappedRevisionSpecs[name] = versionMappedRevisionSpecs{} - } - - revisions, ok := s.serviceVersionMappedRevisionSpecs[name][version] - if ok { - if _, exist := revisions[digest]; exist { - return nil - } - } else { - s.serviceVersionMappedRevisionSpecs[name][version] = mappedRevisionSpecs{} - revisions = s.serviceVersionMappedRevisionSpecs[name][version] - } - - // If the version is newly initialized, meaning no revisions exist, - // create the new vervet.VersionSlice with that version initialized - // else, append it to the existing service's VersionSlice - if len(revisions) == 0 { - if _, ok = s.serviceVersions[name]; !ok { - s.serviceVersions[name] = vervet.VersionSlice{parsedVersion} - } else { - s.serviceVersions[name] = append(s.serviceVersions[name], parsedVersion) - // sort versions when new ones are introduced to maintain BST functionality - sort.Sort(s.serviceVersions[name]) - } - } - // End of initializations - - // TODO: we may want to abstract out the storage objects instead of using chained maps. - // add the new ContentRevision - s.serviceVersionMappedRevisionSpecs[name][version][digest] = storage.ContentRevision{ - Service: name, - Timestamp: scrapeTime, - Digest: digest, - Blob: contents, - Version: parsedVersion, - } - - return nil -} - -// VersionIndex implements scraper.Storage. 
-func (s *Storage) VersionIndex() vervet.VersionIndex { - s.mu.RLock() - defer s.mu.RUnlock() - - return s.collatedVersions -} - -// Version implements scraper.Storage. -func (s *Storage) Version(ctx context.Context, version string) ([]byte, error) { - parsedVersion, err := vervet.ParseVersion(version) - if err != nil { - return nil, err - } - - s.mu.RLock() - defer s.mu.RUnlock() - - spec, ok := s.collatedVersionedSpecs[parsedVersion] - if !ok { - resolved, err := s.collatedVersions.Resolve(parsedVersion) - if err != nil { - return nil, err - } - resolvedSpec := s.collatedVersionedSpecs[resolved] - return resolvedSpec.MarshalJSON() - } - return spec.MarshalJSON() -} - -// CollateVersions aggregates versions and revisions from all the services, and produces unified versions and merged specs for all APIs. -func (s *Storage) CollateVersions(ctx context.Context, serviceFilter map[string]bool) error { - // create an aggregate to process collated data from storage data - aggregate, err := s.newCollator() - if err != nil { - return err - } - for serv, versions := range s.serviceVersionMappedRevisionSpecs { - if _, ok := serviceFilter[serv]; !ok { - continue - } - for _, revisions := range versions { - for _, revision := range revisions { - aggregate.Add(serv, revision) - } - } - } - versions, specs, err := aggregate.Collate() - - s.mu.Lock() - defer s.mu.Unlock() - s.collatedVersions = vervet.NewVersionIndex(versions) - s.collatedVersionedSpecs = specs - - return err -} - -func (s *Storage) GetCollatedVersionSpecs() (map[string][]byte, error) { - s.mu.RLock() - defer s.mu.RUnlock() - - versionSpecs := map[string][]byte{} - for key, value := range s.collatedVersionedSpecs { - json, err := value.MarshalJSON() - if err != nil { - return nil, err - } - versionSpecs[key.String()] = json - } - return versionSpecs, nil -} diff --git a/vervet-underground/internal/storage/mem/mem_test.go b/vervet-underground/internal/storage/mem/mem_test.go deleted file mode 100644 index 
0ee88dec..00000000 --- a/vervet-underground/internal/storage/mem/mem_test.go +++ /dev/null @@ -1,100 +0,0 @@ -package mem - -import ( - "context" - "fmt" - "testing" - "time" - - qt "github.com/frankban/quicktest" - - "vervet-underground/internal/storage" -) - -var t0 = time.Date(2021, time.December, 3, 20, 49, 51, 0, time.UTC) - -func TestNotifyVersions(t *testing.T) { - c := qt.New(t) - s := New() - ctx := context.Background() - err := s.NotifyVersions(ctx, "petfood", []string{"2021-09-01", "2021-09-16"}, t0) - c.Assert(err, qt.IsNil) - // TODO: verify side-effects when there are some... -} - -func TestHasVersion(t *testing.T) { - c := qt.New(t) - s := New() - ctx := context.Background() - const cricketsDigest = "sha256:mWpHX0/hIZS9mVd8eobfHWm6OkUsKZLiqd6ShRnNzA4=" - const geckosDigest = "sha256:c5JD7m0g4DVhoaX4z8HFcTP8S/yUOEsjgP8ECkuEHqM=" - for _, digest := range []string{cricketsDigest, geckosDigest} { - ok, err := s.HasVersion(ctx, "petfood", "2021-09-16", digest) - c.Assert(err, qt.IsNil) - c.Assert(ok, qt.IsFalse) - } - err := s.NotifyVersion(ctx, "petfood", "2021-09-16", []byte("crickets"), t0) - c.Assert(err, qt.IsNil) - err = s.NotifyVersion(ctx, "animals", "2021-09-16", []byte("geckos"), t0) - c.Assert(err, qt.IsNil) - - tests := []struct { - service, version, digest string - shouldHave bool - }{ - {"petfood", "2021-09-16", cricketsDigest, true}, - {"animals", "2021-09-16", geckosDigest, true}, - {"petfood", "2021-09-16", geckosDigest, false}, - {"animals", "2021-09-16", cricketsDigest, false}, - {"petfood", "2021-10-16", cricketsDigest, false}, - {"animals", "2021-09-17", geckosDigest, false}, - } - for i, t := range tests { - c.Logf("test#%d: %v", i, t) - ok, err := s.HasVersion(ctx, t.service, t.version, t.digest) - c.Assert(err, qt.IsNil) - c.Assert(ok, qt.Equals, t.shouldHave) - } -} - -const spec = `{"components":{},"info":{"title":"ServiceA API","version":"0.0.0"},` + - `"openapi":"3.0.0","paths":{"/test":{"get":{"operation":"getTest",` + - 
`"responses":{"204":{"description":"An empty response"}},"summary":"Test endpoint"}}}}` - -const emptySpec = `{"components":{},"info":{"title":"","version":""},"openapi":"","paths":null}` - -func TestCollateVersions(t *testing.T) { - c := qt.New(t) - s := New() - - ctx := context.Background() - err := s.NotifyVersion(ctx, "petfood", "2021-09-16", []byte(emptySpec), t0) - c.Assert(err, qt.IsNil) - - serviceFilter := map[string]bool{"petfood": true} - err = s.CollateVersions(ctx, serviceFilter) - c.Assert(err, qt.IsNil) - before, err := s.Version(ctx, "2021-09-16") - c.Assert(err, qt.IsNil) - c.Assert(string(before), qt.Equals, emptySpec) - - content, err := s.Version(ctx, "2021-01-01") - c.Assert(err.Error(), qt.Equals, fmt.Errorf("no matching version").Error()) - c.Assert(content, qt.IsNil) - - err = s.NotifyVersion(ctx, "petfood", "2021-09-16", []byte(spec), t0.Add(time.Second)) - c.Assert(err, qt.IsNil) - err = s.CollateVersions(ctx, serviceFilter) - c.Assert(err, qt.IsNil) - - after, err := s.Version(ctx, "2021-09-16") - c.Assert(err, qt.IsNil) - c.Assert(string(after), qt.Equals, spec) -} - -func TestMemStorageCollateVersion(t *testing.T) { - s := New() - c := qt.New(t) - - storage.AssertCollateVersion(c, s) -} diff --git a/vervet-underground/server.go b/vervet-underground/server.go index fcfa2432..0a31e6f8 100644 --- a/vervet-underground/server.go +++ b/vervet-underground/server.go @@ -19,7 +19,6 @@ import ( "vervet-underground/internal/storage" "vervet-underground/internal/storage/disk" "vervet-underground/internal/storage/gcs" - "vervet-underground/internal/storage/mem" "vervet-underground/internal/storage/s3" ) @@ -176,8 +175,6 @@ func initializeStorage(ctx context.Context, cfg *config.ServerConfig, overlayCon return storage.NewCollator(collatorOpts...) 
} switch cfg.Storage.Type { - case config.StorageTypeMemory: - return mem.New(mem.NewCollator(newCollator)), nil case config.StorageTypeDisk: return disk.New(cfg.Storage.Disk.Path, disk.NewCollator(newCollator)), nil case config.StorageTypeS3: From d44c671482e65a228ca7f1f335915f5b8d79508a Mon Sep 17 00:00:00 2001 From: John Gresty Date: Wed, 24 Apr 2024 15:55:12 +0100 Subject: [PATCH 3/7] fix: allow overwriting scrape time in disk storage We use scrape time as a tie breaker when ordering specs with the same version and stability. For our current storages we assume the last modified time of the file (or object) is the scrape time. This poses a problem in tests which assume a monotonic clock when the files are written too quickly, eg with a tmpfs file system it is possible to write files in the same nanosecond. This wasn't an issue when we were using an in-memory store as we stored the given scrape time as is in the storage. The test which exercises this (disk_test:TestCollateVersions) explicitly sets the scrape time using monotonic operations. We need to respect that if we want to exercise this disk storage in the same way we were testing the memory storage. Fixes disk_test:TestCollateVersions so it passes consistently instead of randomly failing. 
--- .../internal/storage/disk/disk.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/vervet-underground/internal/storage/disk/disk.go b/vervet-underground/internal/storage/disk/disk.go index 20646aea..69bf9ec3 100644 --- a/vervet-underground/internal/storage/disk/disk.go +++ b/vervet-underground/internal/storage/disk/disk.go @@ -165,7 +165,7 @@ func (s *Storage) NotifyVersion(ctx context.Context, name string, version string if err != nil { if errors.Is(err, os.ErrNotExist) { // Since the digest doesn't exist, add the whole key path - return s.PutObject(key, currentRevision.Blob) + return s.PutObject(key, currentRevision.Blob, ¤tRevision.Timestamp) } return err } @@ -212,13 +212,22 @@ func (s *Storage) GetObject(key string) ([]byte, error) { return os.ReadFile(path) } -func (s *Storage) PutObject(key string, body []byte) error { +func (s *Storage) PutObject(key string, body []byte, timestamp *time.Time) error { path := path.Join(s.path, key) err := os.MkdirAll(filepath.Dir(path), os.ModePerm) if err != nil { return err } - return os.WriteFile(path, body, 0600) + err = os.WriteFile(path, body, 0600) + if err != nil { + return err + } + if timestamp == nil { + return nil + } + // Preserve specified timestamp, mostly for testing using a monotonic clock + // on fast file systems. 
+ return os.Chtimes(path, *timestamp, *timestamp) } // GetCollatedVersionSpec retrieves a single collated vervet.Version @@ -282,7 +291,7 @@ func (s *Storage) putCollatedSpecs(objects map[vervet.Version]openapi3.T) error if err != nil { return fmt.Errorf("failed to marshal json for collation upload: %w", err) } - err = s.PutObject(storage.CollatedVersionsFolder+key.String()+"/spec.json", jsonBlob) + err = s.PutObject(storage.CollatedVersionsFolder+key.String()+"/spec.json", jsonBlob, nil) if err != nil { return err } From 5a73833e8c100c710a31a2166311c124b8fc2c98 Mon Sep 17 00:00:00 2001 From: John Gresty Date: Wed, 24 Apr 2024 16:16:45 +0100 Subject: [PATCH 4/7] feat: use read only storage interface in api handler This ensures that no api endpoint can write to storage, which gives us more safety as we want to separate out the scraping into a separate process. Only having the api concerned about reading the storage will mean we are able to set a clear boundary between the scraping and the api. --- .../internal/handler/handler.go | 14 +++++------ .../internal/handler/handler_test.go | 17 ++++++------- .../internal/scraper/scraper.go | 9 ------- .../internal/storage/storage.go | 25 ++++++++++++------- vervet-underground/server.go | 2 +- 5 files changed, 31 insertions(+), 36 deletions(-) diff --git a/vervet-underground/internal/handler/handler.go b/vervet-underground/internal/handler/handler.go index 7999fdbd..b3609465 100644 --- a/vervet-underground/internal/handler/handler.go +++ b/vervet-underground/internal/handler/handler.go @@ -19,21 +19,21 @@ import ( "github.com/snyk/vervet/v6/versionware" "vervet-underground/config" - "vervet-underground/internal/scraper" + "vervet-underground/internal/storage" ) // Handler handles Vervet Underground HTTP requests. type Handler struct { cfg *config.ServerConfig - sc *scraper.Scraper + store storage.ReadOnlyStorage router chi.Router } // New returns a new Handler. 
-func New(cfg *config.ServerConfig, sc *scraper.Scraper, routerOptions ...func(r chi.Router)) *Handler { +func New(cfg *config.ServerConfig, store storage.ReadOnlyStorage, routerOptions ...func(r chi.Router)) *Handler { h := &Handler{ cfg: cfg, - sc: sc, + store: store, router: chi.NewRouter(), } for i := range routerOptions { @@ -69,7 +69,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } func (h *Handler) openapiVersions(w http.ResponseWriter, r *http.Request) { - versionIndex := h.sc.VersionIndex() + versionIndex := h.store.VersionIndex() content, err := json.Marshal(versionIndex.Versions().Strings()) if err != nil { logError(err) @@ -105,7 +105,7 @@ func (h *Handler) openapiVersion(w http.ResponseWriter, r *http.Request) { } } - versionIndex := h.sc.VersionIndex() + versionIndex := h.store.VersionIndex() resolvedVersion, err := versionIndex.Resolve(version) if errors.Is(err, vervet.ErrNoMatchingVersion) { http.Error(w, "Version not found", http.StatusNotFound) @@ -118,7 +118,7 @@ func (h *Handler) openapiVersion(w http.ResponseWriter, r *http.Request) { resolvedVersion.Stability = version.Stability w.Header().Set(versionware.HeaderSnykVersionServed, resolvedVersion.String()) - content, err := h.sc.Version(r.Context(), resolvedVersion.String()) + content, err := h.store.Version(r.Context(), resolvedVersion.String()) if err != nil { logError(err) http.Error(w, "Failure to retrieve version", http.StatusInternalServerError) diff --git a/vervet-underground/internal/handler/handler_test.go b/vervet-underground/internal/handler/handler_test.go index d63e27ff..a7dd0852 100644 --- a/vervet-underground/internal/handler/handler_test.go +++ b/vervet-underground/internal/handler/handler_test.go @@ -12,12 +12,11 @@ import ( "vervet-underground/config" "vervet-underground/internal/handler" - "vervet-underground/internal/scraper" ) func TestHealth(t *testing.T) { c := qt.New(t) - cfg, h := setup(c) + cfg, h := setup() w := httptest.NewRecorder() req 
:= httptest.NewRequest("GET", "/", nil) @@ -33,7 +32,7 @@ func TestHealth(t *testing.T) { func TestOpenapi(t *testing.T) { c := qt.New(t) - _, h := setup(c) + _, h := setup() for _, path := range []string{"/openapi", "/openapi/"} { w := httptest.NewRecorder() @@ -55,7 +54,7 @@ func TestOpenapi(t *testing.T) { func TestMetrics(t *testing.T) { c := qt.New(t) - _, h := setup(c) + _, h := setup() w := httptest.NewRecorder() // NOTE: Metrics are counted globally, so in order for this metrics test to @@ -78,7 +77,7 @@ func TestMetrics(t *testing.T) { func TestOpenapiVersion(t *testing.T) { c := qt.New(t) - _, h := setup(c) + _, h := setup() w := httptest.NewRecorder() req := httptest.NewRequest("GET", "/openapi/2022-01-16~beta", nil) @@ -91,7 +90,7 @@ func TestOpenapiVersion(t *testing.T) { func TestOpenapiVersionNotFound(t *testing.T) { c := qt.New(t) - _, h := setup(c) + _, h := setup() w := httptest.NewRecorder() req := httptest.NewRequest("GET", "/openapi/2021-01-16~beta", nil) @@ -102,7 +101,7 @@ func TestOpenapiVersionNotFound(t *testing.T) { c.Assert(contents, qt.DeepEquals, []byte("Version not found\n")) } -func setup(c *qt.C) (*config.ServerConfig, *handler.Handler) { +func setup() (*config.ServerConfig, *handler.Handler) { cfg := &config.ServerConfig{ Services: []config.ServiceConfig{{ Name: "petfood", URL: "http://petfood.svc.cluster.local", @@ -111,9 +110,7 @@ func setup(c *qt.C) (*config.ServerConfig, *handler.Handler) { }}, } st := &mockStorage{} - sc, err := scraper.New(cfg, st) - c.Assert(err, qt.IsNil) - h := handler.New(cfg, sc, handler.UseDefaultMiddleware) + h := handler.New(cfg, st, handler.UseDefaultMiddleware) return cfg, h } diff --git a/vervet-underground/internal/scraper/scraper.go b/vervet-underground/internal/scraper/scraper.go index 845dec1b..2c6fa994 100644 --- a/vervet-underground/internal/scraper/scraper.go +++ b/vervet-underground/internal/scraper/scraper.go @@ -14,7 +14,6 @@ import ( "github.com/pkg/errors" 
"github.com/prometheus/client_golang/prometheus" "github.com/rs/zerolog/log" - "github.com/snyk/vervet/v6" "go.uber.org/multierr" "vervet-underground/config" @@ -286,11 +285,3 @@ func isLegacyVersion(version string) bool { // This default version predates vervet's creation date. return version == "2021-01-01" } - -func (s *Scraper) VersionIndex() vervet.VersionIndex { - return s.storage.VersionIndex() -} - -func (s *Scraper) Version(ctx context.Context, version string) ([]byte, error) { - return s.storage.Version(ctx, version) -} diff --git a/vervet-underground/internal/storage/storage.go b/vervet-underground/internal/storage/storage.go index 5704aa02..b3bf4d95 100644 --- a/vervet-underground/internal/storage/storage.go +++ b/vervet-underground/internal/storage/storage.go @@ -11,6 +11,21 @@ import ( "github.com/snyk/vervet/v6" ) +// ReadOnlyStorage defines functionality needed to fetch spec versions. +// Implementations can assume that the storage has already been populated +// through the Storage interface. +type ReadOnlyStorage interface { + // HasVersion returns whether the storage has already stored the service + // API spec version at the given content digest. + HasVersion(ctx context.Context, name string, version string, digest string) (bool, error) + + // VersionIndex fetches the Storage Versions index compiled by VU + VersionIndex() vervet.VersionIndex + + // Version fetches the Storage Version spec compiled by VU + Version(ctx context.Context, version string) ([]byte, error) +} + // Storage defines the storage functionality needed in order to store service // API version spec snapshots. type Storage interface { @@ -24,21 +39,13 @@ type Storage interface { // respective versions gathered. CollateVersions(ctx context.Context, serviceFilter map[string]bool) error - // HasVersion returns whether the storage has already stored the service - // API spec version at the given content digest. 
- HasVersion(ctx context.Context, name string, version string, digest string) (bool, error) - // NotifyVersion tells the storage to store the given version contents at // the scrapeTime. The storage implementation must detect and ignore // duplicate version contents, as some services may not provide content // digest headers in their responses. NotifyVersion(ctx context.Context, name string, version string, contents []byte, scrapeTime time.Time) error - // VersionIndex fetches the Storage Versions index compiled by VU - VersionIndex() vervet.VersionIndex - - // Version fetches the Storage Version spec compiled by VU - Version(ctx context.Context, version string) ([]byte, error) + ReadOnlyStorage } // CollatedVersionMappedSpecs Compiled aggregated spec for all services at that given version. diff --git a/vervet-underground/server.go b/vervet-underground/server.go index 0a31e6f8..d025d459 100644 --- a/vervet-underground/server.go +++ b/vervet-underground/server.go @@ -80,7 +80,7 @@ func main() { log.Fatal().Err(err).Msg("failed initialization scraping of service") } - h := handler.New(cfg, sc, handler.UseDefaultMiddleware) + h := handler.New(cfg, st, handler.UseDefaultMiddleware) srv := &http.Server{ Addr: fmt.Sprintf("%s:8080", cfg.Host), From 54e8ba1c2a950731340014cab0b530eea355b3aa Mon Sep 17 00:00:00 2001 From: John Gresty Date: Wed, 24 Apr 2024 21:33:17 +0100 Subject: [PATCH 5/7] feat: split scraper and api in vervet underground Previously we ran the scraper and api in the same process, with the scraper running before the api started listening. This caused us to experience a very long startup time before the api became available, which is not a fun experience when trying to push out an update. In addition if a scrape failed for any reason, such as an external service not being available, then it would take down this api - which knocked our api docs reference site offline. 
Furthermore this will allow us to scale our api without having to allocate the large amounts of resources that the scraping process needs. Note that this now does not run the scraping unless explicitly started by running the separate scraper binary. They are packaged together for convenience. --- vervet-underground/Dockerfile | 5 +- vervet-underground/Makefile | 16 +- vervet-underground/cmd/api/main.go | 152 ++++++++++++++++++ .../{server.go => cmd/scraper/main.go} | 98 +---------- 4 files changed, 173 insertions(+), 98 deletions(-) create mode 100644 vervet-underground/cmd/api/main.go rename vervet-underground/{server.go => cmd/scraper/main.go} (56%) diff --git a/vervet-underground/Dockerfile b/vervet-underground/Dockerfile index 786ab974..74f75312 100644 --- a/vervet-underground/Dockerfile +++ b/vervet-underground/Dockerfile @@ -18,7 +18,10 @@ RUN go mod download COPY . . RUN --mount=type=cache,target=/root/.cache/go-build \ CGO_ENABLED=0 \ - go build -v -o /go/bin/app ./ + go build -v -o /go/bin/app ./cmd/api/main.go +RUN --mount=type=cache,target=/root/.cache/go-build \ + CGO_ENABLED=0 \ + go build -v -o /go/bin/scraper ./cmd/scraper/main.go ################# # Runtime stage # diff --git a/vervet-underground/Makefile b/vervet-underground/Makefile index 0fccfecd..0f5975db 100644 --- a/vervet-underground/Makefile +++ b/vervet-underground/Makefile @@ -11,7 +11,7 @@ fmt: .PHONY: clean clean: - rm -f server && go clean + rm -f api scraper && go clean .PHONY: lint lint: @@ -36,8 +36,13 @@ test: go test ./... -count=1 -race .PHONY: build -build: - go build server.go +build: api scraper + +api: + go build -o api cmd/api/main.go + +scraper: + go build -o scraper cmd/scraper/main.go #---------------------------------------------------------------------------------- # Check for updates to packages in remote OSS repositories and update go.mod AND # go.sum to match changes. 
Then download the all the dependencies @@ -57,10 +62,11 @@ test-coverage: .PHONY: start start: - go run server.go + go run cmd/scraper/main.go + go run cmd/api/main.go .PHONY: install-tools -install-tools: +install-tools: ifndef CI mkdir -p ${GO_BIN} curl -sSfL 'https://raw.githubusercontent.com/golangci/golangci-lint/${GOCI_LINT_V}/install.sh' | sh -s -- -b ${GO_BIN} ${GOCI_LINT_V} diff --git a/vervet-underground/cmd/api/main.go b/vervet-underground/cmd/api/main.go new file mode 100644 index 00000000..509e4690 --- /dev/null +++ b/vervet-underground/cmd/api/main.go @@ -0,0 +1,152 @@ +package main + +import ( + "context" + "flag" + "fmt" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "github.com/pkg/errors" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + "golang.org/x/sync/errgroup" + + "vervet-underground/config" + "vervet-underground/internal/handler" + "vervet-underground/internal/storage" + "vervet-underground/internal/storage/disk" + "vervet-underground/internal/storage/gcs" + "vervet-underground/internal/storage/s3" +) + +func main() { + var wait time.Duration + var configJson string + flag.DurationVar(&wait, "graceful-timeout", time.Second*15, + "the duration for which the server gracefully wait for existing connections to finish - e.g. 
15s or 1m") + flag.StringVar(&configJson, "config-file", "config.default.json", + "the configuration file holding target services and the host address to run server on") + + flag.Parse() + zerolog.TimeFieldFormat = zerolog.TimeFormatUnix + zerolog.SetGlobalLevel(zerolog.DebugLevel) + + var cfg *config.ServerConfig + var err error + if cfg, err = config.Load(configJson); err != nil { + log.Fatal().Err(err).Msg("unable to load config") + } + + ctx := context.Background() + st, err := initializeStorage(ctx, cfg) + if err != nil { + log.Fatal().Err(err).Msg("unable to initialize storage client") + } + + h := handler.New(cfg, st, handler.UseDefaultMiddleware) + + srv := &http.Server{ + Addr: fmt.Sprintf("%s:8080", cfg.Host), + // Good practice to set timeouts to avoid Slowloris attacks. + WriteTimeout: time.Second * 15, + ReadTimeout: time.Second * 15, + IdleTimeout: time.Second * 60, + Handler: h, + } + + grp, grpCtx := errgroup.WithContext(ctx) + grp.Go(func() (err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("panic: %v", r) + } + }() + + log.Info().Msg(fmt.Sprintf("I'm starting my server on %s:8080", cfg.Host)) + if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + return fmt.Errorf("failed to start server: %w", err) + } + return nil + }) + + grp.Go(func() (err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("panic: %v", r) + } + }() + + interceptSignals(grpCtx) + time.Sleep(wait) + log.Info().Float64("timeoutSeconds", wait.Seconds()).Msg("server stopping") + + return shutdown(ctx, srv, wait) + }) + + if err := grp.Wait(); err != nil { + log.Fatal().Err(err).Msg("server unexpectedly stopped") + } + + log.Info().Msg("shutting down cleanly") +} + +func initializeStorage(ctx context.Context, cfg *config.ServerConfig) (storage.ReadOnlyStorage, error) { + switch cfg.Storage.Type { + case config.StorageTypeDisk: + return disk.New(cfg.Storage.Disk.Path), nil + case 
config.StorageTypeS3: + return s3.New(ctx, &s3.Config{ + AwsRegion: cfg.Storage.S3.Region, + AwsEndpoint: cfg.Storage.S3.Endpoint, + IamRoleEnabled: cfg.Storage.IamRoleEnabled, + BucketName: cfg.Storage.BucketName, + Credentials: s3.StaticKeyCredentials{ + AccessKey: cfg.Storage.S3.AccessKey, + SecretKey: cfg.Storage.S3.SecretKey, + SessionKey: cfg.Storage.S3.SessionKey, + }, + }) + case config.StorageTypeGCS: + return gcs.New(ctx, &gcs.Config{ + GcsRegion: cfg.Storage.GCS.Region, + GcsEndpoint: cfg.Storage.GCS.Endpoint, + IamRoleEnabled: cfg.Storage.IamRoleEnabled, + BucketName: cfg.Storage.BucketName, + Credentials: gcs.StaticKeyCredentials{ + ProjectId: cfg.Storage.GCS.ProjectId, + Filename: cfg.Storage.GCS.Filename, + }, + }) + } + return nil, fmt.Errorf("unknown storage backend: %s", cfg.Storage.Type) +} + +func interceptSignals(ctx context.Context) { + sigc := make(chan os.Signal, 1) + signal.Notify(sigc, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + + select { + case <-ctx.Done(): + case sig := <-sigc: + log.Info().Str("signal", sig.String()).Msg("intercepted signal") + } +} + +func shutdown(ctx context.Context, srv *http.Server, timeout time.Duration) error { + shutdownCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + if err := srv.Shutdown(shutdownCtx); err != nil { + return fmt.Errorf("failed to gracefully shutdown server: %w", err) + } + + return nil +} diff --git a/vervet-underground/server.go b/vervet-underground/cmd/scraper/main.go similarity index 56% rename from vervet-underground/server.go rename to vervet-underground/cmd/scraper/main.go index d025d459..3e79ec01 100644 --- a/vervet-underground/server.go +++ b/vervet-underground/cmd/scraper/main.go @@ -6,15 +6,12 @@ import ( "fmt" "net/http" "os" - "os/signal" "time" - "github.com/pkg/errors" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "vervet-underground/config" - "vervet-underground/internal/handler" "vervet-underground/internal/scraper" 
"vervet-underground/internal/storage" "vervet-underground/internal/storage/disk" @@ -24,13 +21,8 @@ import ( func main() { var wait time.Duration - var scrapeInterval time.Duration var configJson string var overlayFile string - flag.DurationVar(&wait, "graceful-timeout", time.Second*15, - "the duration for which the server gracefully wait for existing connections to finish - e.g. 15s or 1m") - flag.DurationVar(&scrapeInterval, "scrape-interval", time.Minute, - "the frequency at which scraping occurs - e.g. 15s, 1m, 1h") flag.StringVar(&configJson, "config-file", "config.default.json", "the configuration file holding target services and the host address to run server on") flag.StringVar(&overlayFile, "overlay-file", "", @@ -43,8 +35,7 @@ func main() { var cfg *config.ServerConfig var err error if cfg, err = config.Load(configJson); err != nil { - logError(err) - log.Fatal().Msg("unable to load config") + log.Fatal().Err(err).Msg("unable to load config") } log.Info().Msgf("services: %s", cfg.Services) @@ -52,18 +43,14 @@ func main() { if overlayFile != "" { overlayContents, err = os.ReadFile(overlayFile) if err != nil { - logError(err) - log.Fatal().Msgf("unable to load overlay file %q", overlayFile) + log.Fatal().Err(err).Msgf("unable to load overlay file %q", overlayFile) } } - // initialize Scraper - ticker := time.NewTicker(scrapeInterval) ctx := context.Background() st, err := initializeStorage(ctx, cfg, overlayContents) if err != nil { - logError(err) - log.Fatal().Msg("unable to initialize storage client") + log.Fatal().Err(err).Msg("unable to initialize storage client") } sc, err := scraper.New(cfg, st, scraper.HTTPClient(&http.Client{ @@ -71,77 +58,13 @@ func main() { Transport: scraper.DurationTransport(http.DefaultTransport), })) if err != nil { - logError(err) - log.Fatal().Msg("unable to load storage") - } - // initialize - err = runScrape(ctx, sc) - if err != nil { - log.Fatal().Err(err).Msg("failed initialization scraping of service") + 
log.Fatal().Err(err).Msg("unable to load storage") } - h := handler.New(cfg, st, handler.UseDefaultMiddleware) - - srv := &http.Server{ - Addr: fmt.Sprintf("%s:8080", cfg.Host), - // Good practice to set timeouts to avoid Slowloris attacks. - WriteTimeout: time.Second * 15, - ReadTimeout: time.Second * 15, - IdleTimeout: time.Second * 60, - Handler: h, - } - - quit := make(chan struct{}, 1) - go func() { - for { - select { - case <-ticker.C: - if err := runScrape(ctx, sc); err != nil { - logError(err) - } - case <-quit: - ticker.Stop() - return - } - } - }() - - // Run our server in a goroutine so that it doesn't block. - go func() { - log.Info().Msg(fmt.Sprintf("I'm starting my server on %s:8080", cfg.Host)) - if err := srv.ListenAndServe(); err != nil && errors.Is(err, http.ErrServerClosed) { - logError(err) - os.Exit(1) - } - }() - - c := make(chan os.Signal, 1) - // We'll accept graceful shutdowns when quit via SIGINT (Ctrl+C) - // SIGKILL, SIGQUIT or SIGTERM (Ctrl+/) will not be caught. - signal.Notify(c, os.Interrupt) - // Block until we receive our signal. - <-c - - // closes the scraper go routine - quit <- struct{}{} - close(quit) - log.Info().Msg("scraper successfully spun down") - - // Create a deadline to wait for. - ctx, cancel := context.WithTimeout(context.Background(), wait) - defer cancel() - // Doesn't block if no connections, but will otherwise wait - // until the timeout deadline. - err = srv.Shutdown(ctx) + err = runScrape(ctx, sc) if err != nil { - logError(err) + log.Fatal().Err(err).Msg("failed scraping of service") } - - // Optionally, you could run srv.Shutdown in a goroutine and block on - // <-ctx.Done() if your application should wait for other services - // to finalize based on context cancellation. 
- log.Info().Msg("shutting down cleanly") - os.Exit(0) } // runScrape runs scraping all services and can take @@ -157,15 +80,6 @@ func runScrape(ctx context.Context, sc *scraper.Scraper) error { return nil } -func logError(err error) { - log. - Error(). - Stack(). - Err(err). - Str("cause", fmt.Sprintf("%+v", errors.Cause(err))). - Msg("UnhandledException") -} - func initializeStorage(ctx context.Context, cfg *config.ServerConfig, overlayContents []byte) (storage.Storage, error) { collatorOpts := []storage.CollatorOption{storage.CollatorExcludePattern(cfg.Merging.ExcludePatterns)} if overlayContents != nil { From 8b7e9510d6ded5aeaf913a812816864812113356 Mon Sep 17 00:00:00 2001 From: John Gresty Date: Thu, 25 Apr 2024 00:05:53 +0100 Subject: [PATCH 6/7] fix: always fetch specs from cold storage The implementations of all the storages assume that CollateVersions would be called before trying to access them. If it wasn't then they would not return any specs as the VersionIndex and Version methods were using an in-memory cache and not hitting the backing store. This does not work now we aren't collating versions in the same process. We need to make sure we actually get the versions from storage when they are requested. This will probably be slower, but we can add a cache later if needed. 
--- .../internal/handler/handler.go | 17 ++++++-- .../internal/handler/handler_test.go | 4 +- .../internal/scraper/gcs_scraper_test.go | 6 ++- .../internal/scraper/s3_scraper_test.go | 6 ++- .../internal/scraper/scraper_test.go | 3 +- .../internal/storage/disk/disk.go | 42 +++++++++++-------- .../internal/storage/gcs/client.go | 36 ++++++++-------- .../internal/storage/s3/client.go | 36 ++++++++-------- .../internal/storage/storage.go | 2 +- 9 files changed, 90 insertions(+), 62 deletions(-) diff --git a/vervet-underground/internal/handler/handler.go b/vervet-underground/internal/handler/handler.go index b3609465..0e03d311 100644 --- a/vervet-underground/internal/handler/handler.go +++ b/vervet-underground/internal/handler/handler.go @@ -69,7 +69,12 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } func (h *Handler) openapiVersions(w http.ResponseWriter, r *http.Request) { - versionIndex := h.store.VersionIndex() + versionIndex, err := h.store.VersionIndex(r.Context()) + if err != nil { + logError(err) + http.Error(w, "Cannot get versions", http.StatusInternalServerError) + return + } content, err := json.Marshal(versionIndex.Versions().Strings()) if err != nil { logError(err) @@ -105,7 +110,13 @@ func (h *Handler) openapiVersion(w http.ResponseWriter, r *http.Request) { } } - versionIndex := h.store.VersionIndex() + ctx := r.Context() + versionIndex, err := h.store.VersionIndex(ctx) + if err != nil { + logError(err) + http.Error(w, "Cannot get versions", http.StatusInternalServerError) + return + } resolvedVersion, err := versionIndex.Resolve(version) if errors.Is(err, vervet.ErrNoMatchingVersion) { http.Error(w, "Version not found", http.StatusNotFound) @@ -118,7 +129,7 @@ func (h *Handler) openapiVersion(w http.ResponseWriter, r *http.Request) { resolvedVersion.Stability = version.Stability w.Header().Set(versionware.HeaderSnykVersionServed, resolvedVersion.String()) - content, err := h.store.Version(r.Context(), 
resolvedVersion.String()) + content, err := h.store.Version(ctx, resolvedVersion.String()) if err != nil { logError(err) http.Error(w, "Failure to retrieve version", http.StatusInternalServerError) diff --git a/vervet-underground/internal/handler/handler_test.go b/vervet-underground/internal/handler/handler_test.go index a7dd0852..a77be644 100644 --- a/vervet-underground/internal/handler/handler_test.go +++ b/vervet-underground/internal/handler/handler_test.go @@ -132,7 +132,7 @@ func (s *mockStorage) NotifyVersion(ctx context.Context, name string, version st return nil } -func (s *mockStorage) VersionIndex() vervet.VersionIndex { +func (s *mockStorage) VersionIndex(ctx context.Context) (vervet.VersionIndex, error) { return vervet.NewVersionIndex(vervet.VersionSlice{ vervet.MustParseVersion("2021-06-04~experimental"), vervet.MustParseVersion("2021-10-20~experimental"), @@ -140,7 +140,7 @@ func (s *mockStorage) VersionIndex() vervet.VersionIndex { vervet.MustParseVersion("2022-01-16~experimental"), vervet.MustParseVersion("2022-01-16~beta"), vervet.MustParseVersion("2022-01-16~ga"), - }) + }), nil } func (s *mockStorage) Version(ctx context.Context, version string) ([]byte, error) { diff --git a/vervet-underground/internal/scraper/gcs_scraper_test.go b/vervet-underground/internal/scraper/gcs_scraper_test.go index 2930a821..ce6e92e9 100644 --- a/vervet-underground/internal/scraper/gcs_scraper_test.go +++ b/vervet-underground/internal/scraper/gcs_scraper_test.go @@ -66,7 +66,8 @@ func TestGCSScraper(t *testing.T) { c.Assert(ok, qt.IsTrue) } - vi := st.VersionIndex() + vi, err := st.VersionIndex(ctx) + c.Assert(err, qt.IsNil) c.Assert(len(vi.Versions()), qt.Equals, 4) for _, version := range vi.Versions() { specData, err := st.Version(ctx, version.String()) @@ -129,7 +130,8 @@ func TestGCSScraperCollation(t *testing.T) { c.Assert(ok, qt.IsTrue) } - vi := st.VersionIndex() + vi, err := st.VersionIndex(ctx) + c.Assert(err, qt.IsNil) c.Assert(len(vi.Versions()), 
qt.Equals, 4) for _, version := range vi.Versions() { specData, err := st.Version(ctx, version.String()) diff --git a/vervet-underground/internal/scraper/s3_scraper_test.go b/vervet-underground/internal/scraper/s3_scraper_test.go index f9a75a13..53623f03 100644 --- a/vervet-underground/internal/scraper/s3_scraper_test.go +++ b/vervet-underground/internal/scraper/s3_scraper_test.go @@ -62,7 +62,8 @@ func TestS3Scraper(t *testing.T) { c.Assert(ok, qt.IsTrue) } - vi := st.VersionIndex() + vi, err := st.VersionIndex(ctx) + c.Assert(err, qt.IsNil) c.Assert(len(vi.Versions()), qt.Equals, 4) for _, version := range vi.Versions() { specData, err := st.Version(ctx, version.String()) @@ -121,7 +122,8 @@ func TestS3ScraperCollation(t *testing.T) { c.Assert(ok, qt.IsTrue) } - vi := st.VersionIndex() + vi, err := st.VersionIndex(ctx) + c.Assert(err, qt.IsNil) c.Assert(len(vi.Versions()), qt.Equals, 4) for _, version := range vi.Versions() { specData, err := st.Version(ctx, version.String()) diff --git a/vervet-underground/internal/scraper/scraper_test.go b/vervet-underground/internal/scraper/scraper_test.go index 6fdcb53d..c25be756 100644 --- a/vervet-underground/internal/scraper/scraper_test.go +++ b/vervet-underground/internal/scraper/scraper_test.go @@ -131,7 +131,8 @@ func TestScraper(t *testing.T) { c.Assert(ok, qt.IsTrue) } - vi := st.VersionIndex() + vi, err := st.VersionIndex(ctx) + c.Assert(err, qt.IsNil) c.Assert(len(vi.Versions()), qt.Equals, 4) for _, version := range vi.Versions() { specData, err := st.Version(ctx, version.String()) diff --git a/vervet-underground/internal/storage/disk/disk.go b/vervet-underground/internal/storage/disk/disk.go index 69bf9ec3..6b93b79f 100644 --- a/vervet-underground/internal/storage/disk/disk.go +++ b/vervet-underground/internal/storage/disk/disk.go @@ -13,7 +13,6 @@ import ( "path" "path/filepath" "strings" - "sync" "time" "github.com/getkin/kin-openapi/openapi3" @@ -23,10 +22,8 @@ import ( ) type Storage struct { - path string - 
mu sync.RWMutex - collatedVersions vervet.VersionIndex - newCollator func() (*storage.Collator, error) + path string + newCollator func() (*storage.Collator, error) } // Option defines a Storage constructor option. @@ -118,15 +115,11 @@ func (s *Storage) CollateVersions(ctx context.Context, serviceFilter map[string] } aggregate.Add(service, revision) } - versions, specs, err := aggregate.Collate() + _, specs, err := aggregate.Collate() if err != nil { return err } - s.mu.Lock() - s.collatedVersions = vervet.NewVersionIndex(versions) - s.mu.Unlock() - return s.putCollatedSpecs(specs) } @@ -174,11 +167,24 @@ func (s *Storage) NotifyVersion(ctx context.Context, name string, version string } // VersionIndex implements scraper.Storage. -func (s *Storage) VersionIndex() vervet.VersionIndex { - s.mu.RLock() - defer s.mu.RUnlock() +func (s *Storage) VersionIndex(ctx context.Context) (vervet.VersionIndex, error) { + objects, err := s.ListObjects(ctx, storage.ServiceVersionsFolder) + if err != nil { + return vervet.VersionIndex{}, err + } + vs := make(vervet.VersionSlice, len(objects)) + for idx, obj := range objects { + _, versionStr, _, err := parseServiceVersionRevisionKey(obj) + if err != nil { + return vervet.VersionIndex{}, err + } + vs[idx], err = vervet.ParseVersion(versionStr) + if err != nil { + return vervet.VersionIndex{}, err + } + } - return s.collatedVersions + return vervet.NewVersionIndex(vs), nil } // Version implements scraper.Storage. 
@@ -190,9 +196,11 @@ func (s *Storage) Version(ctx context.Context, version string) ([]byte, error) { blob, err := s.GetCollatedVersionSpec(version) if err != nil { - s.mu.RLock() - resolved, err := s.collatedVersions.Resolve(parsedVersion) - s.mu.RUnlock() + index, err := s.VersionIndex(ctx) + if err != nil { + return nil, err + } + resolved, err := index.Resolve(parsedVersion) if err != nil { return nil, err } diff --git a/vervet-underground/internal/storage/gcs/client.go b/vervet-underground/internal/storage/gcs/client.go index e6b53c82..c36ae773 100644 --- a/vervet-underground/internal/storage/gcs/client.go +++ b/vervet-underground/internal/storage/gcs/client.go @@ -7,7 +7,6 @@ import ( "fmt" "io" "strings" - "sync" "time" "cloud.google.com/go/storage" @@ -43,9 +42,6 @@ type Storage struct { c *storage.Client config Config newCollator func() (*vustorage.Collator, error) - - mu sync.RWMutex - collatedVersions vervet.VersionIndex } // New instantiates a gcs.Storage client to handle storing and retrieving @@ -160,15 +156,11 @@ func (s *Storage) CollateVersions(ctx context.Context, serviceFilter map[string] } aggregate.Add(service, revision) } - versions, specs, err := aggregate.Collate() + _, specs, err := aggregate.Collate() if err != nil { return err } - s.mu.Lock() - s.collatedVersions = vervet.NewVersionIndex(versions) - s.mu.Unlock() - n, err := s.putCollatedSpecs(ctx, specs) if err != nil { return err @@ -231,11 +223,19 @@ func (s *Storage) NotifyVersion(ctx context.Context, name string, version string } // Versions lists all available Collated Versions. 
-func (s *Storage) VersionIndex() vervet.VersionIndex { - s.mu.RLock() - defer s.mu.RUnlock() - - return s.collatedVersions +func (s *Storage) VersionIndex(ctx context.Context) (vervet.VersionIndex, error) { + versions, err := s.ListCollatedVersions(ctx) + if err != nil { + return vervet.VersionIndex{}, err + } + vs := make(vervet.VersionSlice, len(versions)) + for idx, version := range versions { + vs[idx], err = vervet.ParseVersion(version) + if err != nil { + return vervet.VersionIndex{}, err + } + } + return vervet.NewVersionIndex(vs), nil } // Version implements scraper.Storage. @@ -247,9 +247,11 @@ func (s *Storage) Version(ctx context.Context, version string) ([]byte, error) { blob, err := s.GetCollatedVersionSpec(ctx, version) if err != nil { - s.mu.RLock() - resolved, err := s.collatedVersions.Resolve(parsedVersion) - s.mu.RUnlock() + index, err := s.VersionIndex(ctx) + if err != nil { + return nil, err + } + resolved, err := index.Resolve(parsedVersion) if err != nil { return nil, err } diff --git a/vervet-underground/internal/storage/s3/client.go b/vervet-underground/internal/storage/s3/client.go index 226e2d70..01d04652 100644 --- a/vervet-underground/internal/storage/s3/client.go +++ b/vervet-underground/internal/storage/s3/client.go @@ -9,7 +9,6 @@ import ( "fmt" "io" "strings" - "sync" "time" "github.com/aws/aws-sdk-go-v2/aws" @@ -47,9 +46,6 @@ type Storage struct { client *s3.Client config Config newCollator func() (*storage.Collator, error) - - mu sync.RWMutex - collatedVersions vervet.VersionIndex } func New(ctx context.Context, awsCfg *Config, options ...Option) (storage.Storage, error) { @@ -187,11 +183,19 @@ func (s *Storage) NotifyVersion(ctx context.Context, name string, version string } // Versions implements scraper.Storage. 
-func (s *Storage) VersionIndex() vervet.VersionIndex { - s.mu.RLock() - defer s.mu.RUnlock() - - return s.collatedVersions +func (s *Storage) VersionIndex(ctx context.Context) (vervet.VersionIndex, error) { + versions, err := s.ListCollatedVersions(ctx) + if err != nil { + return vervet.VersionIndex{}, err + } + vs := make(vervet.VersionSlice, len(versions)) + for idx, version := range versions { + vs[idx], err = vervet.ParseVersion(version) + if err != nil { + return vervet.VersionIndex{}, err + } + } + return vervet.NewVersionIndex(vs), nil } // Version implements scraper.Storage. @@ -203,9 +207,11 @@ func (s *Storage) Version(ctx context.Context, version string) ([]byte, error) { blob, err := s.GetCollatedVersionSpec(ctx, version) if err != nil { - s.mu.RLock() - resolved, err := s.collatedVersions.Resolve(parsedVersion) - s.mu.RUnlock() + index, err := s.VersionIndex(ctx) + if err != nil { + return nil, err + } + resolved, err := index.Resolve(parsedVersion) if err != nil { return nil, err } @@ -263,15 +269,11 @@ func (s *Storage) CollateVersions(ctx context.Context, serviceFilter map[string] } aggregate.Add(service, revision) } - versions, specs, err := aggregate.Collate() + _, specs, err := aggregate.Collate() if err != nil { return err } - s.mu.Lock() - s.collatedVersions = vervet.NewVersionIndex(versions) - s.mu.Unlock() - n, err := s.putCollatedSpecs(ctx, specs) if err != nil { return err diff --git a/vervet-underground/internal/storage/storage.go b/vervet-underground/internal/storage/storage.go index b3bf4d95..8311d049 100644 --- a/vervet-underground/internal/storage/storage.go +++ b/vervet-underground/internal/storage/storage.go @@ -20,7 +20,7 @@ type ReadOnlyStorage interface { HasVersion(ctx context.Context, name string, version string, digest string) (bool, error) // VersionIndex fetches the Storage Versions index compiled by VU - VersionIndex() vervet.VersionIndex + VersionIndex(ctx context.Context) (vervet.VersionIndex, error) // Version fetches 
the Storage Version spec compiled by VU Version(ctx context.Context, version string) ([]byte, error) From e0120cfe6638d10751a6739d6dc1f74d7ef9a1c2 Mon Sep 17 00:00:00 2001 From: John Gresty Date: Thu, 25 Apr 2024 00:31:34 +0100 Subject: [PATCH 7/7] chore: remove returned versions from collator.Collate Nothing was using the returned value so there is no point returning it. Makes the surface area a bit smaller so it is easier to reason about. --- vervet-underground/internal/storage/collator.go | 10 +++++----- .../internal/storage/collator_test.go | 17 +++++------------ .../internal/storage/disk/disk.go | 2 +- .../internal/storage/gcs/client.go | 2 +- .../internal/storage/s3/client.go | 2 +- 5 files changed, 13 insertions(+), 20 deletions(-) diff --git a/vervet-underground/internal/storage/collator.go b/vervet-underground/internal/storage/collator.go index 1383a60d..a53f80dd 100644 --- a/vervet-underground/internal/storage/collator.go +++ b/vervet-underground/internal/storage/collator.go @@ -98,7 +98,7 @@ func (c *Collator) Add(service string, revision ContentRevision) { } // Collate processes added service revisions to collate unified versions and OpenAPI specs for each version. 
-func (c *Collator) Collate() (vervet.VersionSlice, map[vervet.Version]openapi3.T, error) { +func (c *Collator) Collate() (map[vervet.Version]openapi3.T, error) { specs := make(map[vervet.Version]openapi3.T) sort.Sort(c.uniqueVersions) @@ -120,23 +120,23 @@ func (c *Collator) Collate() (vervet.VersionSlice, map[vervet.Version]openapi3.T if err != nil { log.Error().Err(err).Msgf("could not merge revision for version %s", version) collatorMergeError.WithLabelValues(version.String()).Inc() - return nil, nil, err + return nil, err } if err := vervet.RemoveElements(spec, c.excludePatterns); err != nil { log.Error().Err(err).Msgf("could not merge revision for version %s", version) collatorMergeError.WithLabelValues(version.String()).Inc() - return nil, nil, err + return nil, err } if err := c.applyOverlay(spec); err != nil { log.Error().Err(err).Msgf("failed to merge overlay for version %s", version) collatorMergeError.WithLabelValues(version.String()).Inc() - return nil, nil, err + return nil, err } specs[version] = *spec } } - return c.uniqueVersions, specs, nil + return specs, nil } func mergeRevisions(revisions ContentRevisions) (*openapi3.T, error) { diff --git a/vervet-underground/internal/storage/collator_test.go b/vervet-underground/internal/storage/collator_test.go index 0c8c5025..7c0afa05 100644 --- a/vervet-underground/internal/storage/collator_test.go +++ b/vervet-underground/internal/storage/collator_test.go @@ -137,12 +137,8 @@ func TestCollator_Collate(t *testing.T) { Blob: []byte(serviceBSpec), }) - versions, specs, err := collator.Collate() + specs, err := collator.Collate() c.Assert(err, qt.IsNil) - c.Assert(len(versions), qt.Equals, 3) - c.Assert(versions[0], qt.Equals, v20220201_beta) - c.Assert(versions[1], qt.Equals, v20220301_ga) - c.Assert(versions[2], qt.Equals, v20220401_ga) c.Assert(specs[v20220201_beta].Paths.Find("/test"), qt.IsNotNil) c.Assert(specs[v20220201_beta].Paths.Find("/example"), qt.IsNil) @@ -181,11 +177,8 @@ func 
TestCollator_Collate_MigratingEndpoints(t *testing.T) { Blob: []byte(serviceCSpec), }) - versions, specs, err := collator.Collate() + specs, err := collator.Collate() c.Assert(err, qt.IsNil) - c.Assert(len(versions), qt.Equals, 2) - c.Assert(versions[0], qt.Equals, v20220201_exp) - c.Assert(versions[1], qt.Equals, v20230314_exp) c.Assert(specs[v20220201_exp].Paths.Find("/test"), qt.IsNotNil) c.Assert(specs[v20230314_exp].Paths.Find("/test"), qt.IsNotNil) @@ -222,7 +215,7 @@ func TestCollator_Collate_ExcludePatterns(t *testing.T) { Version: v20220401_ga, Blob: []byte(serviceBSpec), }) - _, specs, err := collator.Collate() + specs, err := collator.Collate() c.Assert(err, qt.IsNil) c.Assert(specs[v20220401_ga].Paths["/example"].Post.Extensions["x-other-internal"], qt.IsNil) @@ -256,7 +249,7 @@ func TestCollator_Collate_Conflict(t *testing.T) { Timestamp: time.Date(2021, 6, 15, 0, 0, 0, 0, time.UTC), }) - _, specs, err := collator.Collate() + specs, err := collator.Collate() c.Assert(err, qt.IsNil) // First path wins c.Assert(specs[vervet.MustParseVersion("2021-06-15")].Paths["/examples/hello-world"].Post.Description, @@ -304,7 +297,7 @@ func TestCollator_Collate_Overlay(t *testing.T) { Version: v20220401_ga, Blob: []byte(serviceBSpec), }) - _, specs, err := collator.Collate() + specs, err := collator.Collate() c.Assert(err, qt.IsNil) c.Assert(specs[v20220401_ga].Servers[0].URL, qt.Equals, "https://awesome.snyk.io/rest") diff --git a/vervet-underground/internal/storage/disk/disk.go b/vervet-underground/internal/storage/disk/disk.go index 6b93b79f..a5be5623 100644 --- a/vervet-underground/internal/storage/disk/disk.go +++ b/vervet-underground/internal/storage/disk/disk.go @@ -115,7 +115,7 @@ func (s *Storage) CollateVersions(ctx context.Context, serviceFilter map[string] } aggregate.Add(service, revision) } - _, specs, err := aggregate.Collate() + specs, err := aggregate.Collate() if err != nil { return err } diff --git 
a/vervet-underground/internal/storage/gcs/client.go b/vervet-underground/internal/storage/gcs/client.go index c36ae773..1be2b4f0 100644 --- a/vervet-underground/internal/storage/gcs/client.go +++ b/vervet-underground/internal/storage/gcs/client.go @@ -156,7 +156,7 @@ func (s *Storage) CollateVersions(ctx context.Context, serviceFilter map[string] } aggregate.Add(service, revision) } - _, specs, err := aggregate.Collate() + specs, err := aggregate.Collate() if err != nil { return err } diff --git a/vervet-underground/internal/storage/s3/client.go b/vervet-underground/internal/storage/s3/client.go index 01d04652..04903ade 100644 --- a/vervet-underground/internal/storage/s3/client.go +++ b/vervet-underground/internal/storage/s3/client.go @@ -269,7 +269,7 @@ func (s *Storage) CollateVersions(ctx context.Context, serviceFilter map[string] } aggregate.Add(service, revision) } - _, specs, err := aggregate.Collate() + specs, err := aggregate.Collate() if err != nil { return err }