Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
273f246
init carrier.go from local example
igoragoli Nov 27, 2025
b3127d7
copy types.go from segmentio/kafka-go
igoragoli Nov 27, 2025
29f807b
rename Message to Record
igoragoli Nov 27, 2025
315281a
add a TODO for renaming the carrier
igoragoli Nov 27, 2025
ee48fa0
don't use kgo, use the structs/interfaces on types.go
igoragoli Nov 27, 2025
a7d2bcf
simplify comments
igoragoli Nov 27, 2025
013efa8
fix Set()
igoragoli Nov 27, 2025
74d3d62
copy tracer.go from segmentio/kafka-go
igoragoli Nov 27, 2025
7f6ee8a
add PackageTwmbFranzGo to packages.go
igoragoli Nov 27, 2025
9ea6167
point to the PackageTwmbFranzGo instrumentation
igoragoli Nov 27, 2025
f454a87
make comments on tracer.go standardized
igoragoli Nov 27, 2025
d239e49
copy tracing.go from segmentio/kafka-go
igoragoli Nov 27, 2025
c088362
change header and componentName
igoragoli Nov 27, 2025
5947936
update StartConsumeSpan
igoragoli Nov 27, 2025
289b46a
update StartProduceSpan
igoragoli Nov 27, 2025
969c64e
exporting KafkaHeadersCarrier
igoragoli Nov 27, 2025
44a3847
tracer.go is now options.go
igoragoli Nov 27, 2025
94691a7
forgot the constructor
igoragoli Nov 27, 2025
0eeaf01
remove question
igoragoli Nov 27, 2025
3af5ce3
copy dsm.go from segmentio/kafka-go and edit it
igoragoli Nov 27, 2025
fd55d0e
add tracing_test.go
igoragoli Nov 27, 2025
9c7986c
add tracing.go based on the one on segmentio/kafka-go
igoragoli Nov 28, 2025
969be60
small fixes on tracing.go
igoragoli Nov 28, 2025
f9aef89
fix internal/tracing path on tracing.go
igoragoli Dec 11, 2025
107ba55
add franz-go go.mod
igoragoli Dec 11, 2025
c00ad14
init kgo.go with hooks for setting bootstrap servers
igoragoli Dec 11, 2025
6d674ae
add (Get|Set)KafkaConfig on tracing.go
igoragoli Dec 11, 2025
cb3d557
add TODO to make sure I edit writer later
igoragoli Dec 11, 2025
f946e6b
fix interface
igoragoli Dec 11, 2025
4f66789
update OnProduceRecordBuffered
igoragoli Dec 11, 2025
1e360ad
remove writer from StartProduceSpan interface
igoragoli Dec 11, 2025
5f5b13f
update OnProduceRecordUnbuffered
igoragoli Dec 11, 2025
f83e50a
AddBootstrapServer
igoragoli Dec 11, 2025
4507a6d
use AddBootstrapServer on kgo.go
igoragoli Dec 11, 2025
222f458
We don't need OnFetchRecordBuffered
igoragoli Dec 11, 2025
3f784d3
update OnFetchRecordUnbuffered
igoragoli Dec 11, 2025
fff3781
unnecessary imports
igoragoli Dec 11, 2025
7bd8411
set consumer group id
igoragoli Dec 11, 2025
376b59e
set consumer group id
igoragoli Dec 11, 2025
af6e45c
comment
igoragoli Dec 11, 2025
7ad81bd
fix
igoragoli Dec 17, 2025
3a06fae
logs that actually reflect what's being done
igoragoli Dec 17, 2025
b5a3d3a
dsm
igoragoli Dec 17, 2025
1d2f77e
wip
igoragoli Dec 18, 2025
b0f0a59
wip
igoragoli Dec 18, 2025
d027517
fix go.work and go.mod to enable running tests
igoragoli Dec 19, 2025
cf695f4
add testClient()
igoragoli Dec 19, 2025
5ecc9b9
wip
igoragoli Dec 19, 2025
aa3d155
wip
igoragoli Dec 19, 2025
f80a84a
a pointer!
igoragoli Dec 24, 2025
9cc43eb
test
igoragoli Dec 24, 2025
4be9636
test
igoragoli Dec 24, 2025
3e91b1b
remove dead code
igoragoli Dec 24, 2025
24ee1aa
context
igoragoli Dec 24, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions contrib/twmb/franz-go/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package kgo_test

import (
"context"
"log"

kgotrace "github.com/DataDog/dd-trace-go/contrib/twmb/franz-go/v2"
"github.com/DataDog/dd-trace-go/contrib/twmb/franz-go/v2/internal/tracing"
"github.com/DataDog/dd-trace-go/v2/ddtrace/tracer"

"github.com/twmb/franz-go/pkg/kgo"
)

func Example() {
// Create a traced client with default configuration
client, err := kgotrace.NewClient(
kgotrace.ClientOptions(
kgo.SeedBrokers("localhost:9092"),
kgo.ConsumeTopics("my-topic"),
),
)
if err != nil {
log.Fatal("Failed to create client:", err)
}
defer client.Close()

// Produce a message - tracing is automatic
ctx := context.Background()
record := &kgo.Record{
Topic: "my-topic",
Value: []byte("Hello, Kafka!"),
}
if err := client.ProduceSync(ctx, record).FirstErr(); err != nil {
log.Fatal("Failed to produce:", err)
}

// Consume messages - tracing is automatic
fetches := client.PollFetches(ctx)
fetches.EachRecord(func(r *kgo.Record) {
log.Printf("Consumed: %s", string(r.Value))
})
}

