1
1
package queryio
2
2
3
3
import (
4
+ "context"
4
5
"errors"
5
6
"fmt"
6
7
"io"
@@ -12,58 +13,80 @@ import (
12
13
"github.com/brimdata/zed/zson"
13
14
)
14
15
15
- type Query struct {
16
- reader * zngio.Reader
17
- closer io.Closer
16
+ type scanner struct {
17
+ channel string
18
+ scanner zbuf.Scanner
19
+ closer io.Closer
20
+ progress zbuf.Progress
18
21
}
19
22
20
- // NewQuery returns a Query that reads a ZNG-encoded query response
21
- // from rc and decodes it. Closing the Query also closes rc.
22
- func NewQuery (rc io.ReadCloser ) * Query {
23
- return & Query {
24
- reader : zngio .NewReader (zed .NewContext (), rc ),
25
- closer : rc ,
23
+ func NewScanner (ctx context.Context , rc io.ReadCloser ) zbuf.Scanner {
24
+ s , err := zngio .NewReader (zed .NewContext (), rc ).NewScanner (ctx , nil )
25
+ if err != nil {
26
+ // XXX This shouldn't happen since we don't have a filter.
27
+ panic (err )
28
+ }
29
+ return & scanner {
30
+ scanner : s ,
31
+ closer : rc ,
26
32
}
27
33
}
28
34
29
- func (q * Query ) Close () error {
30
- err := q .reader .Close ()
31
- q .closer .Close ()
32
- return err
35
+ func (s * scanner ) Progress () zbuf.Progress {
36
+ return s .progress
33
37
}
34
38
35
- func (q * Query ) Read () (* zed.Value , error ) {
36
- val , ctrl , err := q .reader .ReadPayload ()
37
- if ctrl != nil {
38
- if ctrl .Format != zngio .ControlFormatZSON {
39
- return nil , fmt .Errorf ("unsupported app encoding: %v" , ctrl .Format )
40
- }
41
- arena := zed .NewArena ()
42
- defer arena .Unref ()
43
- value , err := zson .ParseValue (zed .NewContext (), arena , string (ctrl .Bytes ))
44
- if err != nil {
45
- return nil , fmt .Errorf ("unable to parse Zed control message: %w (%s)" , err , string (ctrl .Bytes ))
39
+ func (s * scanner ) Pull (done bool ) (zbuf.Batch , error ) {
40
+ again:
41
+ batch , err := s .scanner .Pull (done )
42
+ if err == nil {
43
+ if batch != nil {
44
+ return zbuf .Wrap (s .channel , batch ), nil
46
45
}
47
- var v interface {}
48
- if err := unmarshaler .Unmarshal (value , & v ); err != nil {
49
- return nil , fmt .Errorf ("unable to unmarshal Zed control message: %w (%s)" , err , string (ctrl .Bytes ))
50
- }
51
- return nil , controlToError (v )
46
+ return nil , s .closer .Close ()
52
47
}
53
- return val , err
54
- }
55
-
56
- func controlToError (ctrl interface {}) error {
57
- switch ctrl := ctrl .(type ) {
48
+ zctrl , ok := err .(* zbuf.Control )
49
+ if ! ok {
50
+ return nil , err
51
+ }
52
+ v , err := s .marshalControl (zctrl )
53
+ if err != nil {
54
+ return nil , err
55
+ }
56
+ switch ctrl := v .(type ) {
58
57
case * api.QueryChannelSet :
59
- return & zbuf.Control {Message : zbuf .SetChannel (ctrl .Channel )}
58
+ s .channel = ctrl .Channel
59
+ goto again
60
60
case * api.QueryChannelEnd :
61
- return & zbuf.Control {Message : zbuf .EndChannel (ctrl .Channel )}
61
+ eoc := zbuf .EndOfChannel (ctrl .Channel )
62
+ return & eoc , nil
62
63
case * api.QueryStats :
63
- return & zbuf.Control {Message : ctrl .Progress }
64
+ s .progress .Add (ctrl .Progress )
65
+ goto again
64
66
case * api.QueryError :
65
- return errors .New (ctrl .Error )
67
+ return nil , errors .New (ctrl .Error )
66
68
default :
67
- return fmt .Errorf ("unsupported control message: %T" , ctrl )
69
+ return nil , fmt .Errorf ("unsupported control message: %T" , ctrl )
70
+ }
71
+ }
72
+
73
+ func (s * scanner ) marshalControl (zctrl * zbuf.Control ) (any , error ) {
74
+ ctrl , ok := zctrl .Message .(* zngio.Control )
75
+ if ! ok {
76
+ return nil , fmt .Errorf ("unknown control type: %T" , zctrl .Message )
77
+ }
78
+ if ctrl .Format != zngio .ControlFormatZSON {
79
+ return nil , fmt .Errorf ("unsupported app encoding: %v" , ctrl .Format )
80
+ }
81
+ arena := zed .NewArena ()
82
+ defer arena .Unref ()
83
+ value , err := zson .ParseValue (zed .NewContext (), arena , string (ctrl .Bytes ))
84
+ if err != nil {
85
+ return nil , fmt .Errorf ("unable to parse Zed control message: %w (%s)" , err , string (ctrl .Bytes ))
86
+ }
87
+ var v interface {}
88
+ if err := unmarshaler .Unmarshal (value , & v ); err != nil {
89
+ return nil , fmt .Errorf ("unable to unmarshal Zed control message: %w (%s)" , err , string (ctrl .Bytes ))
68
90
}
91
+ return v , nil
69
92
}
0 commit comments