Skip to content

Commit

Permalink
Merge pull request #96 from snyk/feat/present-compiled-vu
Browse files Browse the repository at this point in the history
Feat/present compiled vu
  • Loading branch information
genslein authored Dec 15, 2021
2 parents 4fca1ab + 81f72ae commit 0ad9173
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 50 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@
.dccache
.vscode

**/server
**/server
config.json
85 changes: 64 additions & 21 deletions vervet-underground/internal/scraper/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"

"github.com/pkg/errors"
Expand All @@ -23,17 +24,28 @@ type Storage interface {
// NotifyVersions tells the storage which versions are currently available.
// This is the primary mechanism by which the storage layer discovers and
// processes versions which are removed post-sunset.
NotifyVersions(ctx context.Context, name string, versions []string, scrapeTime time.Time) error
NotifyVersions(name string, versions []string, scrapeTime time.Time) error

// CollateVersions tells the storage to execute the compilation and
// update all VU-formatted specs from all services and their
// respective versions gathered.
CollateVersions() error

// HasVersion returns whether the storage has already stored the service
// API spec version at the given content digest.
HasVersion(ctx context.Context, name string, version string, digest string) (bool, error)
HasVersion(name string, version string, digest string) (bool, error)

// NotifyVersion tells the storage to store the given version contents at
// the scrapeTime. The storage implementation must detect and ignore
// duplicate version contents, as some services may not provide content
// digest headers in their responses.
NotifyVersion(ctx context.Context, name string, version string, contents []byte, scrapeTime time.Time) error
NotifyVersion(name string, version string, contents []byte, scrapeTime time.Time) error

// Versions fetches the Storage Versions compiled by VU
Versions() []string

// Version fetches the Storage Version spec compiled by VU
Version(version string) ([]byte, error)
}

// Scraper gets OpenAPI specs from a collection of services and updates storage
Expand All @@ -57,24 +69,32 @@ type Option func(*Scraper) error
func New(cfg *config.ServerConfig, storage Storage, options ...Option) (*Scraper, error) {
s := &Scraper{
storage: storage,
http: &http.Client{},
http: &http.Client{Timeout: time.Second * 15},
timeNow: time.Now,
}
err := setupScraper(s, cfg, options)
if err != nil {
return nil, err
}
return s, nil
}

func setupScraper(s *Scraper, cfg *config.ServerConfig, options []Option) error {
s.services = make([]service, len(cfg.Services))
for i := range cfg.Services {
u, err := url.Parse(cfg.Services[i] + "/openapi")
if err != nil {
return nil, errors.Wrapf(err, "invalid service %q", cfg.Services[i])
return errors.Wrapf(err, "invalid service %q", cfg.Services[i])
}
s.services[i] = service{base: cfg.Services[i], url: u}
}
for i := range options {
err := options[i](s)
if err != nil {
return nil, err
return err
}
}
return s, nil
return nil
}

// HTTPClient is a Scraper constructor Option that allows providing an
Expand Down Expand Up @@ -113,41 +133,52 @@ func (s *Scraper) Run(ctx context.Context) error {
errs = multierr.Append(errs, err)
}
close(errCh)
if errs != nil {
return errs
} else {
err := s.collateVersions()
errs = multierr.Append(errs, err)
}
return errs
}

func (s *Scraper) scrape(ctx context.Context, scrapeTime time.Time, svc service) error {
versions, err := s.getVersions(ctx, svc)
versions, err := s.getVersions(svc)
if err != nil {
return errors.WithStack(err)
}
err = s.storage.NotifyVersions(ctx, svc.base, versions, scrapeTime)
err = s.storage.NotifyVersions(svc.base, versions, scrapeTime)
if err != nil {
return errors.WithStack(err)
}
for i := range versions {
// TODO: we might run this concurrently per live service pod if/when
// we're more k8s aware, but we won't do that yet.
contents, isNew, err := s.getNewVersion(ctx, svc, versions[i])
contents, isNew, err := s.getNewVersion(svc, versions[i])
if err != nil {
return errors.WithStack(err)
}
if !isNew {
continue
}
err = s.storage.NotifyVersion(ctx, svc.base, versions[i], contents, scrapeTime)
err = s.storage.NotifyVersion(svc.base, versions[i], contents, scrapeTime)
if err != nil {
return errors.WithStack(err)
}
}
return nil
}

