Skip to content

Commit

Permalink
AsyncMessageSink franz-go implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
sbuliarca committed Dec 20, 2023
1 parent cfa606e commit 6cccc9b
Showing 1 changed file with 28 additions and 12 deletions.
40 changes: 28 additions & 12 deletions kafka/franz/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ import (
"github.com/uw-labs/substrate/kafka"
)

type synchronousSink struct {
type asyncMessageSink struct {
client *fkafka.Client
admClient *kadm.Client

cfg kafka.AsyncMessageSinkConfig
}

func NewSynchronousMessageSink(cfg kafka.AsyncMessageSinkConfig) (substrate.SynchronousMessageSink, error) {
func NewAsyncMessageSink(cfg kafka.AsyncMessageSinkConfig) (substrate.AsyncMessageSink, error) {
opts := []kgo.Opt{
kgo.SeedBrokers(cfg.Brokers...),
kgo.DefaultProduceTopic(cfg.Topic),
Expand All @@ -45,15 +45,36 @@ func NewSynchronousMessageSink(cfg kafka.AsyncMessageSinkConfig) (substrate.Sync
if err != nil {
return nil, fmt.Errorf("failed creating franz-go client: %w", err)
}
return &synchronousSink{client: c, admClient: kadm.NewClient(c.Client), cfg: cfg}, nil
return &asyncMessageSink{client: c, admClient: kadm.NewClient(c.Client), cfg: cfg}, nil
}

func (s *synchronousSink) Close() error {
func (s *asyncMessageSink) PublishMessages(ctx context.Context, acks chan<- substrate.Message, messages <-chan substrate.Message) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case msg := <-messages:
/* not effective, as this is actually synchronous. If you need real async sink you'll need another implementation */
pr := s.client.ProduceSync(ctx, s.toRecord(msg))
if pr.FirstErr() != nil {
return pr.FirstErr()
}

select {
case acks <- msg:
case <-ctx.Done():
return ctx.Err()
}
}
}
}

func (s *asyncMessageSink) Close() error {
s.client.Close()
return nil
}

func (s *synchronousSink) PublishMessage(ctx context.Context, message substrate.Message) error {
func (s *asyncMessageSink) toRecord(message substrate.Message) *kgo.Record {
r := &kgo.Record{
Value: message.Data(),
}
Expand All @@ -66,15 +87,10 @@ func (s *synchronousSink) PublishMessage(ctx context.Context, message substrate.
r.Key = s.cfg.KeyFunc(message)
}
}

res := s.client.ProduceSync(ctx, r)
if err := res.FirstErr(); err != nil {
return fmt.Errorf("failed producing message: %w", err)
}
return nil
return r
}

func (s *synchronousSink) Status() (*substrate.Status, error) {
func (s *asyncMessageSink) Status() (*substrate.Status, error) {
ctx, cancelFunc := context.WithTimeout(context.Background(), 20*time.Second)
defer cancelFunc()

Expand Down

0 comments on commit 6cccc9b

Please sign in to comment.