Merge pull request #344 from snyk/feat/split-collation
feat: split collation and api in vervet underground
jgresty authored Apr 30, 2024
2 parents b724515 + e0120cf commit a0e19d3
Showing 21 changed files with 710 additions and 524 deletions.
5 changes: 4 additions & 1 deletion vervet-underground/Dockerfile
@@ -18,7 +18,10 @@ RUN go mod download
COPY . .
RUN --mount=type=cache,target=/root/.cache/go-build \
CGO_ENABLED=0 \
go build -v -o /go/bin/app ./
go build -v -o /go/bin/app ./cmd/api/main.go
RUN --mount=type=cache,target=/root/.cache/go-build \
CGO_ENABLED=0 \
go build -v -o /go/bin/scraper ./cmd/scraper/main.go

#################
# Runtime stage #
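The build stage now produces two binaries from the same module. A quick local sanity check might look like the following sketch; the image tag is illustrative, and since the runtime stage is collapsed in this view, which binary becomes the image entrypoint is left open here:

# Hypothetical local build; the tag name is made up.
docker build -t vervet-underground .
# The build stage compiles two binaries inside the builder image:
#   /go/bin/app      (API server, from ./cmd/api/main.go)
#   /go/bin/scraper  (scraper, from ./cmd/scraper/main.go)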
16 changes: 11 additions & 5 deletions vervet-underground/Makefile
@@ -11,7 +11,7 @@ fmt:

.PHONY: clean
clean:
rm -f server && go clean
rm -f api scraper && go clean

.PHONY: lint
lint:
@@ -36,8 +36,13 @@ test:
go test ./... -count=1 -race

.PHONY: build
build:
go build server.go
build: api scraper

api:
go build -o api cmd/api/main.go

scraper:
go build -o scraper cmd/scraper/main.go
#----------------------------------------------------------------------------------
# Check for updates to packages in remote OSS repositories and update go.mod AND
# go.sum to match changes. Then download the all the dependencies
@@ -57,10 +62,11 @@ test-coverage:

.PHONY: start
start:
go run server.go
go run cmd/scraper/main.go
go run cmd/api/main.go

.PHONY: install-tools
install-tools:
install-tools:
ifndef CI
mkdir -p ${GO_BIN}
curl -sSfL 'https://raw.githubusercontent.com/golangci/golangci-lint/${GOCI_LINT_V}/install.sh' | sh -s -- -b ${GO_BIN} ${GOCI_LINT_V}
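The Makefile targets mirror the two-binary split. A usage sketch follows; the commands come from the targets above, while the flag values are chosen for illustration only:

# Build both binaries (`make build` depends on the new `api` and `scraper` targets).
make api
make scraper
# The scraper performs a single collation run and exits.
./scraper -config-file config.default.json
# The API serves collated specs, listening on the configured host at port 8080.
./api -config-file config.default.json -graceful-timeout 30s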
152 changes: 152 additions & 0 deletions vervet-underground/cmd/api/main.go
@@ -0,0 +1,152 @@
package main

import (
"context"
"flag"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"golang.org/x/sync/errgroup"

"vervet-underground/config"
"vervet-underground/internal/handler"
"vervet-underground/internal/storage"
"vervet-underground/internal/storage/disk"
"vervet-underground/internal/storage/gcs"
"vervet-underground/internal/storage/s3"
)

func main() {
var wait time.Duration
var configJson string
flag.DurationVar(&wait, "graceful-timeout", time.Second*15,
"the duration for which the server gracefully wait for existing connections to finish - e.g. 15s or 1m")
flag.StringVar(&configJson, "config-file", "config.default.json",
"the configuration file holding target services and the host address to run server on")

flag.Parse()
zerolog.TimeFieldFormat = zerolog.TimeFormatUnix
zerolog.SetGlobalLevel(zerolog.DebugLevel)

var cfg *config.ServerConfig
var err error
if cfg, err = config.Load(configJson); err != nil {
log.Fatal().Err(err).Msg("unable to load config")
}

ctx := context.Background()
st, err := initializeStorage(ctx, cfg)
if err != nil {
log.Fatal().Err(err).Msg("unable to initialize storage client")
}

h := handler.New(cfg, st, handler.UseDefaultMiddleware)

srv := &http.Server{
Addr: fmt.Sprintf("%s:8080", cfg.Host),
// Good practice to set timeouts to avoid Slowloris attacks.
WriteTimeout: time.Second * 15,
ReadTimeout: time.Second * 15,
IdleTimeout: time.Second * 60,
Handler: h,
}

grp, grpCtx := errgroup.WithContext(ctx)
grp.Go(func() (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic: %v", r)
}
}()

log.Info().Msg(fmt.Sprintf("I'm starting my server on %s:8080", cfg.Host))
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
return fmt.Errorf("failed to start server: %w", err)
}
return nil
})

grp.Go(func() (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic: %v", r)
}
}()

interceptSignals(grpCtx)
time.Sleep(wait)
log.Info().Float64("timeoutSeconds", wait.Seconds()).Msg("server stopping")

return shutdown(ctx, srv, wait)
})

if err := grp.Wait(); err != nil {
log.Fatal().Err(err).Msg("server unexpectedly stopped")
}

log.Info().Msg("shutting down cleanly")
}

func initializeStorage(ctx context.Context, cfg *config.ServerConfig) (storage.ReadOnlyStorage, error) {
switch cfg.Storage.Type {
case config.StorageTypeDisk:
return disk.New(cfg.Storage.Disk.Path), nil
case config.StorageTypeS3:
return s3.New(ctx, &s3.Config{
AwsRegion: cfg.Storage.S3.Region,
AwsEndpoint: cfg.Storage.S3.Endpoint,
IamRoleEnabled: cfg.Storage.IamRoleEnabled,
BucketName: cfg.Storage.BucketName,
Credentials: s3.StaticKeyCredentials{
AccessKey: cfg.Storage.S3.AccessKey,
SecretKey: cfg.Storage.S3.SecretKey,
SessionKey: cfg.Storage.S3.SessionKey,
},
})
case config.StorageTypeGCS:
return gcs.New(ctx, &gcs.Config{
GcsRegion: cfg.Storage.GCS.Region,
GcsEndpoint: cfg.Storage.GCS.Endpoint,
IamRoleEnabled: cfg.Storage.IamRoleEnabled,
BucketName: cfg.Storage.BucketName,
Credentials: gcs.StaticKeyCredentials{
ProjectId: cfg.Storage.GCS.ProjectId,
Filename: cfg.Storage.GCS.Filename,
},
})
}
return nil, fmt.Errorf("unknown storage backend: %s", cfg.Storage.Type)
}

func interceptSignals(ctx context.Context) {
sigc := make(chan os.Signal, 1)
signal.Notify(sigc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

select {
case <-ctx.Done():
case sig := <-sigc:
log.Info().Str("signal", sig.String()).Msg("intercepted signal")
}
}

func shutdown(ctx context.Context, srv *http.Server, timeout time.Duration) error {
shutdownCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

if err := srv.Shutdown(shutdownCtx); err != nil {
return fmt.Errorf("failed to gracefully shutdown server: %w", err)
}

return nil
}
vervet-underground/cmd/scraper/main.go
@@ -6,31 +6,23 @@ import (
"fmt"
"net/http"
"os"
"os/signal"
"time"

"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"

"vervet-underground/config"
"vervet-underground/internal/handler"
"vervet-underground/internal/scraper"
"vervet-underground/internal/storage"
"vervet-underground/internal/storage/disk"
"vervet-underground/internal/storage/gcs"
"vervet-underground/internal/storage/mem"
"vervet-underground/internal/storage/s3"
)

func main() {
var wait time.Duration
var scrapeInterval time.Duration
var configJson string
var overlayFile string
flag.DurationVar(&wait, "graceful-timeout", time.Second*15,
"the duration for which the server gracefully wait for existing connections to finish - e.g. 15s or 1m")
flag.DurationVar(&scrapeInterval, "scrape-interval", time.Minute,
"the frequency at which scraping occurs - e.g. 15s, 1m, 1h")
flag.StringVar(&configJson, "config-file", "config.default.json",
"the configuration file holding target services and the host address to run server on")
flag.StringVar(&overlayFile, "overlay-file", "",
@@ -43,105 +35,36 @@ func main() {
var cfg *config.ServerConfig
var err error
if cfg, err = config.Load(configJson); err != nil {
logError(err)
log.Fatal().Msg("unable to load config")
log.Fatal().Err(err).Msg("unable to load config")
}
log.Info().Msgf("services: %s", cfg.Services)

var overlayContents []byte
if overlayFile != "" {
overlayContents, err = os.ReadFile(overlayFile)
if err != nil {
logError(err)
log.Fatal().Msgf("unable to load overlay file %q", overlayFile)
log.Fatal().Err(err).Msgf("unable to load overlay file %q", overlayFile)
}
}

// initialize Scraper
ticker := time.NewTicker(scrapeInterval)
ctx := context.Background()
st, err := initializeStorage(ctx, cfg, overlayContents)
if err != nil {
logError(err)
log.Fatal().Msg("unable to initialize storage client")
log.Fatal().Err(err).Msg("unable to initialize storage client")
}

sc, err := scraper.New(cfg, st, scraper.HTTPClient(&http.Client{
Timeout: wait,
Transport: scraper.DurationTransport(http.DefaultTransport),
}))
if err != nil {
logError(err)
log.Fatal().Msg("unable to load storage")
}
// initialize
err = runScrape(ctx, sc)
if err != nil {
log.Fatal().Err(err).Msg("failed initialization scraping of service")
log.Fatal().Err(err).Msg("unable to load storage")
}

h := handler.New(cfg, sc, handler.UseDefaultMiddleware)

srv := &http.Server{
Addr: fmt.Sprintf("%s:8080", cfg.Host),
// Good practice to set timeouts to avoid Slowloris attacks.
WriteTimeout: time.Second * 15,
ReadTimeout: time.Second * 15,
IdleTimeout: time.Second * 60,
Handler: h,
}

quit := make(chan struct{}, 1)
go func() {
for {
select {
case <-ticker.C:
if err := runScrape(ctx, sc); err != nil {
logError(err)
}
case <-quit:
ticker.Stop()
return
}
}
}()

// Run our server in a goroutine so that it doesn't block.
go func() {
log.Info().Msg(fmt.Sprintf("I'm starting my server on %s:8080", cfg.Host))
if err := srv.ListenAndServe(); err != nil && errors.Is(err, http.ErrServerClosed) {
logError(err)
os.Exit(1)
}
}()

c := make(chan os.Signal, 1)
// We'll accept graceful shutdowns when quit via SIGINT (Ctrl+C)
// SIGKILL, SIGQUIT or SIGTERM (Ctrl+/) will not be caught.
signal.Notify(c, os.Interrupt)
// Block until we receive our signal.
<-c

// closes the scraper go routine
quit <- struct{}{}
close(quit)
log.Info().Msg("scraper successfully spun down")

// Create a deadline to wait for.
ctx, cancel := context.WithTimeout(context.Background(), wait)
defer cancel()
// Doesn't block if no connections, but will otherwise wait
// until the timeout deadline.
err = srv.Shutdown(ctx)
err = runScrape(ctx, sc)
if err != nil {
logError(err)
log.Fatal().Err(err).Msg("failed scraping of service")
}

// Optionally, you could run srv.Shutdown in a goroutine and block on
// <-ctx.Done() if your application should wait for other services
// to finalize based on context cancellation.
log.Info().Msg("shutting down cleanly")
os.Exit(0)
}

// runScrape runs scraping all services and can take
@@ -157,15 +80,6 @@ func runScrape(ctx context.Context, sc *scraper.Scraper) error {
return nil
}

func logError(err error) {
log.
Error().
Stack().
Err(err).
Str("cause", fmt.Sprintf("%+v", errors.Cause(err))).
Msg("UnhandledException")
}

func initializeStorage(ctx context.Context, cfg *config.ServerConfig, overlayContents []byte) (storage.Storage, error) {
collatorOpts := []storage.CollatorOption{storage.CollatorExcludePattern(cfg.Merging.ExcludePatterns)}
if overlayContents != nil {
@@ -175,8 +89,8 @@ func initializeStorage(ctx context.Context, cfg *config.ServerConfig, overlayCon
return storage.NewCollator(collatorOpts...)
}
switch cfg.Storage.Type {
case config.StorageTypeMemory:
return mem.New(mem.NewCollator(newCollator)), nil
case config.StorageTypeDisk:
return disk.New(cfg.Storage.Disk.Path, disk.NewCollator(newCollator)), nil
case config.StorageTypeS3:
return s3.New(ctx, &s3.Config{
AwsRegion: cfg.Storage.S3.Region,
5 changes: 4 additions & 1 deletion vervet-underground/config.default.json
@@ -8,6 +8,9 @@
],
"host": "0.0.0.0",
"storage": {
"type": "memory"
"type": "disk",
"disk": {
"path": "/tmp/specs"
}
}
}
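The default config now points both processes at the same on-disk location instead of an in-memory store. A minimal sketch of the split workflow against this default, using the same two commands the Makefile `start` target runs:

# One-shot scrape: collates service specs and writes them under /tmp/specs.
go run cmd/scraper/main.go -config-file config.default.json
# Read-only API: reads from the same /tmp/specs path and listens on port 8080.
go run cmd/api/main.go -config-file config.default.json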