Skip to content

Commit 213048f

Browse files
authored
- Upgrades to collector 0.128 (#10)
- Adds ability to shard the main consuming goroutines. Alleviates a major CPU bottleneck. See DESIGN.md for more explanation. - Adds ability to configure size of the buffered chan waiting for consumption - Optimises metric collection on hot path. Uses async instrument for cache metrics. - Moves various operations, where possible, out of the hot path.
1 parent caf1890 commit 213048f

File tree

20 files changed

+1264
-1052
lines changed

20 files changed

+1264
-1052
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
.idea/
22
/tools/
3-
test-reports/
3+
test-reports/
4+
**/coverage.out

internal/ptraceutil/go.mod

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ module github.com/atlassian-labs/atlassian-sampling-processor/internal/ptraceuti
33
go 1.24.0
44

55
require (
6-
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.126.0
6+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.128.0
77
github.com/stretchr/testify v1.10.0
8-
go.opentelemetry.io/collector/pdata v1.32.0
8+
go.opentelemetry.io/collector/pdata v1.34.0
99
)
1010

1111
require (
@@ -15,14 +15,14 @@ require (
1515
github.com/json-iterator/go v1.1.12 // indirect
1616
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
1717
github.com/modern-go/reflect2 v1.0.2 // indirect
18-
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.126.0 // indirect
18+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.128.0 // indirect
1919
github.com/pmezard/go-difflib v1.0.0 // indirect
2020
go.uber.org/multierr v1.11.0 // indirect
2121
golang.org/x/net v0.39.0 // indirect
2222
golang.org/x/sys v0.32.0 // indirect
2323
golang.org/x/text v0.24.0 // indirect
2424
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a // indirect
25-
google.golang.org/grpc v1.72.0 // indirect
25+
google.golang.org/grpc v1.72.2 // indirect
2626
google.golang.org/protobuf v1.36.6 // indirect
2727
gopkg.in/yaml.v3 v3.0.1 // indirect
2828
)

internal/ptraceutil/go.sum

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,12 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w
2929
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
3030
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
3131
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
32-
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.126.0 h1:AnOgi0AF5kALP4hEILsQEnRzT/yNXfua598210Dn9ko=
33-
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.126.0/go.mod h1:jjyo4lLRH9WOUJ9djpEql6xqVAaReNDY7ciWRt23FZk=
34-
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.126.0 h1:9RPktK9IsZaHN5aGV+bA7UbGtZCDGWvkSLcldAIPD98=
35-
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.126.0/go.mod h1:ZgQQqwY9c/e3JleZPQ1xxm9ZbgEKpGVjBEP+D+fTM+s=
36-
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.126.0 h1:FqfYYIBllbKMX2J64U37bVpICpo3+chXC3oC192fffM=
37-
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.126.0/go.mod h1:j54xa94UWeLUNV1PXLm8QAlXCOqw6T8LOACb/qtZcug=
32+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.128.0 h1:GJzARUS5NcCeYr7pwlrYMEK+fl92cmCDED2to7nPuCQ=
33+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.128.0/go.mod h1:YrULw8EK8Vj0LX2ZhtfqMaIlLATIGOlbII9RDR8lPeI=
34+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.128.0 h1:+rUULr4xqOJjZK3SokFmRYzsiPq5onoWoSv3He4aaus=
35+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.128.0/go.mod h1:Fh2SXPeFkr4J97w9CV/apFAib8TC9Hi0P08xtiT7Lng=
36+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.128.0 h1:8OWwRSdIhm3DY3PEYJ0PtSEz1a1OjL0fghLXSr14JMk=
37+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.128.0/go.mod h1:32OeaysZe4vkSmD1LJ18Q1DfooryYqpSzFNmz+5A5RU=
3838
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
3939
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
4040
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
@@ -47,10 +47,10 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
4747
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
4848
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
4949
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
50-
go.opentelemetry.io/collector/pdata v1.32.0 h1:hBzlJV1rujr1UdD2CBy2gmaIKtC15ysg/z+x8F3McQA=
51-
go.opentelemetry.io/collector/pdata v1.32.0/go.mod h1:m41io9nWpy7aCm/uD1L9QcKiZwOP0ldj83JEA34dmlk=
52-
go.opentelemetry.io/collector/pdata/pprofile v0.126.0 h1:ArYQxg5KdTb98r1X6KSZY7W6/4DPv/q6z7jSbSZ1mBc=
53-
go.opentelemetry.io/collector/pdata/pprofile v0.126.0/go.mod h1:2fBTFDcXjVfseBQKnt/DTM0EYTmFoPKtRpjg8ql38Ek=
50+
go.opentelemetry.io/collector/pdata v1.34.0 h1:2vwYftckXe7pWxI9mfSo+tw3wqdGNrYpMbDx/5q6rw8=
51+
go.opentelemetry.io/collector/pdata v1.34.0/go.mod h1:StPHMFkhLBellRWrULq0DNjv4znCDJZP6La4UuC+JHI=
52+
go.opentelemetry.io/collector/pdata/pprofile v0.128.0 h1:6DEtzs/liqv/ukz2EHbC5OMaj2V6K2pzuj/LaRg2YmY=
53+
go.opentelemetry.io/collector/pdata/pprofile v0.128.0/go.mod h1:bVVRpz+zKFf1UCCRUFqy8LvnO3tHlXKkdqW2d+Wi/iA=
5454
go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY=
5555
go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI=
5656
go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ=
@@ -98,8 +98,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
9898
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
9999
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a h1:51aaUVRocpvUOSQKM6Q7VuoaktNIaMCLuhZB6DKksq4=
100100
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a/go.mod h1:uRxBH1mhmO8PGhU89cMcHaXKZqO+OfakD8QQO0oYwlQ=
101-
google.golang.org/grpc v1.72.0 h1:S7UkcVa60b5AAQTaO6ZKamFp1zMZSU0fGDK2WZLbBnM=
102-
google.golang.org/grpc v1.72.0/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM=
101+
google.golang.org/grpc v1.72.2 h1:TdbGzwb82ty4OusHWepvFWGLgIbNo1/SUynEN0ssqv8=
102+
google.golang.org/grpc v1.72.2/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM=
103103
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
104104
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
105105
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

pkg/processor/atlassiansamplingprocessor/DESIGN.md

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ It also contains information about how to run in a production environment.
88

99
This section describes, in order, the path a trace takes when consumed by this processor.
1010

11-
1. `ConsumeTraces()` is invoked. This blocks on send to an unbuffered `chan`, and then returns.
12-
2. `consumeChan()` reads the `chan` and processes the traces.
11+
1. `ConsumeTraces()` is invoked. This organises the data by trace ID and shard, does an early decision check, sends to the shard `chan`, and then returns.
12+
2. `shardListener()` reads its assigned `chan` and processes the traces.
1313
3. The data is organised by trace ID, and the main loop in `processor.go` processes the data one trace ID at a time.
1414
4. The decision caches are accessed to determine if a sampling decision has already been made for the current trace ID.
1515
If a prior decision exists, this allows us to streamline the processing. When the cache indicates that the trace has
@@ -25,25 +25,21 @@ Least Recently Used (LRU) basis, meaning that adding new data to the cache may i
2525
least recently accessed trace (i.e. the trace that last received a new span the longest time ago). When a trace is
2626
evicted, it is considered "not sampled" and added to the decision cache.
2727

28-
## Synchronized Goroutine
28+
## Synchronization / Sharding
2929

30-
The main operation of the processor is executed as a single goroutine, synchronized through an unbuffered channel.
30+
The main processing of this component is done by async goroutines (shard listeners) which read off "shards" (channels).
31+
Trace data is sharded in `ConsumeTraces()` before being sent to the appropriate shard for processing.
3132

3233
In the collector architecture, receivers typically function as servers that accept and process data using multiple goroutines.
3334
Consequently, processors like this one are invoked concurrently through the `ConsumeTraces()` method.
34-
To ensure synchronization, the processor sends data to a channel, which is then received by a
35-
dedicated goroutine (`consumeChan()`). This design guarantees that all data is processed by a single goroutine.
36-
It draws inspiration from the core collector's batch processor.
37-
38-
The decision to synchronize is primarily driven by the need to maintain the integrity of internal
39-
caches while keeping the design simple. Allowing concurrent access to cached trace data would complicate the
40-
code significantly and potentially lead to bugs, as experienced in the upstream tail sampling processor.
41-
42-
This is, of course, a trade-off. The processing throughput is limited by the capacity of a single goroutine,
43-
creating a potential bottleneck. This can be alleviated by deploying more instances of the processor with reduced
44-
memory allocation per instance (e.g., more nodes, each with less memory). If the bottleneck becomes a significant issue,
45-
a future enhancement could involve sharding the processor. This would involve splitting the processing workload by trace
46-
ID and maintaining separate caches and states for each shard.
35+
To ensure synchronization, the processor sends data to channels, which are then received by a
36+
dedicated shard listener. Spans belonging to the same trace will all be sent to the same shard, and that shard will
37+
be processed entirely synchronously by the same shard listener - this ensures data integrity because writes are limited
38+
to these shard listeners. Each shard listener can be thought of as "owning" a section of the caches.
39+
40+
All shard listeners still access the same caches as each other, so a global lock is employed for any operations that may affect data cross-shard.
41+
The prime example of such an operation is the resizing of the caches, which can be performed by any shard listener, but may delete data
42+
belonging to a different shard listener. So, a stop-the-world kind of halt occurs briefly while the cache gets resized.
4743

4844
## Policies, and Policy Evaluation
4945

pkg/processor/atlassiansamplingprocessor/config.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ import (
99
)
1010

1111
type Config struct {
12+
// Shards controls how many goroutines consume the data incoming to the processor, sharded by trace ID. Default = 1.
13+
Shards int `mapstructure:"shards"`
14+
1215
// PolicyConfig sets the tail-based sampling policy which makes a sampling decision
1316
// for a given trace when requested.
1417
PolicyConfig []PolicyConfig `mapstructure:"policies"`
@@ -37,18 +40,25 @@ type Config struct {
3740

3841
// CompressionEnabled compresses trace data in the primary and secondary caches if enabled
3942
CompressionEnabled bool `mapstructure:"compression_enabled"`
43+
44+
// PreprocessBufferSize specifies the size of the chan that queues incoming trace data to be processed by the main loop.
45+
// Default is 0, in which case an unbuffered channel is used.
46+
PreprocessBufferSize int `mapstructure:"preprocess_buffer_size"`
4047
}
4148

4249
var (
4350
primaryCacheSizeError = errors.New("primary_cache_size must be greater than 0")
4451
secondaryCacheSizeError = errors.New("secondary_cache_size must be greater than 0 and less than 50% of primary_cache_size")
4552
duplicatePolicyName = errors.New("duplicate policy names found in sampling policy config")
53+
invalidBufferSize = errors.New("preprocess_buffer_size must be >= 0")
54+
invalidShardCount = errors.New("shards must be > 0")
4655
)
4756

4857
var _ component.Config = (*Config)(nil)
4958

5059
func createDefaultConfig() component.Config {
5160
return &Config{
61+
Shards: 1,
5262
PrimaryCacheSize: 1000,
5363
SecondaryCacheSize: 100,
5464
DecisionCacheCfg: DecisionCacheCfg{
@@ -81,6 +91,14 @@ func (cfg *Config) Validate() (errors error) {
8191
errors = multierr.Append(errors, secondaryCacheSizeError)
8292
}
8393

94+
if cfg.PreprocessBufferSize < 0 {
95+
errors = multierr.Append(errors, invalidBufferSize)
96+
}
97+
98+
if cfg.Shards <= 0 {
99+
errors = multierr.Append(errors, invalidShardCount)
100+
}
101+
84102
err := validateUniquePolicyNames(cfg.PolicyConfig)
85103
if err != nil {
86104
errors = multierr.Append(errors, err)

pkg/processor/atlassiansamplingprocessor/config_test.go

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,14 @@ func TestLoadConfig(t *testing.T) {
4040
assert.Equal(t,
4141
cfg,
4242
&Config{
43-
PrimaryCacheSize: 1000,
44-
SecondaryCacheSize: 100,
45-
TargetHeapBytes: 100_000_000,
46-
RegulateCacheDelay: delay,
47-
DecisionCacheCfg: DecisionCacheCfg{SampledCacheSize: 1000, NonSampledCacheSize: 10000},
48-
CompressionEnabled: true,
43+
PrimaryCacheSize: 1000,
44+
SecondaryCacheSize: 100,
45+
TargetHeapBytes: 100_000_000,
46+
RegulateCacheDelay: delay,
47+
DecisionCacheCfg: DecisionCacheCfg{SampledCacheSize: 1000, NonSampledCacheSize: 10000},
48+
CompressionEnabled: true,
49+
PreprocessBufferSize: 10,
50+
Shards: 5,
4951
PolicyConfig: []PolicyConfig{
5052
{
5153
SharedPolicyConfig: SharedPolicyConfig{
@@ -185,6 +187,7 @@ func TestValidate(t *testing.T) {
185187
c: &Config{
186188
PrimaryCacheSize: 100,
187189
SecondaryCacheSize: 10,
190+
Shards: 1,
188191
PolicyConfig: make([]PolicyConfig, 0),
189192
},
190193
expectedError: nil,
@@ -194,6 +197,7 @@ func TestValidate(t *testing.T) {
194197
c: &Config{
195198
PrimaryCacheSize: 0,
196199
SecondaryCacheSize: 10,
200+
Shards: 1,
197201
PolicyConfig: make([]PolicyConfig, 0),
198202
},
199203
expectedError: primaryCacheSizeError,
@@ -203,6 +207,7 @@ func TestValidate(t *testing.T) {
203207
c: &Config{
204208
PrimaryCacheSize: 10,
205209
SecondaryCacheSize: 0,
210+
Shards: 1,
206211
PolicyConfig: make([]PolicyConfig, 0),
207212
},
208213
expectedError: secondaryCacheSizeError,
@@ -212,6 +217,7 @@ func TestValidate(t *testing.T) {
212217
c: &Config{
213218
PrimaryCacheSize: 100,
214219
SecondaryCacheSize: 50,
220+
Shards: 1,
215221
PolicyConfig: make([]PolicyConfig, 0),
216222
},
217223
expectedError: nil,
@@ -221,13 +227,36 @@ func TestValidate(t *testing.T) {
221227
c: &Config{
222228
PrimaryCacheSize: 100,
223229
SecondaryCacheSize: 55,
230+
Shards: 1,
224231
PolicyConfig: make([]PolicyConfig, 0),
225232
},
226233
expectedError: secondaryCacheSizeError,
227234
},
235+
{
236+
name: "Invalid buffer size",
237+
c: &Config{
238+
PrimaryCacheSize: 100,
239+
SecondaryCacheSize: 10,
240+
Shards: 1,
241+
PolicyConfig: make([]PolicyConfig, 0),
242+
PreprocessBufferSize: -1,
243+
},
244+
expectedError: invalidBufferSize,
245+
},
246+
{
247+
name: "Invalid shard number",
248+
c: &Config{
249+
PrimaryCacheSize: 100,
250+
SecondaryCacheSize: 10,
251+
PolicyConfig: make([]PolicyConfig, 0),
252+
Shards: -1,
253+
},
254+
expectedError: invalidShardCount,
255+
},
228256
{
229257
name: "No duplicate policy names",
230258
c: &Config{
259+
Shards: 1,
231260
PrimaryCacheSize: 100,
232261
SecondaryCacheSize: 10,
233262
PolicyConfig: []PolicyConfig{
@@ -259,6 +288,7 @@ func TestValidate(t *testing.T) {
259288
c: &Config{
260289
PrimaryCacheSize: 100,
261290
SecondaryCacheSize: 10,
291+
Shards: 1,
262292
PolicyConfig: []PolicyConfig{
263293
{
264294
SharedPolicyConfig: SharedPolicyConfig{

pkg/processor/atlassiansamplingprocessor/decider.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,9 @@ func (d *decider) MakeDecision(ctx context.Context, id pcommon.TraceID, currentT
4545
if err != nil {
4646
d.log.Warn("policy evaluation errored", zap.Error(err), zap.String("policy.name", p.name))
4747
}
48-
d.telemetry.ProcessorAtlassianSamplingPolicyDecisions.Add(ctx, 1, metric.WithAttributes(
48+
d.telemetry.ProcessorAtlassianSamplingPolicyDecisions.Add(ctx, 1, metric.WithAttributeSet(attribute.NewSet(
4949
attribute.String("policy", p.name),
50-
attribute.String("decision", decision.String()),
51-
))
50+
attribute.String("decision", decision.String()))))
5251

5352
// Assume we have policy list [X, Y, Z],
5453
// 1. Trace A/Span A is marked as low priority by a policy Z, we will set LastLowPriorityDecisionName to Z.

pkg/processor/atlassiansamplingprocessor/documentation.md

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,6 @@ Number of spans that have been dropped due to an internal error
3838
| ---- | ----------- | ---------- | --------- |
3939
| {spans} | Sum | Int | true |
4040

41-
### otelcol_processor_atlassian_sampling_overly_eager_lonely_root_span_decisions
42-
43-
Number of spans that have been aggressively sampled out by root span policy
44-
45-
| Unit | Metric Type | Value Type | Monotonic |
46-
| ---- | ----------- | ---------- | --------- |
47-
| {spans} | Sum | Int | true |
48-
4941
### otelcol_processor_atlassian_sampling_policy_decisions
5042

5143
Sampling decisions made specifying policy and decision.

0 commit comments

Comments
 (0)