-
Notifications
You must be signed in to change notification settings - Fork 5
/
freezer_source.go
125 lines (110 loc) · 2.61 KB
/
freezer_source.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package freezer
import (
"context"
"encoding/binary"
"fmt"
"io"
"os"
"time"
"github.com/uw-labs/straw"
)
// ConsumerMessageHandler is the callback that ConsumeMessages invokes with the
// payload of each message it reads. Returning a non-nil error stops
// consumption; ConsumeMessages returns that error to its caller.
type ConsumerMessageHandler func([]byte) error
// MessageSource reads messages from a sequence of streams held in a
// straw.StreamStore. Values are built by NewMessageSource.
type MessageSource struct {
// streamstore is the store each stream is opened from.
streamstore straw.StreamStore
// path is combined with a sequence number (through seqToPath) to produce the
// name of each stream.
path string
// pollPeriod is how long ConsumeMessages sleeps before retrying when a
// stream does not exist yet or has no more data available.
pollPeriod time.Duration
}
// MessageSourceConfig holds the settings NewMessageSource uses to build a
// MessageSource.
type MessageSourceConfig struct {
// Path is copied into the MessageSource and used to derive stream names.
Path string
// PollPeriod is the delay between retries while waiting for data. A zero
// value makes NewMessageSource fall back to 5 seconds.
PollPeriod time.Duration
// CompressionType selects how the stream store is wrapped:
// CompressionTypeSnappy and CompressionTypeZstd wrap it with the matching
// stream store, CompressionTypeNone (and any other value) leaves it as is.
// NOTE(review): the wrappers presumably decompress on read; confirm against
// newSnappyStreamStore / newZstdStreamStore.
CompressionType CompressionType
}
// NewMessageSource returns a MessageSource that reads from config.Path in
// streamstore.
//
// The store is wrapped according to config.CompressionType: snappy and zstd
// each get their own wrapping stream store, while CompressionTypeNone and any
// unrecognised value use streamstore directly. A zero config.PollPeriod is
// replaced by a default of 5 seconds.
func NewMessageSource(streamstore straw.StreamStore, config MessageSourceConfig) *MessageSource {
	store := streamstore
	if config.CompressionType == CompressionTypeSnappy {
		store = newSnappyStreamStore(store)
	} else if config.CompressionType == CompressionTypeZstd {
		store = newZstdStreamStore(store)
	}

	period := config.PollPeriod
	if period == 0 {
		// No poll period configured: use the default.
		period = 5 * time.Second
	}

	return &MessageSource{
		streamstore: store,
		path:        config.Path,
		pollPeriod:  period,
	}
}
// ConsumeMessages reads messages from the numbered streams under the
// configured path, starting at sequence 0, and passes each payload to handler
// in the order it is read.
//
// Every record is a 4-byte little-endian length followed by that many payload
// bytes. A zero length is the end marker of a stream: it must be followed by
// EOF, after which the stream is closed and the next sequence number is
// opened. While the next stream does not exist yet, or the current stream
// reports EOF before an end marker has been seen, the call sleeps for the poll
// period and then retries.
//
// ctx is consulted only during those sleeps. If it is cancelled or its
// deadline passes while waiting, ConsumeMessages returns nil.
//
// A non-nil error is returned when a stream cannot be opened for a reason
// other than not existing, when a length or payload cannot be read (the
// underlying error is wrapped), when data follows an end marker, when closing
// a finished stream fails, or when handler returns an error (returned
// unchanged).
func (mq *MessageSource) ConsumeMessages(ctx context.Context, handler ConsumerMessageHandler) error {
	var rc io.ReadCloser
	// Best-effort close of whichever stream is still open on an early return.
	// rc is reset to nil after every explicit Close, so a stream is never
	// closed twice.
	defer func() {
		if rc != nil {
			rc.Close()
		}
	}()

	// sleep blocks for one poll period. done is true when ctx finished first,
	// in which case err is the value ConsumeMessages must return.
	sleep := func() (done bool, err error) {
		t := time.NewTimer(mq.pollPeriod)
		defer t.Stop()
		select {
		case <-ctx.Done():
			if cerr := ctx.Err(); cerr != context.DeadlineExceeded && cerr != context.Canceled {
				return true, cerr
			}
			return true, nil
		case <-t.C:
			return false, nil
		}
	}

	lenBytes := make([]byte, 4)
	for seq := 0; ; seq++ {
		fullname := seqToPath(mq.path, seq)

		// Poll until the stream for this sequence number can be opened.
		for {
			r, err := mq.streamstore.OpenReadCloser(fullname)
			if err == nil {
				// Only track the reader once the open has succeeded.
				rc = r
				break
			}
			if !os.IsNotExist(err) {
				return err
			}
			if done, err := sleep(); done {
				return err
			}
		}

		// Read records until the end marker.
		for {
			if _, err := io.ReadFull(rc, lenBytes); err != nil {
				if err == io.EOF {
					// No end marker yet: the file is likely still being
					// written to, so sleep and retry.
					if done, err := sleep(); done {
						return err
					}
					continue
				}
				return fmt.Errorf("Could not read length (%w)", err)
			}

			size := int(binary.LittleEndian.Uint32(lenBytes))
			if size == 0 {
				// End marker: the next read should be EOF.
				probe := []byte{0}
				if _, err := rc.Read(probe); err != io.EOF {
					return fmt.Errorf("Was able to read past end marker. This is broken, bailing out.")
				}
				break
			}

			payload := make([]byte, size)
			if _, err := io.ReadFull(rc, payload); err != nil {
				return fmt.Errorf("Could not read payload from %v. Expected len was %d. (%w)", fullname, size, err)
			}
			if err := handler(payload); err != nil {
				return err
			}
		}

		// Clear rc before checking the error so the deferred close does not
		// close this stream a second time.
		err := rc.Close()
		rc = nil
		if err != nil {
			return err
		}
	}
}