Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 22 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Amazon kinesis producer [![Build status][travis-image]][travis-url] [![License][license-image]][license-url] [![GoDoc][godoc-img]][godoc-url]
> A KPL-like batch producer for Amazon Kinesis built on top of the official Go AWS SDK
> A KPL-like batch producer for Amazon Kinesis built on top of the official AWS SDK for Go V2
and using the same aggregation format that [KPL][kpl-url] use.

### Useful links
Expand All @@ -13,21 +13,33 @@ and using the same aggregation format that [KPL][kpl-url] use.
package main

import (
"context"
"net/http"
"time"

"github.com/sirupsen/logrus"
"github.com/a8m/kinesis-producer"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
log "github.com/sirupsen/logrus"
)

func main() {
client := kinesis.New(session.New(aws.NewConfig()))
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.MaxIdleConns = 20
transport.MaxIdleConnsPerHost = 20
httpClient := &http.Client{
Transport: transport,
}
cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-west-2"), config.WithHTTPClient(httpClient))
if err != nil {
log.Fatalf("unable to load SDK config, %v", err)
}
client := kinesis.NewFromConfig(cfg)
pr := producer.New(&producer.Config{
StreamName: "test",
StreamName: aws.String("test"),
BacklogCount: 2000,
Client: client
Client: client,
})

pr.Start()
Expand Down Expand Up @@ -62,7 +74,7 @@ func main() {
customLogger := &CustomLogger{}

&producer.Config{
StreamName: "test",
StreamName: aws.String("test"),
BacklogCount: 2000,
Client: client,
Logger: customLogger,
Expand All @@ -81,7 +93,7 @@ import (
log := logrus.New()

&producer.Config{
StreamName: "test",
StreamName: aws.String("test"),
BacklogCount: 2000,
Client: client,
Logger: loggers.Logrus(log),
Expand Down
36 changes: 34 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import (
"context"
"log"
"os"
"regexp"
"strings"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
k "github.com/aws/aws-sdk-go-v2/service/kinesis"
)

Expand All @@ -30,8 +33,12 @@ type Putter interface {

// Config is the Producer configuration.
type Config struct {

// StreamARN is the ARN of the stream.
StreamARN *string

// StreamName is the Kinesis stream.
StreamName string
StreamName *string

// FlushInterval is a regular interval for flushing the buffer. Defaults to 5s.
FlushInterval time.Duration
Expand Down Expand Up @@ -98,11 +105,36 @@ func (c *Config) defaults() {
if c.FlushInterval == 0 {
c.FlushInterval = defaultFlushInterval
}
falseOrPanic(len(c.StreamName) == 0, "kinesis: StreamName length must be at least 1")
if c.StreamARN != nil {
// https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html#Streams-PutRecords-request-StreamARN
if !matchRegex(`^arn:aws.*:kinesis:.*:\d{12}:stream/\S+$`, aws.ToString(c.StreamARN)) {
panic(`kinesis: StreamARN must match pattern "arn:aws.*:kinesis:.*:\d{12}:stream/\S+"` + " current StreamARN: " + aws.ToString(c.StreamARN))
}
if c.StreamName != nil && !strings.HasSuffix(aws.ToString(c.StreamARN), "/"+aws.ToString(c.StreamName)) {
panic(`kinesis: StreamName must match the StreamArn` + " current StreamARN: " + aws.ToString(c.StreamARN) + " current StreamName: " + aws.ToString(c.StreamName))
}
} else if c.StreamName != nil {
// https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html#Streams-PutRecords-request-StreamName
if !matchRegex(`^[a-zA-Z0-9_.-]+$`, aws.ToString(c.StreamName)) {
panic(`kinesis: StreamName must match pattern "[a-zA-Z0-9_.-]+"` + " current StreamName: " + aws.ToString(c.StreamName))
}
} else {
// https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html#API_PutRecords
panic("kinesis: either StreamARN or StreamName must be set, recommended use StreamARN")
}
}

func falseOrPanic(p bool, msg string) {
if p {
panic(msg)
}
}

func matchRegex(pattern string, str string) bool {
re, err := regexp.Compile(pattern)
if err != nil {
return false
}
match := re.MatchString(str)
return match
}
91 changes: 91 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package producer

import (
"testing"

"github.com/aws/aws-sdk-go-v2/aws"
)

func TestConfig_ValidStreamArn(t *testing.T) {
c := &Config{
StreamARN: aws.String("arn:aws:kinesis:us-east-1:012345678901:stream/test-stream-name"),
}
c.defaults()
t.Logf("TestConfig_ValidStreamArn success")
}

func TestConfig_InvalidStreamArn(t *testing.T) {
c := &Config{
StreamARN: aws.String("test-stream-name"),
}
defer func() {
r := recover()
if r != `kinesis: StreamARN must match pattern "arn:aws.*:kinesis:.*:\d{12}:stream/\S+"`+" current StreamARN: test-stream-name" {
t.Errorf("unexpected panic: %v", r)
} else {
t.Logf("TestConfig_InvalidStreamArn success")
}
}()
c.defaults()
}

func TestConfig_ValidStreamName(t *testing.T) {
c := &Config{
StreamName: aws.String("test-stream-name"),
}
c.defaults()
t.Logf("TestConfig_ValidStreamName success")
}

func TestConfig_InvalidStreamName(t *testing.T) {
c := &Config{
StreamName: aws.String("test`-stream/name"),
}
defer func() {
r := recover()
if r != `kinesis: StreamName must match pattern "[a-zA-Z0-9_.-]+"`+" current StreamName: test`-stream/name" {
t.Errorf("unexpected panic: %v", r)
} else {
t.Logf("TestConfig_InvalidStreamName success")
}
}()
c.defaults()
}

func TestConfig_BothStreamArnAndName(t *testing.T) {
c := &Config{
StreamARN: aws.String("arn:aws:kinesis:us-east-1:012345678901:stream/test-stream-name"),
StreamName: aws.String("test-stream-name"),
}
c.defaults()
t.Logf("TestConfig_BothStreamArnAndName success")
}

func TestConfig_EmptyStreamArnAndName(t *testing.T) {
c := &Config{}
defer func() {
r := recover()
if r != `kinesis: either StreamARN or StreamName must be set, recommended use StreamARN` {
t.Errorf("unexpected panic: %v", r)
} else {
t.Logf("TestConfig_EmptyStreamArnAndName success")
}
}()
c.defaults()
}

func TestConfig_NoMatchStreamArnAndName(t *testing.T) {
c := &Config{
StreamARN: aws.String("arn:aws:kinesis:us-east-1:012345678901:stream/test-stream-name"),
StreamName: aws.String("test-stream-name-2"),
}
defer func() {
r := recover()
if r != `kinesis: StreamName must match the StreamArn current StreamARN: arn:aws:kinesis:us-east-1:012345678901:stream/test-stream-name current StreamName: test-stream-name-2` {
t.Errorf("unexpected panic: %v", r)
} else {
t.Logf("TestConfig_NoMatchStreamArnAndName success")
}
}()
c.defaults()
}
3 changes: 2 additions & 1 deletion example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
)
Expand All @@ -15,7 +16,7 @@ func ExampleSimple() {
cfg, _ := config.LoadDefaultConfig(context.TODO())
client := kinesis.NewFromConfig(cfg)
pr := New(&Config{
StreamName: "test",
StreamName: aws.String("test"),
BacklogCount: 2000,
Client: client,
Logger: logger,
Expand Down
4 changes: 2 additions & 2 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
k "github.com/aws/aws-sdk-go-v2/service/kinesis"
ktypes "github.com/aws/aws-sdk-go-v2/service/kinesis/types"
"github.com/jpillora/backoff"
Expand Down Expand Up @@ -247,7 +246,8 @@ func (p *Producer) flush(records []ktypes.PutRecordsRequestEntry, reason string)
for {
p.Logger.Info("flushing records", LogValue{"reason", reason}, LogValue{"records", len(records)})
out, err := p.Client.PutRecords(context.Background(), &k.PutRecordsInput{
StreamName: aws.String(p.StreamName),
StreamARN: p.StreamARN,
StreamName: p.StreamName,
Records: records,
})

Expand Down
4 changes: 2 additions & 2 deletions producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ var testCases = []testCase{

func TestProducer(t *testing.T) {
for _, test := range testCases {
test.config.StreamName = test.name
test.config.StreamName = aws.String("foo")
test.config.MaxConnections = 1
test.config.Client = test.putter
p := New(test.config)
Expand All @@ -181,7 +181,7 @@ func TestProducer(t *testing.T) {
func TestNotify(t *testing.T) {
kError := errors.New("ResourceNotFoundException: Stream foo under account X not found")
p := New(&Config{
StreamName: "foo",
StreamName: aws.String("foo"),
MaxConnections: 1,
BatchCount: 1,
AggregateBatchCount: 10,
Expand Down