Store shard metadata in S3, add a tailing facility #5

Status: Open. Wants to merge 9 commits into master (showing changes from 2 commits).
11 changes: 11 additions & 0 deletions triton/shard_record.go
@@ -0,0 +1,11 @@
package triton

type shardRecord struct {
	record   map[string]interface{}
	shard    string
	sequence string
Contributor:

I've got types specifically for these to make it less confusing. I think this would fit in better as:

type shardRecord struct {
	....
	shardID        ShardID
	sequenceNumber SequenceNumber
}

Just like in https://github.com/postmates/go-triton/blob/master/triton/stream.go#L21
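
(For reference, the types in the linked stream.go are presumably thin string wrappers along these lines; this is a sketch of that assumption, not a quote from the file:

type ShardID string
type SequenceNumber string

Named types like these cost nothing at runtime and let the compiler catch a swapped shard/sequence argument.)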

Author:

OK, I changed these.

}

type shardReader interface {
	readShardRecord() (result *shardRecord, err error)
}
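
The interface gives the store a shard-aware record source. A consumer loop over a shardReader would look roughly like this (a sketch; it assumes io.EOF signals end-of-stream, as the multiShardStreamReader below implements, and that meta is a *streamMetadata from later in this diff):

for {
	rec, err := r.readShardRecord()
	if err == io.EOF {
		break
	}
	if err != nil {
		log.Fatal(err)
	}
	// Track min/max sequence numbers per shard as records flow through.
	meta.noteSequenceNumber(rec.sequence, rec.shard)
}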
98 changes: 82 additions & 16 deletions triton/store.go
@@ -2,14 +2,16 @@ package triton

import (
"bytes"
"encoding/json"
"fmt"
"github.com/golang/snappy"
"github.com/skarademir/naturalsort"
"github.com/tinylib/msgp/msgp"
"io"
"log"
"os"
"sort"
"time"

"github.com/golang/snappy"
"github.com/tinylib/msgp/msgp"
)

type CheckpointService interface {
@@ -18,17 +20,16 @@ type CheckpointService interface {

// A store manages buffering records together into files, and uploading them somewhere.
type Store struct {
	name           string
	reader         StreamReader
	streamMetadata *streamMetadata

	// Our uploader manages sending our data files somewhere
	uploader        *S3Uploader
	currentLogTime  time.Time
	currentWriter   io.WriteCloser
	currentFilename *string

	buf *bytes.Buffer
}

func (s *Store) closeWriter() error {
@@ -48,7 +49,8 @@ func (s *Store) closeWriter() error {
		s.currentWriter = nil

		if s.uploader != nil {
			keyName := s.generateKeyname()
			err = s.uploader.Upload(*s.currentFilename, keyName)
			if err != nil {
				log.Println("Failed to upload:", err)
				return fmt.Errorf("Failed to upload")
			}
@@ -59,17 +61,32 @@ func (s *Store) closeWriter() error {
log.Println("Failed to cleanup:", err)
return fmt.Errorf("Failed to cleanup writer")
}

err = s.uploadMetadata(keyName)
Contributor:

Shouldn't this be using the ArchiveRepository Upload stuff?

			if err != nil {
				return fmt.Errorf("failed to upload metadata: %s", err.Error())
			}
		}

		s.currentFilename = nil

		s.reader.Checkpoint()
	}
	s.streamMetadata = newStreamMetadata()

	return nil
}

func (s *Store) uploadMetadata(keyName string) (err error) {
	// Encode the per-shard metadata as JSON and upload it alongside the data file.
	var metadataBuf bytes.Buffer
	err = json.NewEncoder(&metadataBuf).Encode(&s.streamMetadata)
	if err != nil {
		err = fmt.Errorf("failed to encode metadata: %s", err.Error())
		return
	}
	// Propagate the upload error rather than dropping it.
	err = s.uploader.UploadBuf(&metadataBuf, keyName+".metadata")
	return
}
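
For a store that has read from a single shard, the uploaded <keyName>.metadata object would look roughly like this (the shard ID and sequence numbers are illustrative, not real values):

{
  "shards": {
    "shardId-000000000000": {
      "min_sequence_number": "49540528202769427",
      "max_sequence_number": "49540528202799135"
    }
  }
}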

func (s *Store) openWriter(fname string) (err error) {
	if s.currentWriter != nil {
		return fmt.Errorf("Existing writer still open")
@@ -205,11 +222,60 @@ func NewStore(name string, r StreamReader, up *S3Uploader) (s *Store) {
	buf := bytes.NewBuffer(b)

	s = &Store{
		name:           name,
		reader:         r,
		buf:            buf,
		uploader:       up,
		streamMetadata: newStreamMetadata(),
	}

	return
}

type streamMetadata struct {
	// shard ID => shardInfo
	Shards map[string]*shardInfo `json:"shards"`
Contributor:

Do these actually need to be pointers to shardInfo?

Author:

I think so, because they are mutable.

}

func newStreamMetadata() *streamMetadata {
	return &streamMetadata{
		Shards: make(map[string]*shardInfo),
	}
}
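
On the pointer question in the thread above: Go map values are not addressable, so a map[string]shardInfo could only be updated by copying the value out, mutating it, and storing it back; pointer-typed values can be mutated in place. A self-contained sketch (using a simplified info type with plain string comparison, not the naturalsort version in this diff):

package main

import "fmt"

type info struct{ min, max string }

func (i *info) note(seq string) {
	if i.min == "" || seq < i.min {
		i.min = seq
	}
	if seq > i.max {
		i.max = seq
	}
}

func main() {
	// byValue["a"].note("3") would not compile: map values are not addressable.
	byValue := map[string]info{"a": {}}
	v := byValue["a"] // copy out
	v.note("3")       // mutate the copy
	byValue["a"] = v  // write back

	// Pointer values can be mutated in place, which is what the PR relies on.
	byPointer := map[string]*info{"a": &info{}}
	byPointer["a"].note("3")

	fmt.Println(byValue["a"], byPointer["a"])
}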

func (s *streamMetadata) noteSequenceNumber(sequenceNum string, shardID string) {
	sh := s.Shards[shardID]
	if sh == nil {
		sh = &shardInfo{}
		s.Shards[shardID] = sh
	}
	sh.noteSequenceNumber(sequenceNum)
}

type shardInfo struct {
	MinSequenceNumber string `json:"min_sequence_number"`
	MaxSequenceNumber string `json:"max_sequence_number"`
}

func (s *shardInfo) noteSequenceNumber(sequenceNum string) {
if s.MinSequenceNumber == "" {
s.MinSequenceNumber = sequenceNum
} else {
nums := naturalsort.NaturalSort([]string{
sequenceNum,
s.MinSequenceNumber,
})
sort.Sort(nums)
s.MinSequenceNumber = nums[0]
}
if s.MaxSequenceNumber == "" {
s.MaxSequenceNumber = sequenceNum
} else {
nums := naturalsort.NaturalSort([]string{
sequenceNum,
s.MaxSequenceNumber,
})
sort.Sort(nums)
s.MaxSequenceNumber = nums[1]
}
}
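
The natural sort matters because sequence numbers are decimal strings of varying length: plain lexicographic ordering would rank "9" above "12345". Assuming naturalsort compares runs of digits numerically (which is what its name and the test below rely on), the ordering works out like this:

package main

import (
	"fmt"
	"sort"

	"github.com/skarademir/naturalsort"
)

func main() {
	nums := naturalsort.NaturalSort([]string{"12345", "01234", "11"})
	sort.Sort(nums)
	// Numeric-aware order: [11 01234 12345], so nums[0] is the min
	// and nums[len(nums)-1] is the max.
	fmt.Println(nums)
}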
15 changes: 15 additions & 0 deletions triton/store_test.go
@@ -112,3 +112,18 @@ func TestPut(t *testing.T) {
		}
	}
}

func TestShardInfo(t *testing.T) {
	si := &shardInfo{}
	si.noteSequenceNumber("12345")
	si.noteSequenceNumber("01234")
	si.noteSequenceNumber("11")

	if si.MinSequenceNumber != "11" {
		t.Fatalf("expecting the min sequence number to be 11 but got %q", si.MinSequenceNumber)
	}
	if si.MaxSequenceNumber != "12345" {
		t.Fatalf("expecting the max sequence number to be 12345 but got %q", si.MaxSequenceNumber)
	}
}
37 changes: 24 additions & 13 deletions triton/stream_reader.go
@@ -22,7 +22,7 @@ type StreamReader interface {
type multiShardStreamReader struct {
	checkpointer Checkpointer
	readers      []*ShardStreamReader
	recStream    chan *shardRecord
	allWg        sync.WaitGroup
	done         chan struct{}
	quit         chan struct{}
@@ -37,13 +37,19 @@ func (msr *multiShardStreamReader) Checkpoint() (err error) {
	return
Contributor:

Seems like if you're going to have the shard sequence number state managed outside the ShardStream itself, maybe it should ONLY be managed/tracked there?

It'll probably be confusing if there are two places the sequence numbers are recorded right?

I guess the other approach would be to use this same technique used here rather than storing the sequence numbers on a per-record basis?

Author:

I'm not following your drift.

Author:

Oh because the ShardStreamReader struct also has a LastSequenceNumber. Yeah, I think they are a little redundant.

I think it would be good to change the triton service interface to only expose a client side reader and the aggregation service and then make everything else private. I didn't want to remove it because I wasn't sure what part of the public API is exported.

Author:

Actually, it looks like ShardStreamReader uses LastSequenceNumber to initialize the iteration position.

}

func (msr *multiShardStreamReader) ReadRecord() (result map[string]interface{}, err error) {
	shardRecord, err := msr.readShardRecord()
	if err != nil {
		// Guard against dereferencing a nil record on EOF.
		return
	}
	result = shardRecord.record
	return
}

func (msr *multiShardStreamReader) readShardRecord() (result *shardRecord, err error) {
	select {
	case result = <-msr.recStream:
	case <-msr.done:
		err = io.EOF
	}
	return
}

func (msr *multiShardStreamReader) Stop() {
@@ -56,12 +62,12 @@ const maxShards int = 100

func NewStreamReader(svc KinesisService, streamName string, c Checkpointer) (sr StreamReader, err error) {
	msr := multiShardStreamReader{
		checkpointer: c,
		readers:      make([]*ShardStreamReader, 0),
		recStream:    make(chan *shardRecord),
		allWg:        sync.WaitGroup{},
		done:         make(chan struct{}),
		quit:         make(chan struct{}, maxShards),
	}

	shards, err := ListShards(svc, streamName)
@@ -118,7 +124,7 @@ func NewStreamReader(svc KinesisService, streamName string, c Checkpointer) (sr StreamReader, err error) {
	return
}

func processStreamToChan(r *ShardStreamReader, recChan chan map[string]interface{}, done chan struct{}) {
func processStreamToChan(r *ShardStreamReader, recChan chan *shardRecord, done chan struct{}) {
for {
select {
case <-done:
@@ -150,9 +156,14 @@
log.Println("Extra bytes in stream record", len(eb))
return
}
shardRec := &shardRecord{
record: rec,
shard: string(r.ShardID),
sequence: *kRec.SequenceNumber,
}

select {
case recChan <- rec:
case recChan <- shardRec:
case <-done:
return
}
22 changes: 22 additions & 0 deletions triton/uploader.go
@@ -2,6 +2,7 @@ package triton

import (
"fmt"
"io"
"log"
"os"

@@ -27,6 +28,7 @@ func (s *S3Uploader) Upload(fileName, keyName string) (err error) {
	}

	log.Println("Uploading", fileName)

	ui := s3manager.UploadInput{
		Bucket: aws.String(s.bucketName),
		Key:    aws.String(keyName),
@@ -45,6 +47,26 @@ func (s *S3Uploader) Upload(fileName, keyName string) (err error) {
	return
}

func (s *S3Uploader) UploadBuf(r io.Reader, keyName string) (err error) {
Contributor:

This name doesn't seem exactly right... the first argument is a Reader not a buffer.

Author:

I renamed this to UploadData()

log.Println("Uploading", keyName)

ui := s3manager.UploadInput{
Bucket: aws.String(s.bucketName),
Key: aws.String(keyName),
Body: r,
}

_, err = s.uploader.Upload(&ui)
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
return fmt.Errorf("Failed to upload: %v (%v)", awsErr.Code(), awsErr.Message())
}
return
} else {
log.Println("Completed upload to", keyName)
}
return
}
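
As the review notes, the parameter is an io.Reader rather than a buffer, so the method is more general than its name suggests; any stream source works, not just a *bytes.Buffer. A hypothetical call site (assuming an existing *S3Uploader named up; the file and key names are made up):

f, err := os.Open("data.tri")
if err != nil {
	log.Fatal(err)
}
defer f.Close()
if err := up.UploadBuf(f, "archive/data.tri"); err != nil {
	log.Println("upload failed:", err)
}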
func NewUploader(c client.ConfigProvider, bucketName string) *S3Uploader {
	m := s3manager.NewUploader(c)
