Skip to content

Commit

Permalink
added aws_client_name to all metadata aware inputs and outputs
Browse files Browse the repository at this point in the history
  • Loading branch information
applike-ss committed Jul 24, 2023
1 parent f5ae06e commit f0354e9
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 12 deletions.
2 changes: 2 additions & 0 deletions pkg/cloud/aws/kinesis/kinsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
)

type KinsumerMetadata struct {
AwsClientName string `json:"aws_client_name"`
ClientId ClientId `json:"client_id"`
Name string `json:"name"`
StreamAppId cfg.AppId `json:"stream_app_id"`
Expand Down Expand Up @@ -146,6 +147,7 @@ func NewKinsumer(ctx context.Context, config cfg.Config, logger log.Logger, sett
}

kinsumerMetadata := KinsumerMetadata{
AwsClientName: settings.ClientName,
ClientId: clientId,
Name: settings.Name,
StreamAppId: settings.AppId,
Expand Down
6 changes: 4 additions & 2 deletions pkg/cloud/aws/kinesis/record_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ type Record struct {
}

type RecordWriterMetadata struct {
StreamName string `json:"stream_name"`
AwsClientName string `json:"aws_client_name"`
StreamName string `json:"stream_name"`
}

type RecordWriterSettings struct {
Expand Down Expand Up @@ -75,7 +76,8 @@ func NewRecordWriter(ctx context.Context, config cfg.Config, logger log.Logger,
}

metadata := RecordWriterMetadata{
StreamName: settings.StreamName,
AwsClientName: settings.ClientName,
StreamName: settings.StreamName,
}
if err = appctx.MetadataAppend(ctx, metadataKeyRecordWriters, metadata); err != nil {
return nil, fmt.Errorf("can not access the appctx metadata: %w", err)
Expand Down
18 changes: 17 additions & 1 deletion pkg/cloud/aws/sqs/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ type Queue interface {
SendBatch(ctx context.Context, messages []*Message) error
}

type QueueMetadata struct {
AwsClientName string `json:"aws_client_name"`
QueueArn string `json:"queue_arn"`
QueueName string `json:"queue_name"`
QueueNameFull string `json:"queue_name_full"`
QueueUrl string `json:"queue_url"`
}

type Message struct {
DelaySeconds int32
MessageGroupId *string
Expand Down Expand Up @@ -102,7 +110,15 @@ func NewQueue(ctx context.Context, config cfg.Config, logger log.Logger, setting
return nil, fmt.Errorf("could not create or get properties of queue %s: %w", settings.QueueName, err)
}

if err = appctx.MetadataAppend(ctx, MetadataKeyQueues, settings.QueueName); err != nil {
metadata := QueueMetadata{
AwsClientName: settings.ClientName,
QueueArn: props.Arn,
QueueName: settings.QueueName,
QueueNameFull: props.Name,
QueueUrl: props.Url,
}

if err = appctx.MetadataAppend(ctx, MetadataKeyQueues, metadata); err != nil {
return nil, fmt.Errorf("can not access the appctx metadata: %w", err)
}

Expand Down
12 changes: 11 additions & 1 deletion pkg/ddb/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ type Repository interface {
UpdateItemBuilder() UpdateItemBuilder
}

type TableMetadata struct {
AwsClientName string `json:"aws_client_name"`
TableName string `json:"table_name"`
}

type repository struct {
logger log.Logger
tracer tracing.Tracer
Expand Down Expand Up @@ -109,7 +114,12 @@ func NewRepository(ctx context.Context, config cfg.Config, logger log.Logger, se
}
}

if err = appctx.MetadataAppend(ctx, MetadataKeyTables, metadataFactory.GetTableName()); err != nil {
metadata := TableMetadata{
AwsClientName: settings.ClientName,
TableName: metadataFactory.GetTableName(),
}

if err = appctx.MetadataAppend(ctx, MetadataKeyTables, metadata); err != nil {
return nil, fmt.Errorf("can not access the appctx metadata: %w", err)
}

Expand Down
18 changes: 14 additions & 4 deletions pkg/stream/consumer_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,15 @@ const (
)

type ConsumerMetadata struct {
Name string `json:"name"`
RetryEnabled bool `json:"retry_enabled"`
RetryType string `json:"retry_type"`
RunnerCount int `json:"runner_count"`
AwsClientName string `json:"aws_client_name,omitempty"`
Name string `json:"name"`
RetryEnabled bool `json:"retry_enabled"`
RetryType string `json:"retry_type"`
RunnerCount int `json:"runner_count"`
}

type ClientNameAware interface {
GetClientName() string
}

//go:generate mockery --name RunnableCallback
Expand Down Expand Up @@ -130,6 +135,11 @@ func NewBaseConsumer(ctx context.Context, config cfg.Config, logger log.Logger,
RetryType: settings.Retry.Type,
RunnerCount: settings.RunnerCount,
}

if v, ok := input.(ClientNameAware); ok {
consumerMetadata.AwsClientName = v.GetClientName()
}

if err = appctx.MetadataAppend(ctx, metadataKeyConsumers, consumerMetadata); err != nil {
return nil, fmt.Errorf("can not access the appctx metadata: %w", err)
}
Expand Down
14 changes: 10 additions & 4 deletions pkg/stream/input_kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import (
)

type kinesisInput struct {
client kinesis.Kinsumer
channel chan *Message
clientName string
client kinesis.Kinsumer
channel chan *Message
}

func NewKinesisInput(ctx context.Context, config cfg.Config, logger log.Logger, settings kinesis.Settings) (Input, error) {
Expand All @@ -22,8 +23,9 @@ func NewKinesisInput(ctx context.Context, config cfg.Config, logger log.Logger,
}

return &kinesisInput{
client: client,
channel: make(chan *Message),
clientName: settings.ClientName,
client: client,
channel: make(chan *Message),
}, nil
}

Expand All @@ -39,6 +41,10 @@ func (i *kinesisInput) Data() <-chan *Message {
return i.channel
}

func (i *kinesisInput) GetClientName() string {
return i.clientName
}

type kinesisMessageHandler struct {
channel chan *Message
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/stream/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
const MetadataKeyProducers = "stream.producers"

type ProducerMetadata struct {
AwsClientName string `json:"aws_client_name,omitempty"`
Name string `json:"name"`
DaemonEnabled bool `json:"daemon_enabled"`
}
Expand Down Expand Up @@ -67,6 +68,11 @@ func NewProducer(ctx context.Context, config cfg.Config, logger log.Logger, name
Name: name,
DaemonEnabled: settings.Daemon.Enabled,
}

if v, ok := output.(ClientNameAware); ok {
metadata.AwsClientName = v.GetClientName()
}

if err = appctx.MetadataAppend(ctx, MetadataKeyProducers, metadata); err != nil {
return nil, fmt.Errorf("can not access the appctx metadata: %w", err)
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/stream/producer_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,16 @@ func NewProducerDaemonWithInterfaces(logger log.Logger, metric metric.Writer, ag
}
}

func (d *producerDaemon) GetClientName() string {
clientName := ""

if v, ok := d.output.(ClientNameAware); ok {
clientName = v.GetClientName()
}

return clientName
}

func (d *producerDaemon) GetStage() int {
return 512
}
Expand Down

0 comments on commit f0354e9

Please sign in to comment.