Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Writer & BatchWriter #7

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ Triton is an opinionated set of tooling for building a data pipeline around an
AWS stack including [Kinesis](http://aws.amazon.com/kinesis/) and S3.

It provides the necessary glue for building real applications on top of
this type of architecture.
this type of architecture.

## Overview ##

As your application collects data, write it to Kinesis streams as a series of
events. Other applications in your infrastructure read from this stream
providing a solid pattern for services to share data.
providing a solid pattern for services to share data.

Triton aims to provide a level of tooling, glue and utility to make this
ecosystem easy to use. Namely:
Expand Down Expand Up @@ -200,4 +200,4 @@ Standard go build commands of course also work.

* Metrics/Reporting hooks for easier status checks
* Better handle Kinesis shard splitting and combining
* Better patterns for dealing with arbitrary `map[string]interface{}` data
* Better patterns for dealing with arbitrary `Record` data
2 changes: 1 addition & 1 deletion triton/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type StoreArchive struct {
rdr Reader
}

func (sa *StoreArchive) ReadRecord() (rec map[string]interface{}, err error) {
func (sa *StoreArchive) ReadRecord() (rec Record, err error) {
if sa.rdr == nil {
out, err := sa.s3Svc.GetObject(&s3.GetObjectInput{
Bucket: aws.String(sa.Bucket),
Expand Down
4 changes: 2 additions & 2 deletions triton/archive_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ type ArchiveReader struct {
mr *msgp.Reader
}

func (r *ArchiveReader) ReadRecord() (rec map[string]interface{}, err error) {
rec = make(map[string]interface{})
func (r *ArchiveReader) ReadRecord() (rec Record, err error) {
rec = make(Record)

err = r.mr.ReadMapStrIntf(rec)
return
Expand Down
1 change: 1 addition & 0 deletions triton/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type KinesisService interface {
DescribeStream(*kinesis.DescribeStreamInput) (*kinesis.DescribeStreamOutput, error)
GetShardIterator(*kinesis.GetShardIteratorInput) (*kinesis.GetShardIteratorOutput, error)
GetRecords(*kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, error)
PutRecords(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error)
}

type S3Service interface {
Expand Down
4 changes: 2 additions & 2 deletions triton/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

type Reader interface {
ReadRecord() (rec map[string]interface{}, err error)
ReadRecord() (rec Record, err error)
}

// A SerialReader let's us read from multiple readers, in sequence
Expand All @@ -15,7 +15,7 @@ type SerialReader struct {
r_idx int
}

func (sr *SerialReader) ReadRecord() (rec map[string]interface{}, err error) {
func (sr *SerialReader) ReadRecord() (rec Record, err error) {
for sr.r_idx < len(sr.readers) {
rec, err = sr.readers[sr.r_idx].ReadRecord()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion triton/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestSerialReaderEmtpy(t *testing.T) {

type instantEOFReader struct{}

func (sr *instantEOFReader) ReadRecord() (rec map[string]interface{}, err error) {
func (sr *instantEOFReader) ReadRecord() (rec Record, err error) {
return nil, io.EOF
}

Expand Down
2 changes: 1 addition & 1 deletion triton/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (s *Store) flushBuffer() (err error) {
return
}

func (s *Store) PutRecord(rec map[string]interface{}) (err error) {
func (s *Store) PutRecord(rec Record) (err error) {
// TODO: Looks re-usable
b := make([]byte, 0, 1024)
b, err = msgp.AppendMapStrIntf(b, rec)
Expand Down
2 changes: 1 addition & 1 deletion triton/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

type nullStreamReader struct{}

func (nsr *nullStreamReader) ReadRecord() (map[string]interface{}, error) {
func (nsr *nullStreamReader) ReadRecord() (Record, error) {
return nil, io.EOF
}

Expand Down
8 changes: 4 additions & 4 deletions triton/stream_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type StreamReader interface {
type multiShardStreamReader struct {
checkpointer Checkpointer
readers []*ShardStreamReader
recStream chan map[string]interface{}
recStream chan Record
allWg sync.WaitGroup
done chan struct{}
quit chan struct{}
Expand All @@ -37,7 +37,7 @@ func (msr *multiShardStreamReader) Checkpoint() (err error) {
return
}

func (msr *multiShardStreamReader) ReadRecord() (rec map[string]interface{}, err error) {
func (msr *multiShardStreamReader) ReadRecord() (rec Record, err error) {
select {
case rec = <-msr.recStream:
return rec, nil
Expand All @@ -58,7 +58,7 @@ func NewStreamReader(svc KinesisService, streamName string, c Checkpointer) (sr
msr := multiShardStreamReader{
c,
make([]*ShardStreamReader, 0),
make(chan map[string]interface{}),
make(chan Record),
sync.WaitGroup{},
make(chan struct{}),
make(chan struct{}, maxShards),
Expand Down Expand Up @@ -118,7 +118,7 @@ func NewStreamReader(svc KinesisService, streamName string, c Checkpointer) (sr
return
}

func processStreamToChan(r *ShardStreamReader, recChan chan map[string]interface{}, done chan struct{}) {
func processStreamToChan(r *ShardStreamReader, recChan chan Record, done chan struct{}) {
for {
select {
case <-done:
Expand Down
4 changes: 2 additions & 2 deletions triton/stream_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ func TestNewStreamReader(t *testing.T) {
st := newTestKinesisStream("test-stream")
s1 := newTestKinesisShard()

r1 := make(map[string]interface{})
r1 := make(Record)
r1["value"] = "a"
s1.AddRecord(SequenceNumber("a"), r1)
st.AddShard(ShardID("0"), s1)

s2 := newTestKinesisShard()
r2 := make(map[string]interface{})
r2 := make(Record)
r2["value"] = "b"
s2.AddRecord(SequenceNumber("b"), r2)
st.AddShard(ShardID("1"), s2)
Expand Down
18 changes: 16 additions & 2 deletions triton/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ func (s *NullKinesisService) DescribeStream(input *kinesis.DescribeStreamInput)
return nil, fmt.Errorf("Not Implemented")
}

// PutRecords satisfies KinesisService with a no-op success: every batch is
// reported as fully written (zero failures) and nothing is stored.
func (s *NullKinesisService) PutRecords(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error) {
	return &kinesis.PutRecordsOutput{
		Records:           []*kinesis.PutRecordsResultEntry{},
		FailedRecordCount: aws.Int64(0),
	}, nil
}

type FailingKinesisService struct{}

func (s *FailingKinesisService) DescribeStream(input *kinesis.DescribeStreamInput) (*kinesis.DescribeStreamOutput, error) {
Expand All @@ -47,6 +56,11 @@ func (s *FailingKinesisService) GetShardIterator(*kinesis.GetShardIteratorInput)
return gso, nil
}

// PutRecords satisfies KinesisService by always simulating a Kinesis
// throttling failure, letting tests exercise the caller's error path.
func (s *FailingKinesisService) PutRecords(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error) {
	return nil, awserr.New(
		"ProvisionedThroughputExceededException",
		"slow down dummy",
		fmt.Errorf("error"),
	)
}

func TestNewShardStreamReader(t *testing.T) {
svc := NullKinesisService{}

Expand Down Expand Up @@ -107,7 +121,7 @@ func TestFetchMoreRecords(t *testing.T) {
st := newTestKinesisStream("test-stream")
s1 := newTestKinesisShard()

r1 := make(map[string]interface{})
r1 := make(Record)
s1.AddRecord(SequenceNumber("a"), r1)
st.AddShard("shard-0000", s1)
svc.AddStream(st)
Expand Down Expand Up @@ -150,7 +164,7 @@ func TestRead(t *testing.T) {
st := newTestKinesisStream("test-stream")
s1 := newTestKinesisShard()

r1 := make(map[string]interface{})
r1 := make(Record)
s1.AddRecord(SequenceNumber("a"), r1)
st.AddShard("shard-0000", s1)
svc.AddStream(st)
Expand Down
68 changes: 63 additions & 5 deletions triton/test_util.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
// Mock Kinesis Service
package triton

// Mock Kinesis Service

import (
"bytes"
"fmt"
"log"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
Expand All @@ -21,18 +23,44 @@ type testKinesisShard struct {
records []testKinesisRecords
}

func (s *testKinesisShard) AddRecord(sn SequenceNumber, rec map[string]interface{}) {
func (s *testKinesisShard) AddRecord(sn SequenceNumber, rec Record) {
b := bytes.NewBuffer(make([]byte, 0, 1024))
w := msgp.NewWriter(b)
err := w.WriteMapStrIntf(rec)
if err != nil {
panic(err)
}
w.Flush()
rs := testKinesisRecords{sn, [][]byte{b.Bytes()}}
s.AddData(sn, b.Bytes())
}

// AddData appends one pre-serialized record payload to the shard under the
// given sequence number.
func (s *testKinesisShard) AddData(sn SequenceNumber, data []byte) {
	entry := testKinesisRecords{sn, [][]byte{data}}
	s.records = append(s.records, entry)
}

// PopData removes and returns the oldest record payload (FIFO) together with
// its sequence number. Panics if the shard holds no records — acceptable for
// a test helper.
func (s *testKinesisShard) PopData() (SequenceNumber, []byte) {
	var first testKinesisRecords
	first, s.records = s.records[0], s.records[1:]
	return first.sn, first.recordData[0]
}

// PopRecord removes the oldest record from the shard and deserializes its
// msgpack payload back into a Record. Panics if the payload is not a valid
// msgpack map — mirroring AddRecord, which panics on its msgp error rather
// than silently producing bad test data.
func (s *testKinesisShard) PopRecord() (SequenceNumber, Record) {
	sn, data := s.PopData()

	r := make(Record)
	mr := msgp.NewReader(bytes.NewBuffer(data))
	// Previously this error was discarded; a corrupt payload would yield an
	// empty Record and an obscure downstream test failure.
	if err := mr.ReadMapStrIntf(r); err != nil {
		panic(err)
	}

	return sn, r
}

// NextSequenceNumber fabricates a sequence number for test data from the
// current wall-clock time. NOTE(review): two calls within the clock's
// resolution could collide — fine for tests, not suitable for real use.
func (s *testKinesisShard) NextSequenceNumber() SequenceNumber {
	now := time.Now()
	return SequenceNumber(now.String())
}

// newTestKinesisShard returns an empty mock shard ready to receive records.
func newTestKinesisShard() *testKinesisShard {
return &testKinesisShard{make([]testKinesisRecords, 0)}
}
Expand Down Expand Up @@ -117,15 +145,15 @@ func (s *testKinesisService) GetRecords(gri *kinesis.GetRecordsInput) (*kinesis.
}

func (s *testKinesisService) DescribeStream(input *kinesis.DescribeStreamInput) (*kinesis.DescribeStreamOutput, error) {
shards := make([]*kinesis.Shard, 0)

stream, ok := s.streams[*input.StreamName]
if !ok {
// TODO: Probably a real error condition we could simulate
return nil, fmt.Errorf("Failed to find stream")
}

for sid, _ := range stream.shards {
var shards []*kinesis.Shard
for sid := range stream.shards {
shards = append(shards, &kinesis.Shard{ShardId: aws.String(string(sid))})
}

Expand All @@ -140,3 +168,33 @@ func (s *testKinesisService) DescribeStream(input *kinesis.DescribeStreamInput)

return dso, nil
}

// PutRecords implements KinesisService for the mock. Each record's
// PartitionKey is used directly as the target ShardID (this mock maps
// partition keys to shards 1:1 rather than hashing as real Kinesis does),
// and the same key is echoed back as the result entry's ShardId. Every
// record is reported successful; FailedRecordCount is always 0.
func (s *testKinesisService) PutRecords(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error) {
	stream, ok := s.streams[*input.StreamName]
	if !ok {
		return nil, fmt.Errorf("Failed to find stream")
	}

	// Resolve every target shard up front so an unknown partition key
	// cannot leave the stream partially mutated: previously, records
	// earlier in the batch were already written before the error returned.
	shards := make([]*testKinesisShard, len(input.Records))
	for i, r := range input.Records {
		shard, ok := stream.shards[ShardID(*r.PartitionKey)]
		if !ok {
			return nil, fmt.Errorf("Failed to find shard")
		}
		shards[i] = shard
	}

	records := make([]*kinesis.PutRecordsResultEntry, len(input.Records))
	for i, r := range input.Records {
		sn := shards[i].NextSequenceNumber()
		shards[i].AddData(sn, r.Data)

		records[i] = &kinesis.PutRecordsResultEntry{
			SequenceNumber: aws.String(string(sn)),
			ShardId:        r.PartitionKey,
		}
	}

	output := &kinesis.PutRecordsOutput{
		Records:           records,
		FailedRecordCount: aws.Int64(0),
	}

	return output, nil
}
4 changes: 4 additions & 0 deletions triton/triton.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package triton

// Record is the unit of data that is passed through Triton: a free-form
// string-keyed map serialized to and from msgpack (see the msgp
// ReadMapStrIntf / AppendMapStrIntf call sites).
type Record map[string]interface{}
Loading