Skip to content

Commit

Permalink
cmd/ejobs: download non-error rows by default
Browse files Browse the repository at this point in the history
This can significantly save time and transient errors with downloading
massive results experienced in the past. The users typically do not care
about error rows. An option -e flag can be passed to download those as
well.

Change-Id: Iffc7ff89a317a94fd0522a521b3d4fe48eff9bc5
Reviewed-on: https://go-review.googlesource.com/c/pkgsite-metrics/+/642916
LUCI-TryBot-Result: Go LUCI <[email protected]>
Reviewed-by: Jonathan Amsterdam <[email protected]>
  • Loading branch information
zpavlinovic committed Jan 15, 2025
1 parent 339a4c8 commit 9cf618b
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 17 deletions.
8 changes: 5 additions & 3 deletions cmd/ejobs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ var (
minImporters int // for start
waitInterval time.Duration // for wait
force bool // for results
errs bool // for results
outfile string // for results
)

Expand Down Expand Up @@ -77,11 +78,12 @@ var commands = []command{
fs.DurationVar(&waitInterval, "i", 0, "display updates at this interval")
},
},
{"results", "[-f] [-o FILE.json] JOBID",
{"results", "[-f] [-e] [-o FILE.json] JOBID",
"download results as JSON",
doResults,
func(fs *flag.FlagSet) {
fs.BoolVar(&force, "f", false, "download even if unfinished")
fs.BoolVar(&errs, "e", false, "also download error results (by default, only non-error results are downloaded)")
fs.StringVar(&outfile, "o", "", "output filename")
},
},
Expand Down Expand Up @@ -455,7 +457,7 @@ func copyToGCS(ctx context.Context, object *storage.ObjectHandle, filename strin

func doResults(ctx context.Context, args []string) (err error) {
if len(args) == 0 {
return errors.New("wrong number of args: want [-f] [-o FILE.json] JOB_ID")
return errors.New("wrong number of args: want [-f] [-e] [-o FILE.json] JOB_ID")
}
jobID := args[0]
ts, err := identityTokenSource(ctx)
Expand All @@ -470,7 +472,7 @@ func doResults(ctx context.Context, args []string) (err error) {
if !force && done < job.NumEnqueued {
return fmt.Errorf("job not finished (%d/%d completed); use -f for partial results", done, job.NumEnqueued)
}
results, err := requestJSON[[]*analysis.Result](ctx, "jobs/results?jobid="+jobID, ts)
results, err := requestJSON[[]*analysis.Result](ctx, fmt.Sprintf("jobs/results?jobid=%s&errors=%t", jobID, errs), ts)
if err != nil {
return err
}
Expand Down
15 changes: 11 additions & 4 deletions internal/analysis/analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,14 +274,21 @@ func JSONTreeToDiagnostics(jsonTree JSONTree) []*Diagnostic {
return diags
}

func ReadResults(ctx context.Context, c *bigquery.Client, binaryName, binaryVersion, binaryArgs string) (_ []*Result, err error) {
// ReadResults reads non-error rows with binaryName, binaryVersion, and binaryArgs
// from the analysis table of c. If errs is "true", error rows are included as well.
func ReadResults(ctx context.Context, c *bigquery.Client, binaryName, binaryVersion, binaryArgs, errs string) (_ []*Result, err error) {
defer derrors.Wrap(&err, "ReadResults")
q := bigquery.PartitionQuery{
From: c.FullTableName(TableName),
PartitionOn: "module_path, version",
Where: fmt.Sprintf("binary_name='%s' AND binary_version='%s' AND binary_args='%s'",
binaryName, binaryVersion, binaryArgs),
OrderBy: "created_at DESC",
OrderBy: "created_at DESC",
}
if errs == "true" {
q.Where = fmt.Sprintf("binary_name='%s' AND binary_version='%s' AND binary_args='%s'",
binaryName, binaryVersion, binaryArgs)
} else {
q.Where = fmt.Sprintf("binary_name='%s' AND binary_version='%s' AND binary_args='%s' AND error=''",
binaryName, binaryVersion, binaryArgs)
}
iter, err := c.Query(ctx, q.String())
if err != nil {
Expand Down
14 changes: 7 additions & 7 deletions internal/worker/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@

// Handlers for jobs.
//
// jobs/describe?jobid=xxx describe a job

// TODO:
// jobs/describe?jobid=xxx describe a job
// jobs/list list all jobs
// jobs/cancel?jobid=xxx cancel a job
// jobs/cancel?jobid=xxx cancel a job
// jobs/results?jobid=xxx&errors={true|false} get job results

package worker

Expand Down Expand Up @@ -37,7 +36,8 @@ func (s *Server) handleJobs(w http.ResponseWriter, r *http.Request) (err error)
}

jobID := r.FormValue("jobid")
return s.processJobRequest(ctx, w, r.URL.Path, jobID, s.jobDB)
errs := r.FormValue("errors") // for results
return s.processJobRequest(ctx, w, r.URL.Path, jobID, errs, s.jobDB)
}

type jobDB interface {
Expand All @@ -47,7 +47,7 @@ type jobDB interface {
ListJobs(context.Context, func(*jobs.Job, time.Time) error) error
}

func (s *Server) processJobRequest(ctx context.Context, w io.Writer, path, jobID string, db jobDB) error {
func (s *Server) processJobRequest(ctx context.Context, w io.Writer, path, jobID, errs string, db jobDB) error {
path = strings.TrimPrefix(path, "/jobs/")
switch path {
case "describe": // describe one job
Expand Down Expand Up @@ -91,7 +91,7 @@ func (s *Server) processJobRequest(ctx context.Context, w io.Writer, path, jobID
if s.bqClient == nil {
return errors.New("bq client is nil")
}
results, err := analysis.ReadResults(ctx, s.bqClient, job.Binary, job.BinaryVersion, job.BinaryArgs)
results, err := analysis.ReadResults(ctx, s.bqClient, job.Binary, job.BinaryVersion, job.BinaryArgs, errs)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions internal/worker/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestJobs(t *testing.T) {
}
s := &Server{}
var buf bytes.Buffer
if err := s.processJobRequest(ctx, &buf, "/jobs/describe", job.ID(), db); err != nil {
if err := s.processJobRequest(ctx, &buf, "/jobs/describe", job.ID(), "false", db); err != nil {
t.Fatal(err)
}

Expand All @@ -42,7 +42,7 @@ func TestJobs(t *testing.T) {
t.Errorf("got\n%+v\nwant\n%+v", got, job)
}

if err := s.processJobRequest(ctx, &buf, "/jobs/cancel", job.ID(), db); err != nil {
if err := s.processJobRequest(ctx, &buf, "/jobs/cancel", job.ID(), "false", db); err != nil {
t.Fatal(err)
}

Expand All @@ -55,7 +55,7 @@ func TestJobs(t *testing.T) {
}

buf.Reset()
if err := s.processJobRequest(ctx, &buf, "/jobs/list", "", db); err != nil {
if err := s.processJobRequest(ctx, &buf, "/jobs/list", "", "", db); err != nil {
t.Fatal(err)
}
// Don't check for specific output, just make sure there's something
Expand Down

0 comments on commit 9cf618b

Please sign in to comment.