1
1
package queryio
2
2
3
3
import (
4
+ "context"
4
5
"errors"
5
6
"fmt"
6
7
"io"
@@ -12,58 +13,79 @@ import (
12
13
"github.com/brimdata/zed/zson"
13
14
)
14
15
15
- type Query struct {
16
- reader * zngio.Reader
17
- closer io.Closer
16
+ type scanner struct {
17
+ channel string
18
+ scanner zbuf.Scanner
19
+ closer io.Closer
20
+ progress zbuf.Progress
18
21
}
19
22
20
- // NewQuery returns a Query that reads a ZNG-encoded query response
21
- // from rc and decodes it. Closing the Query also closes rc.
22
- func NewQuery (rc io.ReadCloser ) * Query {
23
- return & Query {
24
- reader : zngio .NewReader (zed .NewContext (), rc ),
25
- closer : rc ,
23
+ func NewScanner (ctx context.Context , rc io.ReadCloser ) (zbuf.Scanner , error ) {
24
+ s , err := zngio .NewReader (zed .NewContext (), rc ).NewScanner (ctx , nil )
25
+ if err != nil {
26
+ return nil , err
26
27
}
28
+ return & scanner {
29
+ scanner : s ,
30
+ closer : rc ,
31
+ }, nil
27
32
}
28
33
29
- func (q * Query ) Close () error {
30
- err := q .reader .Close ()
31
- q .closer .Close ()
32
- return err
34
+ func (s * scanner ) Progress () zbuf.Progress {
35
+ return s .progress
33
36
}
34
37
35
- func (q * Query ) Read () (* zed.Value , error ) {
36
- val , ctrl , err := q .reader .ReadPayload ()
37
- if ctrl != nil {
38
- if ctrl .Format != zngio .ControlFormatZSON {
39
- return nil , fmt .Errorf ("unsupported app encoding: %v" , ctrl .Format )
38
+ func (s * scanner ) Pull (done bool ) (zbuf.Batch , error ) {
39
+ again:
40
+ batch , err := s .scanner .Pull (done )
41
+ if err == nil {
42
+ if batch != nil {
43
+ return zbuf .Label (s .channel , batch ), nil
40
44
}
41
- arena := zed .NewArena ()
42
- defer arena .Unref ()
43
- value , err := zson .ParseValue (zed .NewContext (), arena , string (ctrl .Bytes ))
44
- if err != nil {
45
- return nil , fmt .Errorf ("unable to parse Zed control message: %w (%s)" , err , string (ctrl .Bytes ))
46
- }
47
- var v interface {}
48
- if err := unmarshaler .Unmarshal (value , & v ); err != nil {
49
- return nil , fmt .Errorf ("unable to unmarshal Zed control message: %w (%s)" , err , string (ctrl .Bytes ))
50
- }
51
- return nil , controlToError (v )
45
+ return nil , s .closer .Close ()
52
46
}
53
- return val , err
54
- }
55
-
56
- func controlToError (ctrl interface {}) error {
57
- switch ctrl := ctrl .(type ) {
47
+ zctrl , ok := err .(* zbuf.Control )
48
+ if ! ok {
49
+ return nil , err
50
+ }
51
+ v , err := marshalControl (zctrl )
52
+ if err != nil {
53
+ return nil , err
54
+ }
55
+ switch ctrl := v .(type ) {
58
56
case * api.QueryChannelSet :
59
- return & zbuf.Control {Message : zbuf .SetChannel (ctrl .Channel )}
57
+ s .channel = ctrl .Channel
58
+ goto again
60
59
case * api.QueryChannelEnd :
61
- return & zbuf.Control {Message : zbuf .EndChannel (ctrl .Channel )}
60
+ eoc := zbuf .EndOfChannel (ctrl .Channel )
61
+ return & eoc , nil
62
62
case * api.QueryStats :
63
- return & zbuf.Control {Message : ctrl .Progress }
63
+ s .progress .Add (ctrl .Progress )
64
+ goto again
64
65
case * api.QueryError :
65
- return errors .New (ctrl .Error )
66
+ return nil , errors .New (ctrl .Error )
66
67
default :
67
- return fmt .Errorf ("unsupported control message: %T" , ctrl )
68
+ return nil , fmt .Errorf ("unsupported control message: %T" , ctrl )
69
+ }
70
+ }
71
+
72
+ func marshalControl (zctrl * zbuf.Control ) (any , error ) {
73
+ ctrl , ok := zctrl .Message .(* zngio.Control )
74
+ if ! ok {
75
+ return nil , fmt .Errorf ("unknown control type: %T" , zctrl .Message )
76
+ }
77
+ if ctrl .Format != zngio .ControlFormatZSON {
78
+ return nil , fmt .Errorf ("unsupported app encoding: %v" , ctrl .Format )
79
+ }
80
+ arena := zed .NewArena ()
81
+ defer arena .Unref ()
82
+ value , err := zson .ParseValue (zed .NewContext (), arena , string (ctrl .Bytes ))
83
+ if err != nil {
84
+ return nil , fmt .Errorf ("unable to parse Zed control message: %w (%s)" , err , ctrl .Bytes )
85
+ }
86
+ var v interface {}
87
+ if err := unmarshaler .Unmarshal (value , & v ); err != nil {
88
+ return nil , fmt .Errorf ("unable to unmarshal Zed control message: %w (%s)" , err , ctrl .Bytes )
68
89
}
90
+ return v , nil
69
91
}
0 commit comments