Skip to content

Commit 80ccdd1

Browse files
Implemented writer and batch writer; formalized record type
1 parent 431db91 commit 80ccdd1

17 files changed

+503
-24
lines changed

README.md

+3-3
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@ Triton is an opinionated set of tooling for building a data pipeline around an
44
AWS stack including [Kinesis](http://aws.amazon.com/kinesis/) and S3.
55

66
It provides the necessary glue for building real applications on top of the
7-
type of architecture.
7+
type of architecture.
88

99
## Overview ##
1010

1111
As your application collects data, write it to Kinesis streams as a series of
1212
events. Other applications in your infrastructure read from this stream
13-
providing a solid pattern for services to share data.
13+
providing a solid pattern for services to share data.
1414

1515
Triton aims to provide a level of tooling, glue and utility to make this
1616
ecosystem easy to use. Namely:
@@ -200,4 +200,4 @@ Standard go build commands of course also work.
200200

201201
* Metrics/Reporting hooks for easier status checks
202202
* Better handle Kinesis shard splitting and combining
203-
* Better patterns for dealing with arbitrary `map[string]interface{}` data
203+
* Better patterns for dealing with arbitrary `Record` data

triton/archive.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ type StoreArchive struct {
2424
rdr Reader
2525
}
2626

27-
func (sa *StoreArchive) ReadRecord() (rec map[string]interface{}, err error) {
27+
func (sa *StoreArchive) ReadRecord() (rec Record, err error) {
2828
if sa.rdr == nil {
2929
out, err := sa.s3Svc.GetObject(&s3.GetObjectInput{
3030
Bucket: aws.String(sa.Bucket),

triton/archive_reader.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ type ArchiveReader struct {
1313
mr *msgp.Reader
1414
}
1515

16-
func (r *ArchiveReader) ReadRecord() (rec map[string]interface{}, err error) {
17-
rec = make(map[string]interface{})
16+
func (r *ArchiveReader) ReadRecord() (rec Record, err error) {
17+
rec = make(Record)
1818

1919
err = r.mr.ReadMapStrIntf(rec)
2020
return

triton/aws.go

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ type KinesisService interface {
1717
DescribeStream(*kinesis.DescribeStreamInput) (*kinesis.DescribeStreamOutput, error)
1818
GetShardIterator(*kinesis.GetShardIteratorInput) (*kinesis.GetShardIteratorOutput, error)
1919
GetRecords(*kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, error)
20+
PutRecords(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error)
2021
}
2122

2223
type S3Service interface {

triton/reader.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
)
77

88
type Reader interface {
9-
ReadRecord() (rec map[string]interface{}, err error)
9+
ReadRecord() (rec Record, err error)
1010
}
1111

1212
// A SerialReader lets us read from multiple readers, in sequence
@@ -15,7 +15,7 @@ type SerialReader struct {
1515
r_idx int
1616
}
1717

18-
func (sr *SerialReader) ReadRecord() (rec map[string]interface{}, err error) {
18+
func (sr *SerialReader) ReadRecord() (rec Record, err error) {
1919
for sr.r_idx < len(sr.readers) {
2020
rec, err = sr.readers[sr.r_idx].ReadRecord()
2121
if err != nil {

triton/reader_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func TestSerialReaderEmtpy(t *testing.T) {
2323

2424
type instantEOFReader struct{}
2525

26-
func (sr *instantEOFReader) ReadRecord() (rec map[string]interface{}, err error) {
26+
func (sr *instantEOFReader) ReadRecord() (rec Record, err error) {
2727
return nil, io.EOF
2828
}
2929

triton/store.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ func (s *Store) flushBuffer() (err error) {
141141
return
142142
}
143143

144-
func (s *Store) PutRecord(rec map[string]interface{}) (err error) {
144+
func (s *Store) PutRecord(rec Record) (err error) {
145145
// TODO: Looks re-usable
146146
b := make([]byte, 0, 1024)
147147
b, err = msgp.AppendMapStrIntf(b, rec)

triton/store_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313

1414
type nullStreamReader struct{}
1515

16-
func (nsr *nullStreamReader) ReadRecord() (map[string]interface{}, error) {
16+
func (nsr *nullStreamReader) ReadRecord() (Record, error) {
1717
return nil, io.EOF
1818
}
1919

triton/stream_reader.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ type StreamReader interface {
2222
type multiShardStreamReader struct {
2323
checkpointer Checkpointer
2424
readers []*ShardStreamReader
25-
recStream chan map[string]interface{}
25+
recStream chan Record
2626
allWg sync.WaitGroup
2727
done chan struct{}
2828
quit chan struct{}
@@ -37,7 +37,7 @@ func (msr *multiShardStreamReader) Checkpoint() (err error) {
3737
return
3838
}
3939

40-
func (msr *multiShardStreamReader) ReadRecord() (rec map[string]interface{}, err error) {
40+
func (msr *multiShardStreamReader) ReadRecord() (rec Record, err error) {
4141
select {
4242
case rec = <-msr.recStream:
4343
return rec, nil
@@ -58,7 +58,7 @@ func NewStreamReader(svc KinesisService, streamName string, c Checkpointer) (sr
5858
msr := multiShardStreamReader{
5959
c,
6060
make([]*ShardStreamReader, 0),
61-
make(chan map[string]interface{}),
61+
make(chan Record),
6262
sync.WaitGroup{},
6363
make(chan struct{}),
6464
make(chan struct{}, maxShards),
@@ -118,7 +118,7 @@ func NewStreamReader(svc KinesisService, streamName string, c Checkpointer) (sr
118118
return
119119
}
120120

121-
func processStreamToChan(r *ShardStreamReader, recChan chan map[string]interface{}, done chan struct{}) {
121+
func processStreamToChan(r *ShardStreamReader, recChan chan Record, done chan struct{}) {
122122
for {
123123
select {
124124
case <-done:

triton/stream_reader_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@ func TestNewStreamReader(t *testing.T) {
77
st := newTestKinesisStream("test-stream")
88
s1 := newTestKinesisShard()
99

10-
r1 := make(map[string]interface{})
10+
r1 := make(Record)
1111
r1["value"] = "a"
1212
s1.AddRecord(SequenceNumber("a"), r1)
1313
st.AddShard(ShardID("0"), s1)
1414

1515
s2 := newTestKinesisShard()
16-
r2 := make(map[string]interface{})
16+
r2 := make(Record)
1717
r2["value"] = "b"
1818
s2.AddRecord(SequenceNumber("b"), r2)
1919
st.AddShard(ShardID("1"), s2)

triton/stream_test.go

+16-2
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,15 @@ func (s *NullKinesisService) DescribeStream(input *kinesis.DescribeStreamInput)
3131
return nil, fmt.Errorf("Not Implemented")
3232
}
3333

34+
func (s *NullKinesisService) PutRecords(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error) {
35+
results := []*kinesis.PutRecordsResultEntry{}
36+
gso := &kinesis.PutRecordsOutput{
37+
Records: results,
38+
FailedRecordCount: aws.Int64(0),
39+
}
40+
return gso, nil
41+
}
42+
3443
type FailingKinesisService struct{}
3544

3645
func (s *FailingKinesisService) DescribeStream(input *kinesis.DescribeStreamInput) (*kinesis.DescribeStreamOutput, error) {
@@ -47,6 +56,11 @@ func (s *FailingKinesisService) GetShardIterator(*kinesis.GetShardIteratorInput)
4756
return gso, nil
4857
}
4958

59+
func (s *FailingKinesisService) PutRecords(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error) {
60+
err := awserr.New("ProvisionedThroughputExceededException", "slow down dummy", fmt.Errorf("error"))
61+
return nil, err
62+
}
63+
5064
func TestNewShardStreamReader(t *testing.T) {
5165
svc := NullKinesisService{}
5266

@@ -107,7 +121,7 @@ func TestFetchMoreRecords(t *testing.T) {
107121
st := newTestKinesisStream("test-stream")
108122
s1 := newTestKinesisShard()
109123

110-
r1 := make(map[string]interface{})
124+
r1 := make(Record)
111125
s1.AddRecord(SequenceNumber("a"), r1)
112126
st.AddShard("shard-0000", s1)
113127
svc.AddStream(st)
@@ -150,7 +164,7 @@ func TestRead(t *testing.T) {
150164
st := newTestKinesisStream("test-stream")
151165
s1 := newTestKinesisShard()
152166

153-
r1 := make(map[string]interface{})
167+
r1 := make(Record)
154168
s1.AddRecord(SequenceNumber("a"), r1)
155169
st.AddShard("shard-0000", s1)
156170
svc.AddStream(st)

triton/test_util.go

+63-5
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1-
// Mock Kinesis Service
21
package triton
32

3+
// Mock Kinesis Service
4+
45
import (
56
"bytes"
67
"fmt"
78
"log"
89
"strings"
10+
"time"
911

1012
"github.com/aws/aws-sdk-go/aws"
1113
"github.com/aws/aws-sdk-go/service/kinesis"
@@ -21,18 +23,44 @@ type testKinesisShard struct {
2123
records []testKinesisRecords
2224
}
2325

24-
func (s *testKinesisShard) AddRecord(sn SequenceNumber, rec map[string]interface{}) {
26+
func (s *testKinesisShard) AddRecord(sn SequenceNumber, rec Record) {
2527
b := bytes.NewBuffer(make([]byte, 0, 1024))
2628
w := msgp.NewWriter(b)
2729
err := w.WriteMapStrIntf(rec)
2830
if err != nil {
2931
panic(err)
3032
}
3133
w.Flush()
32-
rs := testKinesisRecords{sn, [][]byte{b.Bytes()}}
34+
s.AddData(sn, b.Bytes())
35+
}
36+
37+
func (s *testKinesisShard) AddData(sn SequenceNumber, data []byte) {
38+
rs := testKinesisRecords{sn, [][]byte{data}}
3339
s.records = append(s.records, rs)
3440
}
3541

42+
func (s *testKinesisShard) PopData() (SequenceNumber, []byte) {
43+
r := s.records[0]
44+
s.records = s.records[1:]
45+
return r.sn, r.recordData[0]
46+
}
47+
48+
func (s *testKinesisShard) PopRecord() (SequenceNumber, Record) {
49+
sn, data := s.PopData()
50+
51+
b := bytes.NewBuffer(data)
52+
r := make(Record)
53+
54+
w := msgp.NewReader(b)
55+
w.ReadMapStrIntf(r)
56+
57+
return sn, r
58+
}
59+
60+
func (s *testKinesisShard) NextSequenceNumber() SequenceNumber {
61+
return SequenceNumber(time.Now().String())
62+
}
63+
3664
func newTestKinesisShard() *testKinesisShard {
3765
return &testKinesisShard{make([]testKinesisRecords, 0)}
3866
}
@@ -117,15 +145,15 @@ func (s *testKinesisService) GetRecords(gri *kinesis.GetRecordsInput) (*kinesis.
117145
}
118146

119147
func (s *testKinesisService) DescribeStream(input *kinesis.DescribeStreamInput) (*kinesis.DescribeStreamOutput, error) {
120-
shards := make([]*kinesis.Shard, 0)
121148

122149
stream, ok := s.streams[*input.StreamName]
123150
if !ok {
124151
// TODO: Probably a real error condition we could simulate
125152
return nil, fmt.Errorf("Failed to find stream")
126153
}
127154

128-
for sid, _ := range stream.shards {
155+
var shards []*kinesis.Shard
156+
for sid := range stream.shards {
129157
shards = append(shards, &kinesis.Shard{ShardId: aws.String(string(sid))})
130158
}
131159

@@ -140,3 +168,33 @@ func (s *testKinesisService) DescribeStream(input *kinesis.DescribeStreamInput)
140168

141169
return dso, nil
142170
}
171+
172+
func (s *testKinesisService) PutRecords(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error) {
173+
stream, ok := s.streams[*input.StreamName]
174+
if !ok {
175+
return nil, fmt.Errorf("Failed to find stream")
176+
}
177+
178+
records := make([]*kinesis.PutRecordsResultEntry, len(input.Records))
179+
for i, r := range input.Records {
180+
shard, ok := stream.shards[ShardID(*r.PartitionKey)]
181+
if !ok {
182+
return nil, fmt.Errorf("Failed to find shard")
183+
}
184+
185+
sn := shard.NextSequenceNumber()
186+
shard.AddData(sn, r.Data)
187+
188+
records[i] = &kinesis.PutRecordsResultEntry{
189+
SequenceNumber: aws.String(string(sn)),
190+
ShardId: r.PartitionKey,
191+
}
192+
}
193+
194+
output := &kinesis.PutRecordsOutput{
195+
Records: records,
196+
FailedRecordCount: aws.Int64(0),
197+
}
198+
199+
return output, nil
200+
}

triton/triton.go

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package triton
2+
3+
// Record is the unit of data that is passed through Triton.
4+
type Record map[string]interface{}

triton/write_batch_test.go

+79
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package triton
2+
3+
import (
4+
"bytes"
5+
"testing"
6+
"time"
7+
)
8+
9+
func TestBatchWriterSizeExceeded(t *testing.T) {
10+
configString := bytes.NewBufferString(`
11+
my_stream:
12+
name: test-stream
13+
partition_key: value
14+
region: us-west-1
15+
`)
16+
c, _ := NewConfigFromFile(configString)
17+
config, _ := c.ConfigForName("my_stream")
18+
19+
svc := newTestKinesisService()
20+
st := newTestKinesisStream(config.StreamName)
21+
s1 := newTestKinesisShard()
22+
st.AddShard(ShardID("test-value"), s1)
23+
svc.AddStream(st)
24+
25+
w := NewTestWriter(config, svc, 1)
26+
bw := NewBatchWriterSize(w, 2, 24*time.Hour)
27+
28+
r := Record(map[string]interface{}{"value": "test-value"})
29+
bw.WriteRecords(r)
30+
31+
// Ensure this was not written
32+
if len(s1.records) != 0 {
33+
t.Fatal("Batcher did not wait for size to be exceeded")
34+
}
35+
36+
bw.WriteRecords(r)
37+
38+
// wait for write -- this is technically a race condition
39+
time.Sleep(10 * time.Millisecond)
40+
41+
if len(s1.records) != 2 {
42+
t.Fatal("Batcher did not write when size exceeded")
43+
}
44+
}
45+
46+
func TestBatchWriterTimeExceeded(t *testing.T) {
47+
configString := bytes.NewBufferString(`
48+
my_stream:
49+
name: test-stream
50+
partition_key: value
51+
region: us-west-1
52+
`)
53+
c, _ := NewConfigFromFile(configString)
54+
config, _ := c.ConfigForName("my_stream")
55+
56+
svc := newTestKinesisService()
57+
st := newTestKinesisStream(config.StreamName)
58+
s1 := newTestKinesisShard()
59+
st.AddShard(ShardID("test-value"), s1)
60+
svc.AddStream(st)
61+
62+
w := NewTestWriter(config, svc, 1)
63+
bw := NewBatchWriterSize(w, 1000, 1*time.Millisecond)
64+
65+
r := Record(map[string]interface{}{"value": "test-value"})
66+
bw.WriteRecords(r)
67+
68+
// Ensure this was not written
69+
if len(s1.records) != 0 {
70+
t.Fatal("Batcher did not wait for time to be exceeded")
71+
}
72+
73+
// wait for write -- this is technically a race condition
74+
time.Sleep(10 * time.Millisecond)
75+
76+
if len(s1.records) != 1 {
77+
t.Fatal("Batcher did not write when time exceeded")
78+
}
79+
}

0 commit comments

Comments
 (0)