Skip to content

Commit c71a416

Browse files
authored
Merge pull request #240 from coroot/ebpf_traces_improvements
eBPF traces improvements
2 parents 73630f0 + c7d5d32 commit c71a416

File tree

7 files changed

+52
-13
lines changed

7 files changed

+52
-13
lines changed

containers/container.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -716,7 +716,17 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.R
716716
}
717717
stats := c.l7Stats.get(r.Protocol, conn.DestinationKey)
718718

719-
trace := c.tracer.NewTrace(conn.DestinationKey.ActualDestinationIfKnown())
719+
ebpfTracesDisabled := false
720+
for _, p := range c.processes {
721+
if p.Flags.EbpfTracesDisabled {
722+
ebpfTracesDisabled = true
723+
break
724+
}
725+
}
726+
var trace *tracing.Trace
727+
if !ebpfTracesDisabled {
728+
trace = c.tracer.NewTrace(conn.DestinationKey.ActualDestinationIfKnown())
729+
}
720730
switch r.Protocol {
721731
case l7.ProtocolHTTP:
722732
method, path := l7.ParseHttp(r.Payload)
@@ -984,13 +994,10 @@ func (c *Container) runLogParser(logPath string) {
984994
return
985995
}
986996

987-
for pid := range c.processes {
988-
if processFlags, err := proc.GetFlags(pid); err == nil {
989-
if processFlags.LogMonitoringDisabled {
990-
klog.InfoS("skipping log monitoring due to COROOT_LOG_MONITORING=disabled", "cg", c.cgroup.Id)
991-
return
992-
}
993-
break
997+
for _, p := range c.processes {
998+
if p.Flags.LogMonitoringDisabled {
999+
klog.InfoS("skipping log monitoring due to COROOT_LOG_MONITORING=disabled", "cg", c.cgroup.Id)
1000+
return
9941001
}
9951002
}
9961003

containers/process.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ type Process struct {
2828
Pid uint32
2929
StartedAt time.Time
3030

31+
Flags proc.Flags
32+
3133
netNsId string
3234

3335
ctx context.Context
@@ -46,6 +48,7 @@ type Process struct {
4648

4749
func NewProcess(pid uint32, stats *taskstats.Stats, tracer *ebpftracer.Tracer) *Process {
4850
p := &Process{Pid: pid, StartedAt: stats.BeginTime}
51+
p.Flags, _ = proc.GetFlags(pid)
4952
p.ctx, p.cancelFunc = context.WithCancel(context.Background())
5053
go p.instrument(tracer)
5154
return p

containers/registry.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type ProcessInfo struct {
4040
Pid uint32
4141
ContainerId ContainerID
4242
StartedAt time.Time
43+
Flags proc.Flags
4344
}
4445

4546
type Registry struct {
@@ -236,7 +237,7 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
236237
if c := r.getOrCreateContainer(e.Pid); c != nil {
237238
p := c.onProcessStart(e.Pid)
238239
if r.processInfoCh != nil && p != nil {
239-
r.processInfoCh <- ProcessInfo{Pid: p.Pid, ContainerId: c.id, StartedAt: p.StartedAt}
240+
r.processInfoCh <- ProcessInfo{Pid: p.Pid, ContainerId: c.id, StartedAt: p.StartedAt, Flags: p.Flags}
240241
}
241242
}
242243
case ebpftracer.EventTypeProcessExit:

flags/flags.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ var (
4141
ApiKey = kingpin.Flag("api-key", "Coroot API key").Envar("API_KEY").String()
4242
MetricsEndpoint = kingpin.Flag("metrics-endpoint", "The URL of the endpoint to send metrics to").Envar("METRICS_ENDPOINT").URL()
4343
TracesEndpoint = kingpin.Flag("traces-endpoint", "The URL of the endpoint to send traces to").Envar("TRACES_ENDPOINT").URL()
44+
TracesSampling = kingpin.Flag("traces-sampling", "Trace sampling rate (0.0 to 1.0)").Default("1.0").Envar("TRACES_SAMPLING").Float64()
4445
LogsEndpoint = kingpin.Flag("logs-endpoint", "The URL of the endpoint to send logs to").Envar("LOGS_ENDPOINT").URL()
4546
ProfilesEndpoint = kingpin.Flag("profiles-endpoint", "The URL of the endpoint to send profiles to").Envar("PROFILES_ENDPOINT").URL()
4647
InsecureSkipVerify = kingpin.Flag("insecure-skip-verify", "whether to skip verifying the certificate or not").Envar("INSECURE_SKIP_VERIFY").Default("false").Bool()

proc/flags.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
// Flags holds per-process opt-out switches returned by GetFlags(pid).
// Each field is set from a COROOT_* key/value pair (presumably read from the
// process environment; confirm in GetFlags) and becomes true when the value
// contains the substring "disabled".
type Flags struct {
1010
// EbpfProfilingDisabled is true when the matching value contains "disabled".
// NOTE(review): the key that drives it is not visible here; confirm in GetFlags.
EbpfProfilingDisabled bool
11+
// EbpfTracesDisabled is true when the COROOT_EBPF_TRACES value contains "disabled".
EbpfTracesDisabled bool
1112
// LogMonitoringDisabled is true when the COROOT_LOG_MONITORING value contains "disabled".
LogMonitoringDisabled bool
1213
}
1314

@@ -36,6 +37,8 @@ func GetFlags(pid uint32) (Flags, error) {
3637
flags.EbpfProfilingDisabled = strings.Contains(kv[1], "disabled")
3738
case "COROOT_LOG_MONITORING":
3839
flags.LogMonitoringDisabled = strings.Contains(kv[1], "disabled")
40+
case "COROOT_EBPF_TRACES":
41+
flags.EbpfTracesDisabled = strings.Contains(kv[1], "disabled")
3942
}
4043
}
4144
return flags, nil

profiling/profiling.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@ func (tf *TargetFinder) start(processInfoCh <-chan containers.ProcessInfo) {
215215
target: sd.NewTargetForTesting(cid, 0, sd.DiscoveryTarget{
216216
"service_name": common.ContainerIdToOtelServiceName(cid),
217217
}),
218+
flags: pi.Flags,
218219
}
219220
tf.lock.Unlock()
220221
}
@@ -234,10 +235,6 @@ func (tf *TargetFinder) FindTarget(pid uint32) *sd.Target {
234235
var err error
235236
if !pi.initialized {
236237
pi.initialized = true
237-
if pi.flags, err = proc.GetFlags(pid); err != nil {
238-
delete(tf.processes, pid)
239-
return nil
240-
}
241238
if !pi.flags.EbpfProfilingDisabled {
242239
cmdline := proc.GetCmdline(pid)
243240
if proc.IsJvm(cmdline) {

tracing/tracing.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"crypto/tls"
66
"fmt"
7+
"math/rand"
78
"time"
89

910
"github.com/coroot/coroot-node-agent/common"
@@ -29,6 +30,7 @@ var (
2930
commonResourceAttrs []attribute.KeyValue
3031
agentVersion string
3132
initialized bool
33+
samplingRate float64
3234
)
3335

3436
func Init(machineId, hostname, version string) {
@@ -37,6 +39,15 @@ func Init(machineId, hostname, version string) {
3739
klog.Infoln("no OpenTelemetry traces collector endpoint configured")
3840
return
3941
}
42+
43+
samplingRate = *flags.TracesSampling
44+
if samplingRate < 0.0 || samplingRate > 1.0 {
45+
klog.Warningf("invalid traces-sampling value %f, must be between 0.0 and 1.0, using default 1.0", samplingRate)
46+
samplingRate = 1.0
47+
}
48+
if samplingRate < 1.0 {
49+
klog.Infof("trace sampling rate set to %f", samplingRate)
50+
}
4051
klog.Infoln("OpenTelemetry traces collector endpoint:", endpointUrl.String())
4152
path := endpointUrl.Path
4253
if path == "" {
@@ -67,6 +78,17 @@ type Tracer struct {
6778
otel trace.Tracer
6879
}
6980

81+
func shouldSample() bool {
82+
if samplingRate >= 1.0 {
83+
return true
84+
}
85+
if samplingRate <= 0.0 {
86+
return false
87+
}
88+
89+
return rand.Float64() < samplingRate
90+
}
91+
7092
func GetContainerTracer(containerId string) *Tracer {
7193
if !initialized {
7294
return &Tracer{otel: nil}
@@ -103,6 +125,11 @@ func (t *Trace) createSpan(name string, duration time.Duration, error bool, attr
103125
return
104126
}
105127
end := time.Now()
128+
129+
if !shouldSample() {
130+
return
131+
}
132+
106133
start := end.Add(-duration)
107134
_, span := t.tracer.otel.Start(nil, name, trace.WithTimestamp(start), trace.WithSpanKind(trace.SpanKindClient))
108135
span.SetAttributes(attrs...)

0 commit comments

Comments
 (0)