Skip to content

Commit dec8357

Browse files
committed
Debug operator
This commit introduces the debug operator- an operator intending to help debugging complex queries. When running the debug operator using zq or zed all encounterd values at the point of the operator are written to stderr. Closes #5181
1 parent 3474fcb commit dec8357

File tree

34 files changed

+2684
-2258
lines changed

34 files changed

+2684
-2258
lines changed

api/queryio/client.go

+62-39
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package queryio
22

33
import (
4+
"context"
45
"errors"
56
"fmt"
67
"io"
@@ -12,58 +13,80 @@ import (
1213
"github.com/brimdata/zed/zson"
1314
)
1415

15-
type Query struct {
16-
reader *zngio.Reader
17-
closer io.Closer
16+
type scanner struct {
17+
channel string
18+
scanner zbuf.Scanner
19+
closer io.Closer
20+
progress zbuf.Progress
1821
}
1922

20-
// NewQuery returns a Query that reads a ZNG-encoded query response
21-
// from rc and decodes it. Closing the Query also closes rc.
22-
func NewQuery(rc io.ReadCloser) *Query {
23-
return &Query{
24-
reader: zngio.NewReader(zed.NewContext(), rc),
25-
closer: rc,
23+
func NewScanner(ctx context.Context, rc io.ReadCloser) zbuf.Scanner {
24+
s, err := zngio.NewReader(zed.NewContext(), rc).NewScanner(ctx, nil)
25+
if err != nil {
26+
// XXX This shouldn't happen since we don't have a filter.
27+
panic(err)
28+
}
29+
return &scanner{
30+
scanner: s,
31+
closer: rc,
2632
}
2733
}
2834

29-
func (q *Query) Close() error {
30-
err := q.reader.Close()
31-
q.closer.Close()
32-
return err
35+
func (s *scanner) Progress() zbuf.Progress {
36+
return s.progress
3337
}
3438

35-
func (q *Query) Read() (*zed.Value, error) {
36-
val, ctrl, err := q.reader.ReadPayload()
37-
if ctrl != nil {
38-
if ctrl.Format != zngio.ControlFormatZSON {
39-
return nil, fmt.Errorf("unsupported app encoding: %v", ctrl.Format)
40-
}
41-
arena := zed.NewArena()
42-
defer arena.Unref()
43-
value, err := zson.ParseValue(zed.NewContext(), arena, string(ctrl.Bytes))
44-
if err != nil {
45-
return nil, fmt.Errorf("unable to parse Zed control message: %w (%s)", err, string(ctrl.Bytes))
39+
func (s *scanner) Pull(done bool) (zbuf.Batch, error) {
40+
again:
41+
batch, err := s.scanner.Pull(done)
42+
if err == nil {
43+
if batch != nil {
44+
return zbuf.Label(s.channel, batch), nil
4645
}
47-
var v interface{}
48-
if err := unmarshaler.Unmarshal(value, &v); err != nil {
49-
return nil, fmt.Errorf("unable to unmarshal Zed control message: %w (%s)", err, string(ctrl.Bytes))
50-
}
51-
return nil, controlToError(v)
46+
return nil, s.closer.Close()
5247
}
53-
return val, err
54-
}
55-
56-
func controlToError(ctrl interface{}) error {
57-
switch ctrl := ctrl.(type) {
48+
zctrl, ok := err.(*zbuf.Control)
49+
if !ok {
50+
return nil, err
51+
}
52+
v, err := s.marshalControl(zctrl)
53+
if err != nil {
54+
return nil, err
55+
}
56+
switch ctrl := v.(type) {
5857
case *api.QueryChannelSet:
59-
return &zbuf.Control{Message: zbuf.SetChannel(ctrl.Channel)}
58+
s.channel = ctrl.Channel
59+
goto again
6060
case *api.QueryChannelEnd:
61-
return &zbuf.Control{Message: zbuf.EndChannel(ctrl.Channel)}
61+
eoc := zbuf.EndOfChannel(ctrl.Channel)
62+
return &eoc, nil
6263
case *api.QueryStats:
63-
return &zbuf.Control{Message: ctrl.Progress}
64+
s.progress.Add(ctrl.Progress)
65+
goto again
6466
case *api.QueryError:
65-
return errors.New(ctrl.Error)
67+
return nil, errors.New(ctrl.Error)
6668
default:
67-
return fmt.Errorf("unsupported control message: %T", ctrl)
69+
return nil, fmt.Errorf("unsupported control message: %T", ctrl)
70+
}
71+
}
72+
73+
func (s *scanner) marshalControl(zctrl *zbuf.Control) (any, error) {
74+
ctrl, ok := zctrl.Message.(*zngio.Control)
75+
if !ok {
76+
return nil, fmt.Errorf("unknown control type: %T", zctrl.Message)
77+
}
78+
if ctrl.Format != zngio.ControlFormatZSON {
79+
return nil, fmt.Errorf("unsupported app encoding: %v", ctrl.Format)
80+
}
81+
arena := zed.NewArena()
82+
defer arena.Unref()
83+
value, err := zson.ParseValue(zed.NewContext(), arena, string(ctrl.Bytes))
84+
if err != nil {
85+
return nil, fmt.Errorf("unable to parse Zed control message: %w (%s)", err, string(ctrl.Bytes))
86+
}
87+
var v interface{}
88+
if err := unmarshaler.Unmarshal(value, &v); err != nil {
89+
return nil, fmt.Errorf("unable to unmarshal Zed control message: %w (%s)", err, string(ctrl.Bytes))
6890
}
91+
return v, nil
6992
}

cli/zq/command.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package zq
33
import (
44
"flag"
55
"fmt"
6+
"os"
67

78
"github.com/brimdata/zed"
89
"github.com/brimdata/zed/cli"
@@ -17,6 +18,7 @@ import (
1718
"github.com/brimdata/zed/zbuf"
1819
"github.com/brimdata/zed/zfmt"
1920
"github.com/brimdata/zed/zio"
21+
"github.com/brimdata/zed/zio/zsonio"
2022
)
2123

2224
var Cmd = &charm.Spec{
@@ -159,7 +161,11 @@ func (c *Command) Run(args []string) error {
159161
return err
160162
}
161163
defer query.Pull(true)
162-
err = zbuf.CopyPuller(writer, query)
164+
out := map[string]zio.WriteCloser{
165+
"main": writer,
166+
"debug": zsonio.NewWriter(zio.NopCloser(os.Stderr), zsonio.WriterOpts{}),
167+
}
168+
err = zbuf.CopyMux(out, query)
163169
if closeErr := writer.Close(); err == nil {
164170
err = closeErr
165171
}

cmd/zed/branch/command.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
"github.com/brimdata/zed/lakeparse"
1414
"github.com/brimdata/zed/pkg/charm"
1515
"github.com/brimdata/zed/pkg/storage"
16-
"github.com/brimdata/zed/zio"
16+
"github.com/brimdata/zed/zbuf"
1717
)
1818

1919
var Cmd = &charm.Spec{
@@ -132,8 +132,8 @@ func (c *Command) list(ctx context.Context, lake api.Interface) error {
132132
w.Close()
133133
return err
134134
}
135-
defer q.Close()
136-
err = zio.Copy(w, q)
135+
defer q.Pull(true)
136+
err = zbuf.CopyPuller(w, q)
137137
if closeErr := w.Close(); err == nil {
138138
err = closeErr
139139
}

cmd/zed/internal/lakemanage/scan.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/brimdata/zed/order"
1313
"github.com/brimdata/zed/runtime/sam/expr"
1414
"github.com/brimdata/zed/runtime/sam/expr/extent"
15+
"github.com/brimdata/zed/zbuf"
1516
"github.com/brimdata/zed/zio"
1617
"github.com/brimdata/zed/zson"
1718
"github.com/segmentio/ksuid"
@@ -66,20 +67,22 @@ from %q@%q:objects
6667
`
6768

6869
type objectIterator struct {
69-
reader zio.ReadCloser
70+
reader zio.Reader
71+
puller zbuf.Puller
7072
unmarshaler *zson.UnmarshalZNGContext
7173
arena *zed.Arena
7274
}
7375

7476
func newObjectIterator(ctx context.Context, lake api.Interface, head *lakeparse.Commitish) (*objectIterator, error) {
7577
query := fmt.Sprintf(iteratorQuery, head.Pool, head.Branch, head.Pool, head.Branch)
76-
r, err := lake.Query(ctx, nil, query)
78+
q, err := lake.Query(ctx, nil, query)
7779
if err != nil {
7880
return nil, err
7981
}
8082
arena := zed.NewArena()
8183
return &objectIterator{
82-
reader: r,
84+
reader: zbuf.PullerReader(q),
85+
puller: q,
8386
unmarshaler: zson.NewZNGUnmarshaler().SetContext(zed.NewContext(), arena),
8487
arena: arena,
8588
}, nil
@@ -104,7 +107,8 @@ func (r *objectIterator) next() (*object, error) {
104107
}
105108

106109
func (r *objectIterator) close() error {
107-
return r.reader.Close()
110+
_, err := r.puller.Pull(true)
111+
return err
108112
}
109113

110114
type object struct {

cmd/zed/log/command.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
"github.com/brimdata/zed/compiler/parser"
1111
"github.com/brimdata/zed/pkg/charm"
1212
"github.com/brimdata/zed/pkg/storage"
13-
"github.com/brimdata/zed/zio"
13+
"github.com/brimdata/zed/zbuf"
1414
)
1515

1616
var Cmd = &charm.Spec{
@@ -76,8 +76,8 @@ func (c *Command) Run(args []string) error {
7676
}
7777
return err
7878
}
79-
defer q.Close()
80-
err = zio.Copy(w, q)
79+
defer q.Pull(true)
80+
err = zbuf.CopyPuller(w, q)
8181
if closeErr := w.Close(); err == nil {
8282
err = closeErr
8383
}

cmd/zed/ls/command.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
"github.com/brimdata/zed/cmd/zed/root"
1111
"github.com/brimdata/zed/pkg/charm"
1212
"github.com/brimdata/zed/pkg/storage"
13-
"github.com/brimdata/zed/zio"
13+
"github.com/brimdata/zed/zbuf"
1414
"github.com/segmentio/ksuid"
1515
)
1616

@@ -89,8 +89,8 @@ func (c *Command) Run(args []string) error {
8989
w.Close()
9090
return err
9191
}
92-
defer q.Close()
93-
err = zio.Copy(w, q)
92+
defer q.Pull(true)
93+
err = zbuf.CopyPuller(w, q)
9494
if closeErr := w.Close(); err == nil {
9595
err = closeErr
9696
}

cmd/zed/query/command.go

+9-3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package query
22

33
import (
44
"flag"
5+
"os"
56

67
"github.com/brimdata/zed/cli/outputflags"
78
"github.com/brimdata/zed/cli/poolflags"
@@ -12,6 +13,7 @@ import (
1213
"github.com/brimdata/zed/pkg/storage"
1314
"github.com/brimdata/zed/zbuf"
1415
"github.com/brimdata/zed/zio"
16+
"github.com/brimdata/zed/zio/zsonio"
1517
)
1618

1719
var Cmd = &charm.Spec{
@@ -63,13 +65,17 @@ func (c *Command) Run(args []string) error {
6365
return err
6466
}
6567
head, _ := c.poolFlags.HEAD()
66-
query, err := lake.QueryWithControl(ctx, head, src, c.queryFlags.Includes...)
68+
query, err := lake.Query(ctx, head, src, c.queryFlags.Includes...)
6769
if err != nil {
6870
w.Close()
6971
return err
7072
}
71-
defer query.Close()
72-
err = zio.Copy(w, zbuf.NoControl(query))
73+
defer query.Pull(true)
74+
out := map[string]zio.WriteCloser{
75+
"main": w,
76+
"debug": zsonio.NewWriter(zio.NopCloser(os.Stderr), zsonio.WriterOpts{}),
77+
}
78+
err = zbuf.CopyMux(out, query)
7379
if closeErr := w.Close(); err == nil {
7480
err = closeErr
7581
}

compiler/ast/ast.go

+13
Original file line numberDiff line numberDiff line change
@@ -627,6 +627,11 @@ type (
627627
KeywordPos int `json:"keyword_pos"`
628628
Name *ID `json:"name"`
629629
}
630+
Debug struct {
631+
Kind string `json:"kind" unpack:""`
632+
KeywordPos int `json:"keyword_pos"`
633+
Expr Expr `json:"expr"`
634+
}
630635
)
631636

632637
// Source structure
@@ -781,6 +786,7 @@ func (*Sample) OpAST() {}
781786
func (*Load) OpAST() {}
782787
func (*Assert) OpAST() {}
783788
func (*Output) OpAST() {}
789+
func (*Debug) OpAST() {}
784790

785791
func (x *Scope) Pos() int {
786792
if x.Decls != nil {
@@ -817,6 +823,7 @@ func (x *Sample) Pos() int { return x.KeywordPos }
817823
func (x *Load) Pos() int { return x.KeywordPos }
818824
func (x *Assert) Pos() int { return x.KeywordPos }
819825
func (x *Output) Pos() int { return x.KeywordPos }
826+
func (x *Debug) Pos() int { return x.KeywordPos }
820827

821828
func (x *Scope) End() int { return x.Body.End() }
822829
func (x *Parallel) End() int { return x.Rparen }
@@ -926,6 +933,12 @@ func (x *Sample) End() int {
926933
func (x *Load) End() int { return x.EndPos }
927934
func (x *Assert) End() int { return x.Expr.End() }
928935
func (x *Output) End() int { return x.Name.End() }
936+
func (x *Debug) End() int {
937+
if x.Expr != nil {
938+
return x.Expr.End()
939+
}
940+
return x.KeywordPos + 6
941+
}
929942

930943
// An Agg is an AST node that represents a aggregate function. The Name
931944
// field indicates the aggregation method while the Expr field indicates

compiler/ast/dag/op.go

+6
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,11 @@ type (
8686
Expr Expr `json:"expr"`
8787
Order order.Which `json:"order"`
8888
}
89+
Mirror struct {
90+
Kind string `json:"kind" unpack:""`
91+
Main Seq `json:"main"`
92+
Mirror Seq `json:"mirror"`
93+
}
8994
Over struct {
9095
Kind string `json:"kind" unpack:""`
9196
Defs []Def `json:"defs"`
@@ -314,6 +319,7 @@ func (*Over) OpNode() {}
314319
func (*Vectorize) OpNode() {}
315320
func (*Yield) OpNode() {}
316321
func (*Merge) OpNode() {}
322+
func (*Mirror) OpNode() {}
317323
func (*Combine) OpNode() {}
318324
func (*Scope) OpNode() {}
319325
func (*Load) OpNode() {}

compiler/ast/dag/unpack.go

+1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ var unpacker = unpack.New(
4141
MapCall{},
4242
MapExpr{},
4343
Merge{},
44+
Mirror{},
4445
Output{},
4546
Over{},
4647
OverExpr{},

compiler/ast/unpack.go

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ var unpacker = unpack.New(
2222
Conditional{},
2323
ConstDecl{},
2424
Cut{},
25+
Debug{},
2526
astzed.DefValue{},
2627
Drop{},
2728
Explode{},

compiler/describe/analyze.go

+3
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,9 @@ func describeOpAggs(op dag.Op, parents []field.List) []field.List {
182182
aggs = append(aggs, describeAggs(p, []field.List{nil})...)
183183
}
184184
return aggs
185+
case *dag.Mirror:
186+
aggs := describeAggs(op.Main, []field.List{nil})
187+
return append(aggs, describeAggs(op.Mirror, []field.List{nil})...)
185188
case *dag.Summarize:
186189
// The field list for aggregation with no keys is an empty slice and
187190
// not nil.

0 commit comments

Comments
 (0)