func (s *Scraper) getVersions(ctx context.Context, svc service) ([]string, error) {
req, err := http.NewRequestWithContext(ctx, "GET", svc.url.String(), nil)
// collateVersions asks the storage backend to collate the versions it has
// gathered and passes back whatever error the storage reports.
func (s *Scraper) collateVersions() error {
	err := s.storage.CollateVersions()
	return err
}

func (s *Scraper) getVersions(svc service) ([]string, error) {
req, err := http.NewRequest("GET", svc.url.String(), nil)
if err != nil {
return nil, errors.Wrap(err, "failed to create request")
}

resp, err := s.http.Do(req)
if err != nil {
return nil, errors.Wrap(err, "request failed")
Expand All @@ -171,16 +202,18 @@ func httpError(r *http.Response) error {
return errors.Errorf("request failed: HTTP %d", r.StatusCode)
}

func (s *Scraper) getNewVersion(ctx context.Context, svc service, version string) ([]byte, bool, error) {
isNew, err := s.hasNewVersion(ctx, svc, version)
func (s *Scraper) getNewVersion(svc service, version string) ([]byte, bool, error) {
// TODO: Services serving compiled vervet specs do not currently respond to
// HEAD requests; this will need to be enforced down the line.
isNew, err := s.hasNewVersion(svc, version)
if err != nil {
return nil, false, errors.WithStack(err)
}
if !isNew {
return nil, false, nil
}

req, err := http.NewRequestWithContext(ctx, "GET", svc.url.String()+"/"+version, nil)
req, err := http.NewRequest("GET", svc.url.String()+"/"+version, nil)
if err != nil {
return nil, false, errors.Wrap(err, "failed to create request")
}
Expand All @@ -192,7 +225,7 @@ func (s *Scraper) getNewVersion(ctx context.Context, svc service, version string
if resp.StatusCode != http.StatusOK {
return nil, false, httpError(resp)
}
if ct := resp.Header.Get("Content-Type"); ct != "application/json" {
if ct := resp.Header.Get("Content-Type"); !strings.HasPrefix(ct, "application/json") {
return nil, false, errors.Errorf("unexpected content type: %s", ct)
}
respContents, err := ioutil.ReadAll(resp.Body)
Expand All @@ -209,9 +242,9 @@ func (s *Scraper) getNewVersion(ctx context.Context, svc service, version string
return respContents, true, nil
}

func (s *Scraper) hasNewVersion(ctx context.Context, svc service, version string) (bool, error) {
func (s *Scraper) hasNewVersion(svc service, version string) (bool, error) {
// Check Digest to see if there's a new version
req, err := http.NewRequestWithContext(ctx, "HEAD", svc.url.String()+"/"+version, nil)
req, err := http.NewRequest("HEAD", svc.url.String()+"/"+version, nil)
if err != nil {
return false, errors.Wrap(err, "failed to create request")
}
Expand All @@ -227,13 +260,23 @@ func (s *Scraper) hasNewVersion(ctx context.Context, svc service, version string
if resp.StatusCode != http.StatusOK {
return false, httpError(resp)
}
if ct := resp.Header.Get("Content-Type"); ct != "application/json" {

// The header may carry parameters, e.g. "application/json; charset=utf-8",
// so only the media type prefix is matched.
if ct := resp.Header.Get("Content-Type"); !strings.HasPrefix(ct, "application/json") {
return false, errors.Errorf("unexpected content type: %s", ct)
}
digest := storage.DigestHeader(resp.Header.Get("Digest"))
if digest == "" {
// Not providing a digest is fine, we'll just come back with a GET
return true, nil
}
return s.storage.HasVersion(ctx, svc.base, version, digest)
return s.storage.HasVersion(svc.base, version, digest)
}

// Versions returns the version list reported by the underlying storage.
func (s *Scraper) Versions() []string {
	versions := s.storage.Versions()
	return versions
}

// Version returns the spec contents the underlying storage holds for the
// given version, together with any error the storage reports.
func (s *Scraper) Version(version string) ([]byte, error) {
	spec, err := s.storage.Version(version)
	return spec, err
}
8 changes: 3 additions & 5 deletions vervet-underground/internal/scraper/scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestScraper(t *testing.T) {

// No version digests should be known
for _, test := range tests {
ok, err := st.HasVersion(ctx, test.service, test.version, test.digest)
ok, err := st.HasVersion(test.service, test.version, test.digest)
c.Assert(err, qt.IsNil)
c.Assert(ok, qt.IsFalse)
}
Expand All @@ -99,7 +99,7 @@ func TestScraper(t *testing.T) {

// Version digests now known to storage
for _, test := range tests {
ok, err := st.HasVersion(ctx, test.service, test.version, test.digest)
ok, err := st.HasVersion(test.service, test.version, test.digest)
c.Assert(err, qt.IsNil)
c.Assert(ok, qt.IsTrue)
}
Expand Down Expand Up @@ -201,13 +201,11 @@ func TestScraperCollation(t *testing.T) {

// Version digests now known to storage
for _, test := range tests {
ok, err := st.HasVersion(ctx, test.service, test.version, test.digest)
ok, err := st.HasVersion(test.service, test.version, test.digest)
c.Assert(err, qt.IsNil)
c.Assert(ok, qt.IsTrue)
}

err = st.CollateVersions()
c.Assert(err, qt.IsNil)
collated, err := st.GetCollatedVersionSpecs()
c.Assert(err, qt.IsNil)
c.Assert(len(collated), qt.Equals, 4)
Expand Down
45 changes: 34 additions & 11 deletions vervet-underground/internal/storage/mem/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@
package mem

import (
"context"
"github.com/pkg/errors"
"sort"
"sync"
"time"

"github.com/getkin/kin-openapi/openapi3"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"github.com/snyk/vervet"
"go.uber.org/multierr"
Expand Down Expand Up @@ -67,11 +66,11 @@ func New() *Storage {
}

// NotifyVersions implements scraper.Storage.
func (s *Storage) NotifyVersions(ctx context.Context, name string, versions []string, scrapeTime time.Time) error {
func (s *Storage) NotifyVersions(name string, versions []string, scrapeTime time.Time) error {
for _, version := range versions {
// TODO: Add method to fetch contents here
// TODO: implement notify versions; update sunset when versions are removed
err := s.NotifyVersion(ctx, name, version, []byte{}, scrapeTime)
err := s.NotifyVersion(name, version, []byte{}, scrapeTime)
if err != nil {
return err
}
Expand All @@ -80,7 +79,7 @@ func (s *Storage) NotifyVersions(ctx context.Context, name string, versions []st
}

// HasVersion implements scraper.Storage.
func (s *Storage) HasVersion(ctx context.Context, name string, version string, digest string) (bool, error) {
func (s *Storage) HasVersion(name string, version string, digest string) (bool, error) {
s.mu.RLock()
defer s.mu.RUnlock()
revisions, ok := s.serviceVersionMappedRevisionSpecs[name][version]
Expand All @@ -93,7 +92,7 @@ func (s *Storage) HasVersion(ctx context.Context, name string, version string, d
}

// NotifyVersion implements scraper.Storage.
func (s *Storage) NotifyVersion(ctx context.Context, name string, version string, contents []byte, scrapeTime time.Time) error {
func (s *Storage) NotifyVersion(name string, version string, contents []byte, scrapeTime time.Time) error {
s.mu.Lock()
defer s.mu.Unlock()

Expand Down Expand Up @@ -145,6 +144,32 @@ func (s *Storage) NotifyVersion(ctx context.Context, name string, version string
return nil
}

// Versions implements scraper.Storage. It returns the string form of every
// entry in the collated version list, in the list's current order. The read
// lock is held while the list is walked.
func (s *Storage) Versions() []string {
	s.mu.RLock()
	defer s.mu.RUnlock()

	out := make([]string, 0, len(s.collatedVersions))
	for _, v := range s.collatedVersions {
		out = append(out, v.String())
	}
	return out
}

// Version implements scraper.Storage. It parses the requested version string
// and returns the JSON encoding of the collated spec stored under it.
//
// An error is returned when the version string cannot be parsed, when no
// collated spec is stored for the parsed version, or when marshaling fails.
func (s *Storage) Version(version string) ([]byte, error) {
	// Parsing touches no shared state, so do it before taking the lock.
	parsedVersion, err := vervet.ParseVersion(version)
	if err != nil {
		return nil, err
	}

	s.mu.RLock()
	defer s.mu.RUnlock()

	// Use the comma-ok form: a plain index yields the map's zero value for an
	// unknown version, which would be marshaled instead of reporting the miss.
	spec, ok := s.collatedVersionedSpecs[*parsedVersion]
	if !ok {
		return nil, errors.New("no collated spec for version " + version)
	}
	return spec.MarshalJSON()
}

// CollateVersions does the following:
// - calls updateCollatedVersions for a slice of unique vervet.Version entries
// - for each unique vervet.Version, run collateVersion to create a compiled VU openapi doc
Expand Down Expand Up @@ -206,13 +231,12 @@ func (s *Storage) collateVersion(version vervet.Version) error {
// number of services maximum needed
contentRevisions := make([]ContentRevision, 0)

s.mu.RLock()
// preprocessing all relevant docs in byte format before combining
for service, versionSlice := range s.serviceVersions {
// If there is an exact match on versions 1-to-1
s.mu.RLock()
revisions, ok := s.serviceVersionMappedRevisionSpecs[service][version.String()]
s.mu.RUnlock()
var currentRevision ContentRevision
revisions, ok := s.serviceVersionMappedRevisionSpecs[service][version.String()]
if ok {
// TODO: iterate through and take last contentRevision.
// Could change to []ContentRevision in struct later
Expand All @@ -229,9 +253,7 @@ func (s *Storage) collateVersion(version vervet.Version) error {
continue
}

s.mu.RLock()
revisions, ok = s.serviceVersionMappedRevisionSpecs[service][resolvedVersion.String()]
s.mu.RUnlock()
if ok {
// TODO: iterate through and take last contentRevision.
// Could change to []ContentRevision in struct later
Expand All @@ -242,6 +264,7 @@ func (s *Storage) collateVersion(version vervet.Version) error {
}
}
}
s.mu.RUnlock()

err := s.mergeContentRevisions(version, contentRevisions)
if err != nil {
Expand Down
Loading

0 comments on commit 0ad9173

Please sign in to comment.