diff --git a/vervet-underground/Dockerfile b/vervet-underground/Dockerfile index 786ab974..74f75312 100644 --- a/vervet-underground/Dockerfile +++ b/vervet-underground/Dockerfile @@ -18,7 +18,10 @@ RUN go mod download COPY . . RUN --mount=type=cache,target=/root/.cache/go-build \ CGO_ENABLED=0 \ - go build -v -o /go/bin/app ./ + go build -v -o /go/bin/app ./cmd/api/main.go +RUN --mount=type=cache,target=/root/.cache/go-build \ + CGO_ENABLED=0 \ + go build -v -o /go/bin/scraper ./cmd/scraper/main.go ################# # Runtime stage # diff --git a/vervet-underground/Makefile b/vervet-underground/Makefile index 0fccfecd..0f5975db 100644 --- a/vervet-underground/Makefile +++ b/vervet-underground/Makefile @@ -11,7 +11,7 @@ fmt: .PHONY: clean clean: - rm -f server && go clean + rm -f api scraper && go clean .PHONY: lint lint: @@ -36,8 +36,13 @@ test: go test ./... -count=1 -race .PHONY: build -build: - go build server.go +build: api scraper + +api: + go build -o api cmd/api/main.go + +scraper: + go build -o scraper cmd/scraper/main.go #---------------------------------------------------------------------------------- # Check for updates to packages in remote OSS repositories and update go.mod AND # go.sum to match changes. Then download the all the dependencies @@ -57,10 +62,11 @@ test-coverage: .PHONY: start start: - go run server.go + go run cmd/scraper/main.go + go run cmd/api/main.go .PHONY: install-tools -install-tools: +install-tools: ifndef CI mkdir -p ${GO_BIN} curl -sSfL 'https://raw.githubusercontent.com/golangci/golangci-lint/${GOCI_LINT_V}/install.sh' | sh -s -- -b ${GO_BIN} ${GOCI_LINT_V} diff --git a/vervet-underground/cmd/api/main.go b/vervet-underground/cmd/api/main.go new file mode 100644 index 00000000..509e4690 --- /dev/null +++ b/vervet-underground/cmd/api/main.go @@ -0,0 +1,152 @@ +package main + +import ( + "context" + "flag" + "fmt" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "github.com/pkg/errors" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + "golang.org/x/sync/errgroup" + + "vervet-underground/config" + "vervet-underground/internal/handler" + "vervet-underground/internal/storage" + "vervet-underground/internal/storage/disk" + "vervet-underground/internal/storage/gcs" + "vervet-underground/internal/storage/s3" +) + +func main() { + var wait time.Duration + var configJson string + flag.DurationVar(&wait, "graceful-timeout", time.Second*15, + "the duration for which the server gracefully wait for existing connections to finish - e.g. 15s or 1m") + flag.StringVar(&configJson, "config-file", "config.default.json", + "the configuration file holding target services and the host address to run server on") + + flag.Parse() + zerolog.TimeFieldFormat = zerolog.TimeFormatUnix + zerolog.SetGlobalLevel(zerolog.DebugLevel) + + var cfg *config.ServerConfig + var err error + if cfg, err = config.Load(configJson); err != nil { + log.Fatal().Err(err).Msg("unable to load config") + } + + ctx := context.Background() + st, err := initializeStorage(ctx, cfg) + if err != nil { + log.Fatal().Err(err).Msg("unable to initialize storage client") + } + + h := handler.New(cfg, st, handler.UseDefaultMiddleware) + + srv := &http.Server{ + Addr: fmt.Sprintf("%s:8080", cfg.Host), + // Good practice to set timeouts to avoid Slowloris attacks. + WriteTimeout: time.Second * 15, + ReadTimeout: time.Second * 15, + IdleTimeout: time.Second * 60, + Handler: h, + } + + grp, grpCtx := errgroup.WithContext(ctx) + grp.Go(func() (err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("panic: %v", r) + } + }() + + log.Info().Msg(fmt.Sprintf("I'm starting my server on %s:8080", cfg.Host)) + if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + return fmt.Errorf("failed to start server: %w", err) + } + return nil + }) + + grp.Go(func() (err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("panic: %v", r) + } + }() + + interceptSignals(grpCtx) + time.Sleep(wait) + log.Info().Float64("timeoutSeconds", wait.Seconds()).Msg("server stopping") + + return shutdown(ctx, srv, wait) + }) + + if err := grp.Wait(); err != nil { + log.Fatal().Err(err).Msg("server unexpectedly stopped") + } + + log.Info().Msg("shutting down cleanly") +} + +func initializeStorage(ctx context.Context, cfg *config.ServerConfig) (storage.ReadOnlyStorage, error) { + switch cfg.Storage.Type { + case config.StorageTypeDisk: + return disk.New(cfg.Storage.Disk.Path), nil + case config.StorageTypeS3: + return s3.New(ctx, &s3.Config{ + AwsRegion: cfg.Storage.S3.Region, + AwsEndpoint: cfg.Storage.S3.Endpoint, + IamRoleEnabled: cfg.Storage.IamRoleEnabled, + BucketName: cfg.Storage.BucketName, + Credentials: s3.StaticKeyCredentials{ + AccessKey: cfg.Storage.S3.AccessKey, + SecretKey: cfg.Storage.S3.SecretKey, + SessionKey: cfg.Storage.S3.SessionKey, + }, + }) + case config.StorageTypeGCS: + return gcs.New(ctx, &gcs.Config{ + GcsRegion: cfg.Storage.GCS.Region, + GcsEndpoint: cfg.Storage.GCS.Endpoint, + IamRoleEnabled: cfg.Storage.IamRoleEnabled, + BucketName: cfg.Storage.BucketName, + Credentials: gcs.StaticKeyCredentials{ + ProjectId: cfg.Storage.GCS.ProjectId, + Filename: cfg.Storage.GCS.Filename, + }, + }) + } + return nil, fmt.Errorf("unknown storage backend: %s", cfg.Storage.Type) +} + +func interceptSignals(ctx context.Context) { + sigc := make(chan os.Signal, 1) + signal.Notify(sigc, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + + select { + case <-ctx.Done(): + case sig := <-sigc: + log.Info().Str("signal", sig.String()).Msg("intercepted signal") + } +} + +func shutdown(ctx context.Context, srv *http.Server, timeout time.Duration) error { + shutdownCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + if err := srv.Shutdown(shutdownCtx); err != nil { + return fmt.Errorf("failed to gracefully shutdown server: %w", err) + } + + return nil +} diff --git a/vervet-underground/server.go b/vervet-underground/cmd/scraper/main.go similarity index 54% rename from vervet-underground/server.go rename to vervet-underground/cmd/scraper/main.go index 7b61d43d..3e79ec01 100644 --- a/vervet-underground/server.go +++ b/vervet-underground/cmd/scraper/main.go @@ -6,31 +6,23 @@ import ( "fmt" "net/http" "os" - "os/signal" "time" - "github.com/pkg/errors" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "vervet-underground/config" - "vervet-underground/internal/handler" "vervet-underground/internal/scraper" "vervet-underground/internal/storage" + "vervet-underground/internal/storage/disk" "vervet-underground/internal/storage/gcs" - "vervet-underground/internal/storage/mem" "vervet-underground/internal/storage/s3" ) func main() { var wait time.Duration - var scrapeInterval time.Duration var configJson string var overlayFile string - flag.DurationVar(&wait, "graceful-timeout", time.Second*15, - "the duration for which the server gracefully wait for existing connections to finish - e.g. 15s or 1m") - flag.DurationVar(&scrapeInterval, "scrape-interval", time.Minute, - "the frequency at which scraping occurs - e.g. 15s, 1m, 1h") flag.StringVar(&configJson, "config-file", "config.default.json", "the configuration file holding target services and the host address to run server on") flag.StringVar(&overlayFile, "overlay-file", "", @@ -43,8 +35,7 @@ func main() { var cfg *config.ServerConfig var err error if cfg, err = config.Load(configJson); err != nil { - logError(err) - log.Fatal().Msg("unable to load config") + log.Fatal().Err(err).Msg("unable to load config") } log.Info().Msgf("services: %s", cfg.Services) @@ -52,18 +43,14 @@ func main() { if overlayFile != "" { overlayContents, err = os.ReadFile(overlayFile) if err != nil { - logError(err) - log.Fatal().Msgf("unable to load overlay file %q", overlayFile) + log.Fatal().Err(err).Msgf("unable to load overlay file %q", overlayFile) } } - // initialize Scraper - ticker := time.NewTicker(scrapeInterval) ctx := context.Background() st, err := initializeStorage(ctx, cfg, overlayContents) if err != nil { - logError(err) - log.Fatal().Msg("unable to initialize storage client") + log.Fatal().Err(err).Msg("unable to initialize storage client") } sc, err := scraper.New(cfg, st, scraper.HTTPClient(&http.Client{ @@ -71,77 +58,13 @@ func main() { Transport: scraper.DurationTransport(http.DefaultTransport), })) if err != nil { - logError(err) - log.Fatal().Msg("unable to load storage") - } - // initialize - err = runScrape(ctx, sc) - if err != nil { - log.Fatal().Err(err).Msg("failed initialization scraping of service") + log.Fatal().Err(err).Msg("unable to load storage") } - h := handler.New(cfg, sc, handler.UseDefaultMiddleware) - - srv := &http.Server{ - Addr: fmt.Sprintf("%s:8080", cfg.Host), - // Good practice to set timeouts to avoid Slowloris attacks. - WriteTimeout: time.Second * 15, - ReadTimeout: time.Second * 15, - IdleTimeout: time.Second * 60, - Handler: h, - } - - quit := make(chan struct{}, 1) - go func() { - for { - select { - case <-ticker.C: - if err := runScrape(ctx, sc); err != nil { - logError(err) - } - case <-quit: - ticker.Stop() - return - } - } - }() - - // Run our server in a goroutine so that it doesn't block. - go func() { - log.Info().Msg(fmt.Sprintf("I'm starting my server on %s:8080", cfg.Host)) - if err := srv.ListenAndServe(); err != nil && errors.Is(err, http.ErrServerClosed) { - logError(err) - os.Exit(1) - } - }() - - c := make(chan os.Signal, 1) - // We'll accept graceful shutdowns when quit via SIGINT (Ctrl+C) - // SIGKILL, SIGQUIT or SIGTERM (Ctrl+/) will not be caught. - signal.Notify(c, os.Interrupt) - // Block until we receive our signal. - <-c - - // closes the scraper go routine - quit <- struct{}{} - close(quit) - log.Info().Msg("scraper successfully spun down") - - // Create a deadline to wait for. - ctx, cancel := context.WithTimeout(context.Background(), wait) - defer cancel() - // Doesn't block if no connections, but will otherwise wait - // until the timeout deadline. - err = srv.Shutdown(ctx) + err = runScrape(ctx, sc) if err != nil { - logError(err) + log.Fatal().Err(err).Msg("failed scraping of service") } - - // Optionally, you could run srv.Shutdown in a goroutine and block on - // <-ctx.Done() if your application should wait for other services - // to finalize based on context cancellation. - log.Info().Msg("shutting down cleanly") - os.Exit(0) } // runScrape runs scraping all services and can take @@ -157,15 +80,6 @@ func runScrape(ctx context.Context, sc *scraper.Scraper) error { return nil } -func logError(err error) { - log. - Error(). - Stack(). - Err(err). - Str("cause", fmt.Sprintf("%+v", errors.Cause(err))). - Msg("UnhandledException") -} - func initializeStorage(ctx context.Context, cfg *config.ServerConfig, overlayContents []byte) (storage.Storage, error) { collatorOpts := []storage.CollatorOption{storage.CollatorExcludePattern(cfg.Merging.ExcludePatterns)} if overlayContents != nil { @@ -175,8 +89,8 @@ func initializeStorage(ctx context.Context, cfg *config.ServerConfig, overlayCon return storage.NewCollator(collatorOpts...) } switch cfg.Storage.Type { - case config.StorageTypeMemory: - return mem.New(mem.NewCollator(newCollator)), nil + case config.StorageTypeDisk: + return disk.New(cfg.Storage.Disk.Path, disk.NewCollator(newCollator)), nil case config.StorageTypeS3: return s3.New(ctx, &s3.Config{ AwsRegion: cfg.Storage.S3.Region, diff --git a/vervet-underground/config.default.json b/vervet-underground/config.default.json index 00425a11..1496039e 100644 --- a/vervet-underground/config.default.json +++ b/vervet-underground/config.default.json @@ -8,6 +8,9 @@ ], "host": "0.0.0.0", "storage": { - "type": "memory" + "type": "disk", + "disk": { + "path": "/tmp/specs" + } } } diff --git a/vervet-underground/config/config.go b/vervet-underground/config/config.go index de004cc3..c6a32868 100644 --- a/vervet-underground/config/config.go +++ b/vervet-underground/config/config.go @@ -13,9 +13,9 @@ import ( type StorageType string const ( - StorageTypeMemory StorageType = "memory" - StorageTypeS3 StorageType = "s3" - StorageTypeGCS StorageType = "gcs" + StorageTypeDisk StorageType = "disk" + StorageTypeS3 StorageType = "s3" + StorageTypeGCS StorageType = "gcs" ) // ServerConfig defines the configuration options for the Vervet Underground service. @@ -70,6 +70,12 @@ type StorageConfig struct { IamRoleEnabled bool S3 S3Config GCS GcsConfig + Disk DiskConfig +} + +// DiskConfig defines configuration options for local disk storage. +type DiskConfig struct { + Path string } // S3Config defines configuration options for AWS S3 storage. @@ -92,7 +98,7 @@ type GcsConfig struct { // setDefaults sets default values for the ServerConfig. func setDefaults() { viper.SetDefault("host", "localhost") - viper.SetDefault("storage.type", StorageTypeMemory) + viper.SetDefault("storage.type", StorageTypeDisk) } // loadEnv sets up the config store to load values from environment variables, diff --git a/vervet-underground/config/config_test.go b/vervet-underground/config/config_test.go index b45e8c05..77f33991 100644 --- a/vervet-underground/config/config_test.go +++ b/vervet-underground/config/config_test.go @@ -33,7 +33,7 @@ func TestLoad(t *testing.T) { Host: "localhost", Services: nil, Storage: config.StorageConfig{ - Type: config.StorageTypeMemory, + Type: config.StorageTypeDisk, }, } c.Assert(*conf, qt.DeepEquals, expected) @@ -52,7 +52,7 @@ func TestLoad(t *testing.T) { Host: "0.0.0.0", Services: []config.ServiceConfig{{URL: "localhost", Name: "localhost"}}, Storage: config.StorageConfig{ - Type: config.StorageTypeMemory, + Type: config.StorageTypeDisk, }, } c.Assert(*conf, qt.DeepEquals, expected) @@ -81,7 +81,35 @@ func TestLoad(t *testing.T) { }, }, Storage: config.StorageConfig{ - Type: config.StorageTypeMemory, + Type: config.StorageTypeDisk, + }, + } + c.Assert(*conf, qt.DeepEquals, expected) + }) + + c.Run("disk config", func(c *qt.C) { + f := createTestFile(c, []byte(`{ + "host": "0.0.0.0", + "services": [{"url":"localhost","name":"localhost"}], + "storage": { + "type": "disk", + "disk": { + "path": "/tmp/foobar" + } + } + }`)) + + conf, err := config.Load(f.Name()) + c.Assert(err, qt.IsNil) + + expected := config.ServerConfig{ + Host: "0.0.0.0", + Services: []config.ServiceConfig{{URL: "localhost", Name: "localhost"}}, + Storage: config.StorageConfig{ + Type: config.StorageTypeDisk, + Disk: config.DiskConfig{ + Path: "/tmp/foobar", + }, }, } c.Assert(*conf, qt.DeepEquals, expected) @@ -167,7 +195,7 @@ func TestLoad(t *testing.T) { secretConfig := createTestFile(c, []byte(`{ "services": [{"url":"http://user:password@localhost","name":"localhost"}], "storage": { - "type": "memory" + "type": "disk" } }`)) @@ -178,7 +206,7 @@ func TestLoad(t *testing.T) { Host: "0.0.0.0", Services: []config.ServiceConfig{{URL: "http://user:password@localhost", Name: "localhost"}}, Storage: config.StorageConfig{ - Type: config.StorageTypeMemory, + Type: config.StorageTypeDisk, }, } c.Assert(*conf, qt.DeepEquals, expected) @@ -189,7 +217,7 @@ func TestLoad(t *testing.T) { "host": "0.0.0.0", "services": [{"url":"http://user:password@localhost"}], "storage": { - "type": "memory" + "type": "disk" } }`)) _, err := config.Load(cfg.Name()) @@ -201,7 +229,7 @@ func TestLoad(t *testing.T) { "host": "0.0.0.0", "services": [{"url":"http://service-a","name":"service-a"},{"url":"http://service-a","name":"service-a"}], "storage": { - "type": "memory" + "type": "disk" } }`)) _, err := config.Load(cfg.Name()) diff --git a/vervet-underground/internal/handler/handler.go b/vervet-underground/internal/handler/handler.go index 7999fdbd..0e03d311 100644 --- a/vervet-underground/internal/handler/handler.go +++ b/vervet-underground/internal/handler/handler.go @@ -19,21 +19,21 @@ import ( "github.com/snyk/vervet/v6/versionware" "vervet-underground/config" - "vervet-underground/internal/scraper" + "vervet-underground/internal/storage" ) // Handler handles Vervet Underground HTTP requests. type Handler struct { cfg *config.ServerConfig - sc *scraper.Scraper + store storage.ReadOnlyStorage router chi.Router } // New returns a new Handler. -func New(cfg *config.ServerConfig, sc *scraper.Scraper, routerOptions ...func(r chi.Router)) *Handler { +func New(cfg *config.ServerConfig, store storage.ReadOnlyStorage, routerOptions ...func(r chi.Router)) *Handler { h := &Handler{ cfg: cfg, - sc: sc, + store: store, router: chi.NewRouter(), } for i := range routerOptions { @@ -69,7 +69,12 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } func (h *Handler) openapiVersions(w http.ResponseWriter, r *http.Request) { - versionIndex := h.sc.VersionIndex() + versionIndex, err := h.store.VersionIndex(r.Context()) + if err != nil { + logError(err) + http.Error(w, "Cannot get versions", http.StatusInternalServerError) + return + } content, err := json.Marshal(versionIndex.Versions().Strings()) if err != nil { logError(err) @@ -105,7 +110,13 @@ func (h *Handler) openapiVersion(w http.ResponseWriter, r *http.Request) { } } - versionIndex := h.sc.VersionIndex() + ctx := r.Context() + versionIndex, err := h.store.VersionIndex(ctx) + if err != nil { + logError(err) + http.Error(w, "Cannot get versions", http.StatusInternalServerError) + return + } resolvedVersion, err := versionIndex.Resolve(version) if errors.Is(err, vervet.ErrNoMatchingVersion) { http.Error(w, "Version not found", http.StatusNotFound) @@ -118,7 +129,7 @@ func (h *Handler) openapiVersion(w http.ResponseWriter, r *http.Request) { resolvedVersion.Stability = version.Stability w.Header().Set(versionware.HeaderSnykVersionServed, resolvedVersion.String()) - content, err := h.sc.Version(r.Context(), resolvedVersion.String()) + content, err := h.store.Version(ctx, resolvedVersion.String()) if err != nil { logError(err) http.Error(w, "Failure to retrieve version", http.StatusInternalServerError) diff --git a/vervet-underground/internal/handler/handler_test.go b/vervet-underground/internal/handler/handler_test.go index d63e27ff..a77be644 100644 --- a/vervet-underground/internal/handler/handler_test.go +++ b/vervet-underground/internal/handler/handler_test.go @@ -12,12 +12,11 @@ import ( "vervet-underground/config" "vervet-underground/internal/handler" - "vervet-underground/internal/scraper" ) func TestHealth(t *testing.T) { c := qt.New(t) - cfg, h := setup(c) + cfg, h := setup() w := httptest.NewRecorder() req := httptest.NewRequest("GET", "/", nil) @@ -33,7 +32,7 @@ func TestHealth(t *testing.T) { func TestOpenapi(t *testing.T) { c := qt.New(t) - _, h := setup(c) + _, h := setup() for _, path := range []string{"/openapi", "/openapi/"} { w := httptest.NewRecorder() @@ -55,7 +54,7 @@ func TestOpenapi(t *testing.T) { func TestMetrics(t *testing.T) { c := qt.New(t) - _, h := setup(c) + _, h := setup() w := httptest.NewRecorder() // NOTE: Metrics are counted globally, so in order for this metrics test to @@ -78,7 +77,7 @@ func TestMetrics(t *testing.T) { func TestOpenapiVersion(t *testing.T) { c := qt.New(t) - _, h := setup(c) + _, h := setup() w := httptest.NewRecorder() req := httptest.NewRequest("GET", "/openapi/2022-01-16~beta", nil) @@ -91,7 +90,7 @@ func TestOpenapiVersion(t *testing.T) { func TestOpenapiVersionNotFound(t *testing.T) { c := qt.New(t) - _, h := setup(c) + _, h := setup() w := httptest.NewRecorder() req := httptest.NewRequest("GET", "/openapi/2021-01-16~beta", nil) @@ -102,7 +101,7 @@ func TestOpenapiVersionNotFound(t *testing.T) { c.Assert(contents, qt.DeepEquals, []byte("Version not found\n")) } -func setup(c *qt.C) (*config.ServerConfig, *handler.Handler) { +func setup() (*config.ServerConfig, *handler.Handler) { cfg := &config.ServerConfig{ Services: []config.ServiceConfig{{ Name: "petfood", URL: "http://petfood.svc.cluster.local", @@ -111,9 +110,7 @@ func setup(c *qt.C) (*config.ServerConfig, *handler.Handler) { }}, } st := &mockStorage{} - sc, err := scraper.New(cfg, st) - c.Assert(err, qt.IsNil) - h := handler.New(cfg, sc, handler.UseDefaultMiddleware) + h := handler.New(cfg, st, handler.UseDefaultMiddleware) return cfg, h } @@ -135,7 +132,7 @@ func (s *mockStorage) NotifyVersion(ctx context.Context, name string, version st return nil } -func (s *mockStorage) VersionIndex() vervet.VersionIndex { +func (s *mockStorage) VersionIndex(ctx context.Context) (vervet.VersionIndex, error) { return vervet.NewVersionIndex(vervet.VersionSlice{ vervet.MustParseVersion("2021-06-04~experimental"), vervet.MustParseVersion("2021-10-20~experimental"), @@ -143,7 +140,7 @@ func (s *mockStorage) VersionIndex() vervet.VersionIndex { vervet.MustParseVersion("2022-01-16~experimental"), vervet.MustParseVersion("2022-01-16~beta"), vervet.MustParseVersion("2022-01-16~ga"), - }) + }), nil } func (s *mockStorage) Version(ctx context.Context, version string) ([]byte, error) { diff --git a/vervet-underground/internal/scraper/gcs_scraper_test.go b/vervet-underground/internal/scraper/gcs_scraper_test.go index 2930a821..ce6e92e9 100644 --- a/vervet-underground/internal/scraper/gcs_scraper_test.go +++ b/vervet-underground/internal/scraper/gcs_scraper_test.go @@ -66,7 +66,8 @@ func TestGCSScraper(t *testing.T) { c.Assert(ok, qt.IsTrue) } - vi := st.VersionIndex() + vi, err := st.VersionIndex(ctx) + c.Assert(err, qt.IsNil) c.Assert(len(vi.Versions()), qt.Equals, 4) for _, version := range vi.Versions() { specData, err := st.Version(ctx, version.String()) @@ -129,7 +130,8 @@ func TestGCSScraperCollation(t *testing.T) { c.Assert(ok, qt.IsTrue) } - vi := st.VersionIndex() + vi, err := st.VersionIndex(ctx) + c.Assert(err, qt.IsNil) c.Assert(len(vi.Versions()), qt.Equals, 4) for _, version := range vi.Versions() { specData, err := st.Version(ctx, version.String()) diff --git a/vervet-underground/internal/scraper/s3_scraper_test.go b/vervet-underground/internal/scraper/s3_scraper_test.go index f9a75a13..53623f03 100644 --- a/vervet-underground/internal/scraper/s3_scraper_test.go +++ b/vervet-underground/internal/scraper/s3_scraper_test.go @@ -62,7 +62,8 @@ func TestS3Scraper(t *testing.T) { c.Assert(ok, qt.IsTrue) } - vi := st.VersionIndex() + vi, err := st.VersionIndex(ctx) + c.Assert(err, qt.IsNil) c.Assert(len(vi.Versions()), qt.Equals, 4) for _, version := range vi.Versions() { specData, err := st.Version(ctx, version.String()) @@ -121,7 +122,8 @@ func TestS3ScraperCollation(t *testing.T) { c.Assert(ok, qt.IsTrue) } - vi := st.VersionIndex() + vi, err := st.VersionIndex(ctx) + c.Assert(err, qt.IsNil) c.Assert(len(vi.Versions()), qt.Equals, 4) for _, version := range vi.Versions() { specData, err := st.Version(ctx, version.String()) diff --git a/vervet-underground/internal/scraper/scraper.go b/vervet-underground/internal/scraper/scraper.go index 845dec1b..2c6fa994 100644 --- a/vervet-underground/internal/scraper/scraper.go +++ b/vervet-underground/internal/scraper/scraper.go @@ -14,7 +14,6 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/rs/zerolog/log" - "github.com/snyk/vervet/v6" "go.uber.org/multierr" "vervet-underground/config" @@ -286,11 +285,3 @@ func isLegacyVersion(version string) bool { // This default version predates vervet's creation date. return version == "2021-01-01" } - -func (s *Scraper) VersionIndex() vervet.VersionIndex { - return s.storage.VersionIndex() -} - -func (s *Scraper) Version(ctx context.Context, version string) ([]byte, error) { - return s.storage.Version(ctx, version) -} diff --git a/vervet-underground/internal/scraper/scraper_test.go b/vervet-underground/internal/scraper/scraper_test.go index 8aab2089..c25be756 100644 --- a/vervet-underground/internal/scraper/scraper_test.go +++ b/vervet-underground/internal/scraper/scraper_test.go @@ -6,20 +6,17 @@ import ( "fmt" "net/http" "net/http/httptest" - "strings" "testing" "time" qt "github.com/frankban/quicktest" "github.com/getkin/kin-openapi/openapi3" "github.com/gorilla/mux" - "github.com/prometheus/client_golang/prometheus" "github.com/rs/zerolog/log" "vervet-underground/config" "vervet-underground/internal/scraper" - "vervet-underground/internal/storage/mem" - "vervet-underground/internal/testutil" + "vervet-underground/internal/storage/disk" ) var ( @@ -99,7 +96,16 @@ func TestScraper(t *testing.T) { Name: "animals", URL: animalsService.URL, }}, } - st := mem.New() + st := disk.New("/tmp/specs") + c.Cleanup(func() { + ds, ok := st.(*disk.Storage) + if !ok { + return + } + err := ds.Cleanup() + c.Assert(err, qt.IsNil) + }) + sc, err := scraper.New(cfg, st, scraper.Clock(func() time.Time { return t0 })) c.Assert(err, qt.IsNil) @@ -125,7 +131,8 @@ func TestScraper(t *testing.T) { c.Assert(ok, qt.IsTrue) } - vi := st.VersionIndex() + vi, err := st.VersionIndex(ctx) + c.Assert(err, qt.IsNil) c.Assert(len(vi.Versions()), qt.Equals, 4) for _, version := range vi.Versions() { specData, err := st.Version(ctx, version.String()) @@ -155,7 +162,15 @@ func TestScraperWithLegacy(t *testing.T) { }, }, } - st := mem.New() + st := disk.New("/tmp/specs") + c.Cleanup(func() { + ds, ok := st.(*disk.Storage) + if !ok { + return + } + err := ds.Cleanup() + c.Assert(err, qt.IsNil) + }) sc, err := scraper.New(cfg, st, scraper.Clock(func() time.Time { return t0 })) c.Assert(err, qt.IsNil) @@ -180,7 +195,15 @@ func TestEmptyScrape(t *testing.T) { cfg := &config.ServerConfig{ Services: nil, } - st := mem.New() + st := disk.New("/tmp/specs") + c.Cleanup(func() { + ds, ok := st.(*disk.Storage) + if !ok { + return + } + err := ds.Cleanup() + c.Assert(err, qt.IsNil) + }) sc, err := scraper.New(cfg, st, scraper.Clock(func() time.Time { return t0 })) c.Assert(err, qt.IsNil) @@ -198,7 +221,15 @@ func TestScrapeClientError(t *testing.T) { cfg := &config.ServerConfig{ Services: []config.ServiceConfig{{Name: "nope", URL: "http://example.com/nope"}}, } - st := mem.New() + st := disk.New("/tmp/specs") + c.Cleanup(func() { + ds, ok := st.(*disk.Storage) + if !ok { + return + } + err := ds.Cleanup() + c.Assert(err, qt.IsNil) + }) sc, err := scraper.New(cfg, st, scraper.Clock(func() time.Time { return t0 }), scraper.HTTPClient(&http.Client{ @@ -221,84 +252,3 @@ type errorTransport struct{} func (*errorTransport) RoundTrip(*http.Request) (*http.Response, error) { return nil, fmt.Errorf("bad wolf") } - -func TestScraperCollation(t *testing.T) { - c := qt.New(t) - - petfoodService, animalsService := setupHttpServers(c) - tests := []struct { - name, version, digest string - }{{ - "petfood", "2021-09-01", "sha256:I20cAQ3VEjDrY7O0B678yq+0pYN2h3sxQy7vmdlo4+w=", - }, { - "animals", "2021-10-16", "sha256:P1FEFvnhtxJSqXr/p6fMNKE+HYwN6iwKccBGHIVZbyg=", - }} - - cfg := &config.ServerConfig{ - Services: []config.ServiceConfig{{ - Name: "petfood", URL: petfoodService.URL, - }, { - Name: "animals", URL: animalsService.URL, - }}, - } - memSt := mem.New() - st, ok := memSt.(*mem.Storage) - c.Assert(ok, qt.IsTrue) - sc, err := scraper.New(cfg, st, scraper.Clock(func() time.Time { return t0 })) - c.Assert(err, qt.IsNil) - - before, err := prometheus.DefaultGatherer.Gather() - c.Assert(err, qt.IsNil) - - // Cancel the scrape context after a timeout so we don't hang the test - ctx, cancel := context.WithTimeout(context.Background(), time.Second*60) - c.Cleanup(cancel) - - // Run the scrape - err = sc.Run(ctx) - c.Assert(err, qt.IsNil) - - // Version digests now known to storage - for _, test := range tests { - ok, err := st.HasVersion(ctx, test.name, test.version, test.digest) - c.Assert(err, qt.IsNil) - c.Assert(ok, qt.IsTrue) - } - - collated, err := st.GetCollatedVersionSpecs() - c.Assert(err, qt.IsNil) - c.Assert(len(collated), qt.Equals, 4) - - vi := st.VersionIndex() - c.Assert(len(vi.Versions()), qt.Equals, 4) - for _, version := range vi.Versions() { - specData, err := st.Version(ctx, version.String()) - c.Assert(err, qt.IsNil) - l := openapi3.NewLoader() - spec, err := l.LoadFromData(specData) - c.Assert(err, qt.IsNil) - c.Assert(spec, qt.IsNotNil) - c.Assert(len(spec.Paths), qt.Equals, collatedPaths[version.String()]) - } - - // Assert metrics - after, err := prometheus.DefaultGatherer.Gather() - c.Assert(err, qt.IsNil) - - c.Assert(testutil.SampleDelta("vu_scraper_run_duration_seconds", map[string]string{}, before, after), - qt.Equals, uint64(1)) - c.Assert(testutil.SampleDelta("vu_scraper_run_error_total", map[string]string{}, before, after), - qt.Equals, uint64(0)) - c.Assert(testutil.SampleDelta("vu_scraper_service_scrape_duration_seconds", - map[string]string{ - "service": strings.Replace(petfoodService.URL, "http://", "", 1), - }, - before, after, - ), qt.Equals, uint64(1)) - c.Assert(testutil.SampleDelta("vu_scraper_service_scrape_duration_seconds", - map[string]string{ - "service": strings.Replace(animalsService.URL, "http://", "", 1), - }, - before, after, - ), qt.Equals, uint64(1)) -} diff --git a/vervet-underground/internal/storage/collator.go b/vervet-underground/internal/storage/collator.go index 1383a60d..a53f80dd 100644 --- a/vervet-underground/internal/storage/collator.go +++ b/vervet-underground/internal/storage/collator.go @@ -98,7 +98,7 @@ func (c *Collator) Add(service string, revision ContentRevision) { } // Collate processes added service revisions to collate unified versions and OpenAPI specs for each version. -func (c *Collator) Collate() (vervet.VersionSlice, map[vervet.Version]openapi3.T, error) { +func (c *Collator) Collate() (map[vervet.Version]openapi3.T, error) { specs := make(map[vervet.Version]openapi3.T) sort.Sort(c.uniqueVersions) @@ -120,23 +120,23 @@ func (c *Collator) Collate() (vervet.VersionSlice, map[vervet.Version]openapi3.T if err != nil { log.Error().Err(err).Msgf("could not merge revision for version %s", version) collatorMergeError.WithLabelValues(version.String()).Inc() - return nil, nil, err + return nil, err } if err := vervet.RemoveElements(spec, c.excludePatterns); err != nil { log.Error().Err(err).Msgf("could not merge revision for version %s", version) collatorMergeError.WithLabelValues(version.String()).Inc() - return nil, nil, err + return nil, err } if err := c.applyOverlay(spec); err != nil { log.Error().Err(err).Msgf("failed to merge overlay for version %s", version) collatorMergeError.WithLabelValues(version.String()).Inc() - return nil, nil, err + return nil, err } specs[version] = *spec } } - return c.uniqueVersions, specs, nil + return specs, nil } func mergeRevisions(revisions ContentRevisions) (*openapi3.T, error) { diff --git a/vervet-underground/internal/storage/collator_test.go b/vervet-underground/internal/storage/collator_test.go index 0c8c5025..7c0afa05 100644 --- a/vervet-underground/internal/storage/collator_test.go +++ b/vervet-underground/internal/storage/collator_test.go @@ -137,12 +137,8 @@ func TestCollator_Collate(t *testing.T) { Blob: []byte(serviceBSpec), }) - versions, specs, err := collator.Collate() + specs, err := collator.Collate() c.Assert(err, qt.IsNil) - c.Assert(len(versions), qt.Equals, 3) - c.Assert(versions[0], qt.Equals, v20220201_beta) - c.Assert(versions[1], qt.Equals, v20220301_ga) - c.Assert(versions[2], qt.Equals, v20220401_ga) c.Assert(specs[v20220201_beta].Paths.Find("/test"), qt.IsNotNil) c.Assert(specs[v20220201_beta].Paths.Find("/example"), qt.IsNil) @@ -181,11 +177,8 @@ func TestCollator_Collate_MigratingEndpoints(t *testing.T) { Blob: []byte(serviceCSpec), }) - versions, specs, err := collator.Collate() + specs, err := collator.Collate() c.Assert(err, qt.IsNil) - c.Assert(len(versions), qt.Equals, 2) - c.Assert(versions[0], qt.Equals, v20220201_exp) - c.Assert(versions[1], qt.Equals, v20230314_exp) c.Assert(specs[v20220201_exp].Paths.Find("/test"), qt.IsNotNil) c.Assert(specs[v20230314_exp].Paths.Find("/test"), qt.IsNotNil) @@ -222,7 +215,7 @@ func TestCollator_Collate_ExcludePatterns(t *testing.T) { Version: v20220401_ga, Blob: []byte(serviceBSpec), }) - _, specs, err := collator.Collate() + specs, err := collator.Collate() c.Assert(err, qt.IsNil) c.Assert(specs[v20220401_ga].Paths["/example"].Post.Extensions["x-other-internal"], qt.IsNil) @@ -256,7 +249,7 @@ func TestCollator_Collate_Conflict(t *testing.T) { Timestamp: time.Date(2021, 6, 15, 0, 0, 0, 0, time.UTC), }) - _, specs, err := collator.Collate() + specs, err := collator.Collate() c.Assert(err, qt.IsNil) // First path wins c.Assert(specs[vervet.MustParseVersion("2021-06-15")].Paths["/examples/hello-world"].Post.Description, @@ -304,7 +297,7 @@ func TestCollator_Collate_Overlay(t *testing.T) { Version: v20220401_ga, Blob: []byte(serviceBSpec), }) - _, specs, err := collator.Collate() + specs, err := collator.Collate() c.Assert(err, qt.IsNil) c.Assert(specs[v20220401_ga].Servers[0].URL, qt.Equals, "https://awesome.snyk.io/rest") diff --git a/vervet-underground/internal/storage/disk/disk.go b/vervet-underground/internal/storage/disk/disk.go new file mode 100644 index 00000000..a5be5623 --- /dev/null +++ b/vervet-underground/internal/storage/disk/disk.go @@ -0,0 +1,308 @@ +// Package disk provides an implementation of the storage used in Vervet +// Underground that uses a local filesystem. It's not intended for production +// use, but as a functionally complete reference implementation that can be +// used to validate the other parts of the VU system. +package disk + +import ( + "context" + "encoding/base64" + "errors" + "fmt" + "os" + "path" + "path/filepath" + "strings" + "time" + + "github.com/getkin/kin-openapi/openapi3" + "github.com/snyk/vervet/v6" + + "vervet-underground/internal/storage" +) + +type Storage struct { + path string + newCollator func() (*storage.Collator, error) +} + +// Option defines a Storage constructor option. +type Option func(*Storage) + +type objectMeta struct { + blob []byte + lastMod time.Time +} + +func New(path string, options ...Option) storage.Storage { + s := &Storage{ + path: path, + newCollator: func() (*storage.Collator, error) { return storage.NewCollator() }, + } + for _, option := range options { + option(s) + } + return s +} + +func (s *Storage) Cleanup() error { + if s.path == "" { + return fmt.Errorf("not cleaning up invalid path") + } + return os.RemoveAll(s.path) +} + +// NewCollator configures the Storage instance to use the given constructor +// function for creating collator instances. +func NewCollator(newCollator func() (*storage.Collator, error)) Option { + return func(s *Storage) { + s.newCollator = newCollator + } +} + +// NotifyVersions implements scraper.Storage. +func (s *Storage) NotifyVersions(ctx context.Context, name string, versions []string, scrapeTime time.Time) error { + for _, version := range versions { + // TODO: Add method to fetch contents here + // TODO: implement notify versions; update sunset when versions are removed + err := s.NotifyVersion(ctx, name, version, []byte{}, scrapeTime) + if err != nil { + return err + } + } + return nil +} + +// CollateVersions aggregates versions and revisions from all the services, and +// produces unified versions and merged specs for all APIs. +func (s *Storage) CollateVersions(ctx context.Context, serviceFilter map[string]bool) error { + // create an aggregate to process collated data from storage data + aggregate, err := s.newCollator() + if err != nil { + return err + } + serviceRevisions, err := s.ListObjects(ctx, storage.ServiceVersionsFolder) + if err != nil { + return err + } + + // all specs are stored as: "service-versions/{service_name}/{version}/{digest}.json" + for _, revKey := range serviceRevisions { + service, version, digest, err := parseServiceVersionRevisionKey(revKey) + if err != nil { + return err + } + if _, ok := serviceFilter[service]; !ok { + continue + } + rev, err := s.GetObjectWithMetadata(revKey) + if err != nil { + return err + } + + // Assuming version is valid in path uploads + parsedVersion, err := vervet.ParseVersion(version) + if err != nil { + return err + } + + revision := storage.ContentRevision{ + Service: service, + Version: parsedVersion, + Timestamp: rev.lastMod, + Digest: storage.Digest(digest), + Blob: rev.blob, + } + aggregate.Add(service, revision) + } + specs, err := aggregate.Collate() + if err != nil { + return err + } + + return s.putCollatedSpecs(specs) +} + +// HasVersion implements scraper.Storage. +func (s *Storage) HasVersion(ctx context.Context, name string, version string, digest string) (bool, error) { + key := s.getServiceVersionRevisionKey(name, version, digest) + path := path.Join(s.path, key) + _, err := os.Stat(path) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return false, nil + } + return false, err + } + return true, nil +} + +// NotifyVersion implements scraper.Storage. +func (s *Storage) NotifyVersion(ctx context.Context, name string, version string, contents []byte, scrapeTime time.Time) error { + digest := storage.NewDigest(contents) + key := s.getServiceVersionRevisionKey(name, version, string(digest)) + parsedVersion, err := vervet.ParseVersion(version) + if err != nil { + return err + } + + currentRevision := storage.ContentRevision{ + Service: name, + Timestamp: scrapeTime, + Digest: digest, + Blob: contents, + Version: parsedVersion, + } + + _, err = s.GetObject(key) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + // Since the digest doesn't exist, add the whole key path + return s.PutObject(key, currentRevision.Blob, ¤tRevision.Timestamp) + } + return err + } + // digest already exists, nothing to do + return nil +} + +// VersionIndex implements scraper.Storage. +func (s *Storage) VersionIndex(ctx context.Context) (vervet.VersionIndex, error) { + objects, err := s.ListObjects(ctx, storage.ServiceVersionsFolder) + if err != nil { + return vervet.VersionIndex{}, err + } + vs := make(vervet.VersionSlice, len(objects)) + for idx, obj := range objects { + _, versionStr, _, err := parseServiceVersionRevisionKey(obj) + if err != nil { + return vervet.VersionIndex{}, err + } + vs[idx], err = vervet.ParseVersion(versionStr) + if err != nil { + return vervet.VersionIndex{}, err + } + } + + return vervet.NewVersionIndex(vs), nil +} + +// Version implements scraper.Storage. +func (s *Storage) Version(ctx context.Context, version string) ([]byte, error) { + parsedVersion, err := vervet.ParseVersion(version) + if err != nil { + return nil, err + } + + blob, err := s.GetCollatedVersionSpec(version) + if err != nil { + index, err := s.VersionIndex(ctx) + if err != nil { + return nil, err + } + resolved, err := index.Resolve(parsedVersion) + if err != nil { + return nil, err + } + return s.GetCollatedVersionSpec(resolved.String()) + } + return blob, nil +} + +func (s *Storage) getServiceVersionRevisionKey(name string, version string, digest string) string { + // digest could contain slashes + b64 := base64.StdEncoding.EncodeToString([]byte(digest)) + return path.Join(storage.ServiceVersionsFolder, name, version, b64) + ".json" +} + +func (s *Storage) GetObject(key string) ([]byte, error) { + path := path.Join(s.path, key) + return os.ReadFile(path) +} + +func (s *Storage) PutObject(key string, body []byte, timestamp *time.Time) error { + path := path.Join(s.path, key) + err := os.MkdirAll(filepath.Dir(path), os.ModePerm) + if err != nil { + return err + } + err = os.WriteFile(path, body, 0600) + if err != nil { + return err + } + if timestamp == nil { + return nil + } + // Preserve specified timestamp, mostly for testing using a monotonic clock + // on fast file systems. + return os.Chtimes(path, *timestamp, *timestamp) +} + +// GetCollatedVersionSpec retrieves a single collated vervet.Version +// and returns the JSON blob. +func (s *Storage) GetCollatedVersionSpec(version string) ([]byte, error) { + path := path.Join(storage.CollatedVersionsFolder, version, "spec.json") + return s.GetObject(path) +} + +// ListObjects gets all objects under a given directory. +func (s *Storage) ListObjects(ctx context.Context, key string) ([]string, error) { + path := path.Join(s.path, key) + objects := make([]string, 0) + err := filepath.Walk(path, func(obj string, info os.FileInfo, err error) error { + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return nil + } + return err + } + if !info.IsDir() { + objects = append(objects, obj) + } + return nil + }) + return objects, err +} + +func parseServiceVersionRevisionKey(key string) (string, string, string, error) { + digestB64 := filepath.Base(key) + digestB64 = strings.TrimSuffix(digestB64, ".json") + digest, err := base64.StdEncoding.DecodeString(digestB64) + if err != nil { + return "", "", "", err + } + rest := filepath.Dir(key) + version := filepath.Base(rest) + rest = filepath.Dir(rest) + service := filepath.Base(rest) + + return service, version, string(digest), nil +} + +func (s *Storage) GetObjectWithMetadata(key string) (*objectMeta, error) { + info, err := os.Stat(key) + if err != nil { + return nil, err + } + lastMod := info.ModTime() + body, err := os.ReadFile(key) + return &objectMeta{ + lastMod: lastMod, + blob: body, + }, err +} + +// putCollatedSpecs stores the given collated OpenAPI document objects. +func (s *Storage) putCollatedSpecs(objects map[vervet.Version]openapi3.T) error { + for key, file := range objects { + jsonBlob, err := file.MarshalJSON() + if err != nil { + return fmt.Errorf("failed to marshal json for collation upload: %w", err) + } + err = s.PutObject(storage.CollatedVersionsFolder+key.String()+"/spec.json", jsonBlob, nil) + if err != nil { + return err + } + } + return nil +} diff --git a/vervet-underground/internal/storage/mem/mem_test.go b/vervet-underground/internal/storage/disk/disk_test.go similarity index 81% rename from vervet-underground/internal/storage/mem/mem_test.go rename to vervet-underground/internal/storage/disk/disk_test.go index 0ee88dec..b3ea41b0 100644 --- a/vervet-underground/internal/storage/mem/mem_test.go +++ b/vervet-underground/internal/storage/disk/disk_test.go @@ -1,4 +1,4 @@ -package mem +package disk import ( "context" @@ -15,7 +15,15 @@ var t0 = time.Date(2021, time.December, 3, 20, 49, 51, 0, time.UTC) func TestNotifyVersions(t *testing.T) { c := qt.New(t) - s := New() + s := New("/tmp/specs") + c.Cleanup(func() { + ds, ok := s.(*Storage) + if !ok { + return + } + err := ds.Cleanup() + c.Assert(err, qt.IsNil) + }) ctx := context.Background() err := s.NotifyVersions(ctx, "petfood", []string{"2021-09-01", "2021-09-16"}, t0) c.Assert(err, qt.IsNil) @@ -24,7 +32,15 @@ func TestNotifyVersions(t *testing.T) { func TestHasVersion(t *testing.T) { c := qt.New(t) - s := New() + s := New("/tmp/specs") + c.Cleanup(func() { + ds, ok := s.(*Storage) + if !ok { + return + } + err := ds.Cleanup() + c.Assert(err, qt.IsNil) + }) ctx := context.Background() const cricketsDigest = "sha256:mWpHX0/hIZS9mVd8eobfHWm6OkUsKZLiqd6ShRnNzA4=" const geckosDigest = "sha256:c5JD7m0g4DVhoaX4z8HFcTP8S/yUOEsjgP8ECkuEHqM=" @@ -65,7 +81,15 @@ const emptySpec = `{"components":{},"info":{"title":"","version":""},"openapi":" func TestCollateVersions(t *testing.T) { c := qt.New(t) - s := New() + s := New("/tmp/specs") + c.Cleanup(func() { + ds, ok := s.(*Storage) + if !ok { + return + } + err := ds.Cleanup() + c.Assert(err, qt.IsNil) + }) ctx := context.Background() err := s.NotifyVersion(ctx, "petfood", "2021-09-16", []byte(emptySpec), t0) @@ -92,9 +116,17 @@ func TestCollateVersions(t *testing.T) { c.Assert(string(after), qt.Equals, spec) } -func TestMemStorageCollateVersion(t *testing.T) { - s := New() +func TestDiskStorageCollateVersion(t *testing.T) { c := qt.New(t) + s := New("/tmp/specs") + c.Cleanup(func() { + ds, ok := s.(*Storage) + if !ok { + return + } + err := ds.Cleanup() + c.Assert(err, qt.IsNil) + }) storage.AssertCollateVersion(c, s) } diff --git a/vervet-underground/internal/storage/gcs/client.go b/vervet-underground/internal/storage/gcs/client.go index e6b53c82..1be2b4f0 100644 --- a/vervet-underground/internal/storage/gcs/client.go +++ b/vervet-underground/internal/storage/gcs/client.go @@ -7,7 +7,6 @@ import ( "fmt" "io" "strings" - "sync" "time" "cloud.google.com/go/storage" @@ -43,9 +42,6 @@ type Storage struct { c *storage.Client config Config newCollator func() (*vustorage.Collator, error) - - mu sync.RWMutex - collatedVersions vervet.VersionIndex } // New instantiates a gcs.Storage client to handle storing and retrieving @@ -160,15 +156,11 @@ func (s *Storage) CollateVersions(ctx context.Context, serviceFilter map[string] } aggregate.Add(service, revision) } - versions, specs, err := aggregate.Collate() + specs, err := aggregate.Collate() if err != nil { return err } - s.mu.Lock() - s.collatedVersions = vervet.NewVersionIndex(versions) - s.mu.Unlock() - n, err := s.putCollatedSpecs(ctx, specs) if err != nil { return err @@ -231,11 +223,19 @@ func (s *Storage) NotifyVersion(ctx context.Context, name string, version string } // Versions lists all available Collated Versions. -func (s *Storage) VersionIndex() vervet.VersionIndex { - s.mu.RLock() - defer s.mu.RUnlock() - - return s.collatedVersions +func (s *Storage) VersionIndex(ctx context.Context) (vervet.VersionIndex, error) { + versions, err := s.ListCollatedVersions(ctx) + if err != nil { + return vervet.VersionIndex{}, err + } + vs := make(vervet.VersionSlice, len(versions)) + for idx, version := range versions { + vs[idx], err = vervet.ParseVersion(version) + if err != nil { + return vervet.VersionIndex{}, err + } + } + return vervet.NewVersionIndex(vs), nil } // Version implements scraper.Storage. @@ -247,9 +247,11 @@ func (s *Storage) Version(ctx context.Context, version string) ([]byte, error) { blob, err := s.GetCollatedVersionSpec(ctx, version) if err != nil { - s.mu.RLock() - resolved, err := s.collatedVersions.Resolve(parsedVersion) - s.mu.RUnlock() + index, err := s.VersionIndex(ctx) + if err != nil { + return nil, err + } + resolved, err := index.Resolve(parsedVersion) if err != nil { return nil, err } diff --git a/vervet-underground/internal/storage/mem/mem.go b/vervet-underground/internal/storage/mem/mem.go deleted file mode 100644 index f44ea5f3..00000000 --- a/vervet-underground/internal/storage/mem/mem.go +++ /dev/null @@ -1,223 +0,0 @@ -// Package mem provides an in-memory implementation of the storage used in -// Vervet Underground. It's not intended for production use, but as a -// functionally complete reference implementation that can be used to validate -// the other parts of the VU system. -package mem - -import ( - "context" - "sort" - "sync" - "time" - - "github.com/rs/zerolog/log" - "github.com/snyk/vervet/v6" - - "vervet-underground/internal/storage" -) - -// versionedResourceMap map [service-name] Vervet Version slice array. -type versionedResourceMap map[string]vervet.VersionSlice - -// mappedRevisionSpecs map [Sha digest of contents string] --> spec contents and metadata. -type mappedRevisionSpecs map[storage.Digest]storage.ContentRevision - -// versionMappedRevisionSpecs map[version-name][digest] --> spec contents and metadata. -type versionMappedRevisionSpecs map[string]mappedRevisionSpecs - -// serviceVersionMappedRevisionSpecs map[service-name][version-name][digest] --> spec contents and metadata. -type serviceVersionMappedRevisionSpecs map[string]versionMappedRevisionSpecs - -// Storage provides an in-memory implementation of Vervet Underground storage. -type Storage struct { - mu sync.RWMutex - - serviceVersions versionedResourceMap - serviceVersionMappedRevisionSpecs serviceVersionMappedRevisionSpecs - - collatedVersions vervet.VersionIndex - collatedVersionedSpecs storage.CollatedVersionMappedSpecs - - newCollator func() (*storage.Collator, error) -} - -// New returns a new Storage instance. -func New(options ...Option) storage.Storage { - s := &Storage{ - serviceVersions: versionedResourceMap{}, - serviceVersionMappedRevisionSpecs: serviceVersionMappedRevisionSpecs{}, - - collatedVersionedSpecs: storage.CollatedVersionMappedSpecs{}, - - newCollator: func() (*storage.Collator, error) { return storage.NewCollator() }, - } - for _, option := range options { - option(s) - } - return s -} - -// Option defines a Storage constructor option. -type Option func(*Storage) - -// NewCollator configures the Storage instance to use the given constructor -// function for creating collator instances. -func NewCollator(newCollator func() (*storage.Collator, error)) Option { - return func(s *Storage) { - s.newCollator = newCollator - } -} - -// NotifyVersions implements scraper.Storage. -func (s *Storage) NotifyVersions(ctx context.Context, name string, versions []string, scrapeTime time.Time) error { - for _, version := range versions { - // TODO: Add method to fetch contents here - // TODO: implement notify versions; update sunset when versions are removed - err := s.NotifyVersion(ctx, name, version, []byte{}, scrapeTime) - if err != nil { - return err - } - } - return nil -} - -// HasVersion implements scraper.Storage. -func (s *Storage) HasVersion(ctx context.Context, name string, version string, digest string) (bool, error) { - s.mu.RLock() - defer s.mu.RUnlock() - name = storage.GetSantizedHost(name) - revisions, ok := s.serviceVersionMappedRevisionSpecs[name][version] - - if !ok { - return false, nil - } - _, ok = revisions[storage.Digest(digest)] - return ok, nil -} - -// NotifyVersion implements scraper.Storage. -func (s *Storage) NotifyVersion(ctx context.Context, name string, version string, contents []byte, scrapeTime time.Time) error { - s.mu.Lock() - defer s.mu.Unlock() - name = storage.GetSantizedHost(name) - digest := storage.NewDigest(contents) - - parsedVersion, err := vervet.ParseVersion(version) - if err != nil { - log.Error().Err(err).Msgf("Failed to resolve Vervet version for %s : %s", name, version) - return err - } - - // Check if service and version structures are initialized - if _, ok := s.serviceVersionMappedRevisionSpecs[name]; !ok { - s.serviceVersionMappedRevisionSpecs[name] = versionMappedRevisionSpecs{} - } - - revisions, ok := s.serviceVersionMappedRevisionSpecs[name][version] - if ok { - if _, exist := revisions[digest]; exist { - return nil - } - } else { - s.serviceVersionMappedRevisionSpecs[name][version] = mappedRevisionSpecs{} - revisions = s.serviceVersionMappedRevisionSpecs[name][version] - } - - // If the version is newly initialized, meaning no revisions exist, - // create the new vervet.VersionSlice with that version initialized - // else, append it to the existing service's VersionSlice - if len(revisions) == 0 { - if _, ok = s.serviceVersions[name]; !ok { - s.serviceVersions[name] = vervet.VersionSlice{parsedVersion} - } else { - s.serviceVersions[name] = append(s.serviceVersions[name], parsedVersion) - // sort versions when new ones are introduced to maintain BST functionality - sort.Sort(s.serviceVersions[name]) - } - } - // End of initializations - - // TODO: we may want to abstract out the storage objects instead of using chained maps. - // add the new ContentRevision - s.serviceVersionMappedRevisionSpecs[name][version][digest] = storage.ContentRevision{ - Service: name, - Timestamp: scrapeTime, - Digest: digest, - Blob: contents, - Version: parsedVersion, - } - - return nil -} - -// VersionIndex implements scraper.Storage. -func (s *Storage) VersionIndex() vervet.VersionIndex { - s.mu.RLock() - defer s.mu.RUnlock() - - return s.collatedVersions -} - -// Version implements scraper.Storage. -func (s *Storage) Version(ctx context.Context, version string) ([]byte, error) { - parsedVersion, err := vervet.ParseVersion(version) - if err != nil { - return nil, err - } - - s.mu.RLock() - defer s.mu.RUnlock() - - spec, ok := s.collatedVersionedSpecs[parsedVersion] - if !ok { - resolved, err := s.collatedVersions.Resolve(parsedVersion) - if err != nil { - return nil, err - } - resolvedSpec := s.collatedVersionedSpecs[resolved] - return resolvedSpec.MarshalJSON() - } - return spec.MarshalJSON() -} - -// CollateVersions aggregates versions and revisions from all the services, and produces unified versions and merged specs for all APIs. -func (s *Storage) CollateVersions(ctx context.Context, serviceFilter map[string]bool) error { - // create an aggregate to process collated data from storage data - aggregate, err := s.newCollator() - if err != nil { - return err - } - for serv, versions := range s.serviceVersionMappedRevisionSpecs { - if _, ok := serviceFilter[serv]; !ok { - continue - } - for _, revisions := range versions { - for _, revision := range revisions { - aggregate.Add(serv, revision) - } - } - } - versions, specs, err := aggregate.Collate() - - s.mu.Lock() - defer s.mu.Unlock() - s.collatedVersions = vervet.NewVersionIndex(versions) - s.collatedVersionedSpecs = specs - - return err -} - -func (s *Storage) GetCollatedVersionSpecs() (map[string][]byte, error) { - s.mu.RLock() - defer s.mu.RUnlock() - - versionSpecs := map[string][]byte{} - for key, value := range s.collatedVersionedSpecs { - json, err := value.MarshalJSON() - if err != nil { - return nil, err - } - versionSpecs[key.String()] = json - } - return versionSpecs, nil -} diff --git a/vervet-underground/internal/storage/s3/client.go b/vervet-underground/internal/storage/s3/client.go index 226e2d70..04903ade 100644 --- a/vervet-underground/internal/storage/s3/client.go +++ b/vervet-underground/internal/storage/s3/client.go @@ -9,7 +9,6 @@ import ( "fmt" "io" "strings" - "sync" "time" "github.com/aws/aws-sdk-go-v2/aws" @@ -47,9 +46,6 @@ type Storage struct { client *s3.Client config Config newCollator func() (*storage.Collator, error) - - mu sync.RWMutex - collatedVersions vervet.VersionIndex } func New(ctx context.Context, awsCfg *Config, options ...Option) (storage.Storage, error) { @@ -187,11 +183,19 @@ func (s *Storage) NotifyVersion(ctx context.Context, name string, version string } // Versions implements scraper.Storage. -func (s *Storage) VersionIndex() vervet.VersionIndex { - s.mu.RLock() - defer s.mu.RUnlock() - - return s.collatedVersions +func (s *Storage) VersionIndex(ctx context.Context) (vervet.VersionIndex, error) { + versions, err := s.ListCollatedVersions(ctx) + if err != nil { + return vervet.VersionIndex{}, err + } + vs := make(vervet.VersionSlice, len(versions)) + for idx, version := range versions { + vs[idx], err = vervet.ParseVersion(version) + if err != nil { + return vervet.VersionIndex{}, err + } + } + return vervet.NewVersionIndex(vs), nil } // Version implements scraper.Storage. @@ -203,9 +207,11 @@ func (s *Storage) Version(ctx context.Context, version string) ([]byte, error) { blob, err := s.GetCollatedVersionSpec(ctx, version) if err != nil { - s.mu.RLock() - resolved, err := s.collatedVersions.Resolve(parsedVersion) - s.mu.RUnlock() + index, err := s.VersionIndex(ctx) + if err != nil { + return nil, err + } + resolved, err := index.Resolve(parsedVersion) if err != nil { return nil, err } @@ -263,15 +269,11 @@ func (s *Storage) CollateVersions(ctx context.Context, serviceFilter map[string] } aggregate.Add(service, revision) } - versions, specs, err := aggregate.Collate() + specs, err := aggregate.Collate() if err != nil { return err } - s.mu.Lock() - s.collatedVersions = vervet.NewVersionIndex(versions) - s.mu.Unlock() - n, err := s.putCollatedSpecs(ctx, specs) if err != nil { return err diff --git a/vervet-underground/internal/storage/storage.go b/vervet-underground/internal/storage/storage.go index 5704aa02..8311d049 100644 --- a/vervet-underground/internal/storage/storage.go +++ b/vervet-underground/internal/storage/storage.go @@ -11,6 +11,21 @@ import ( "github.com/snyk/vervet/v6" ) +// ReadOnlyStorage defines functionality needed to fetch spec versions. +// Implmentations can assume that the storage has already been populated +// through the Storage interface. +type ReadOnlyStorage interface { + // HasVersion returns whether the storage has already stored the service + // API spec version at the given content digest. + HasVersion(ctx context.Context, name string, version string, digest string) (bool, error) + + // VersionIndex fetches the Storage Versions index compiled by VU + VersionIndex(ctx context.Context) (vervet.VersionIndex, error) + + // Version fetches the Storage Version spec compiled by VU + Version(ctx context.Context, version string) ([]byte, error) +} + // Storage defines the storage functionality needed in order to store service // API version spec snapshots. type Storage interface { @@ -24,21 +39,13 @@ type Storage interface { // respective versions gathered. CollateVersions(ctx context.Context, serviceFilter map[string]bool) error - // HasVersion returns whether the storage has already stored the service - // API spec version at the given content digest. - HasVersion(ctx context.Context, name string, version string, digest string) (bool, error) - // NotifyVersion tells the storage to store the given version contents at // the scrapeTime. The storage implementation must detect and ignore // duplicate version contents, as some services may not provide content // digest headers in their responses. NotifyVersion(ctx context.Context, name string, version string, contents []byte, scrapeTime time.Time) error - // VersionIndex fetches the Storage Versions index compiled by VU - VersionIndex() vervet.VersionIndex - - // Version fetches the Storage Version spec compiled by VU - Version(ctx context.Context, version string) ([]byte, error) + ReadOnlyStorage } // CollatedVersionMappedSpecs Compiled aggregated spec for all services at that given version.