func Example_withTracingOptions() {
// Create a traced client with custom tracing options
client, err := kgotrace.NewClient(
kgotrace.ClientOptions(
kgo.SeedBrokers("localhost:9092"),
kgo.ConsumeTopics("my-topic"),
kgo.ConsumerGroup("my-consumer-group"),
),
tracing.WithService("my-service"),
tracing.WithAnalytics(true),
tracing.WithDataStreams(),
)
if err != nil {
log.Fatal("Failed to create client:", err)
}
defer client.Close()

// Use the client normally - tracing options are applied automatically
ctx := context.Background()
record := &kgo.Record{
Topic: "my-topic",
Value: []byte("Hello, Kafka!"),
}
if err := client.ProduceSync(ctx, record).FirstErr(); err != nil {
log.Fatal("Failed to produce:", err)
}
}

func Example_manualChildSpan() {
// Create a traced client
client, err := kgotrace.NewClient(
kgotrace.ClientOptions(
kgo.SeedBrokers("localhost:9092"),
kgo.ConsumeTopics("my-topic"),
),
)
if err != nil {
log.Fatal("Failed to create client:", err)
}
defer client.Close()

// Consume a message
ctx := context.Background()
fetches := client.PollFetches(ctx)

fetches.EachRecord(func(r *kgo.Record) {
// Extract the span context from the consumed message
spanContext, err := kgotrace.ExtractSpanContext(r)
if err != nil {
log.Fatal("Failed to extract span context:", err)
}

// Create a child span for processing the message
span := tracer.StartSpan("process-message", tracer.ChildOf(spanContext))
defer span.Finish()

// Process the message with the child span
log.Printf("Processing: %s", string(r.Value))
})
}
92 changes: 92 additions & 0 deletions contrib/twmb/franz-go/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
module github.com/DataDog/dd-trace-go/contrib/twmb/franz-go/v2

go 1.24.0

require (
github.com/DataDog/dd-trace-go/v2 v2.4.0-dev
github.com/stretchr/testify v1.11.1
github.com/twmb/franz-go v1.20.5
github.com/twmb/franz-go/pkg/kadm v1.17.1
)

require (
github.com/DataDog/datadog-agent/comp/core/tagger/origindetection v0.71.0 // indirect
github.com/DataDog/datadog-agent/pkg/obfuscate v0.71.0 // indirect
github.com/DataDog/datadog-agent/pkg/opentelemetry-mapping-go/otlp/attributes v0.71.0 // indirect
github.com/DataDog/datadog-agent/pkg/proto v0.71.0 // indirect
github.com/DataDog/datadog-agent/pkg/remoteconfig/state v0.73.0-rc.1 // indirect
github.com/DataDog/datadog-agent/pkg/trace v0.71.0 // indirect
github.com/DataDog/datadog-agent/pkg/util/log v0.71.0 // indirect
github.com/DataDog/datadog-agent/pkg/util/scrubber v0.71.0 // indirect
github.com/DataDog/datadog-agent/pkg/version v0.71.0 // indirect
github.com/DataDog/datadog-go/v5 v5.6.0 // indirect
github.com/DataDog/go-libddwaf/v4 v4.8.0 // indirect
github.com/DataDog/go-runtime-metrics-internal v0.0.4-0.20250721125240-fdf1ef85b633 // indirect
github.com/DataDog/go-sqllexer v0.1.8 // indirect
github.com/DataDog/go-tuf v1.1.1-0.5.2 // indirect
github.com/DataDog/sketches-go v1.4.7 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/ebitengine/purego v0.8.4 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.3.0 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.18.1 // indirect
github.com/klauspost/cpuid/v2 v2.2.3 // indirect
github.com/lufia/plan9stats v0.0.0-20250317134145-8bc96cf8fc35 // indirect
github.com/minio/simdjson-go v0.4.5 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect
github.com/outcaste-io/ristretto v0.2.3 // indirect
github.com/philhofer/fwd v1.2.0 // indirect
github.com/pierrec/lz4/v4 v4.1.22 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect
github.com/puzpuzpuz/xsync/v3 v3.5.1 // indirect
github.com/secure-systems-lab/go-securesystemslib v0.9.0 // indirect
github.com/shirou/gopsutil/v4 v4.25.8-0.20250809033336-ffcdc2b7662f // indirect
github.com/theckman/httpforwarded v0.4.0 // indirect
github.com/tinylib/msgp v1.3.0 // indirect
github.com/tklauser/go-sysconf v0.3.15 // indirect
github.com/tklauser/numcpus v0.10.0 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.12.0 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/collector/component v1.39.0 // indirect
go.opentelemetry.io/collector/featuregate v1.46.0 // indirect
go.opentelemetry.io/collector/internal/telemetry v0.133.0 // indirect
go.opentelemetry.io/collector/pdata v1.46.0 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.140.0 // indirect
go.opentelemetry.io/contrib/bridges/otelzap v0.12.0 // indirect
go.opentelemetry.io/otel v1.38.0 // indirect
go.opentelemetry.io/otel/log v0.13.0 // indirect
go.opentelemetry.io/otel/metric v1.38.0 // indirect
go.opentelemetry.io/otel/sdk v1.38.0 // indirect
go.opentelemetry.io/otel/trace v1.38.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/crypto v0.45.0 // indirect
golang.org/x/exp v0.0.0-20250408133849-7e4ce0ab07d0 // indirect
golang.org/x/mod v0.29.0 // indirect
golang.org/x/sys v0.38.0 // indirect
golang.org/x/time v0.12.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5 // indirect
google.golang.org/protobuf v1.36.10 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/DataDog/dd-trace-go/v2 => ../../..
Loading
Loading