Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions comp/forwarder/defaultforwarder/transaction/intake_offset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package transaction

import (
"expvar"
"net/http"
"time"
)

var (
// intakeTimeOffsetExpvar stores the time offset between the agent and Datadog intake
// Captured from the Date header in successful HTTP responses
intakeTimeOffsetExpvar = expvar.NewFloat("corechecks_net_ntp_intake_time_offset")
)

// updateIntakeTimeOffset parses the Date header from an HTTP response and updates the intake time offset.
// The offset uses NTP convention: positive means agent is behind, negative means ahead.
func updateIntakeTimeOffset(dateHeader string) {
if dateHeader == "" {
return
}

intakeServerTime, err := http.ParseTime(dateHeader)
if err != nil {
return
}

// Calculate offset using NTP convention: positive means agent clock is behind, negative means ahead
// serverTime - agentTime: if result is positive, agent is behind (needs to add time)
offset := intakeServerTime.Sub(time.Now()).Seconds()
intakeTimeOffsetExpvar.Set(offset)
}
3 changes: 3 additions & 0 deletions comp/forwarder/defaultforwarder/transaction/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,9 @@ func (t *HTTPTransaction) internalProcess(ctx context.Context, config config.Com
}
defer func() { _ = resp.Body.Close() }()

// Capture intake server time for clock offset monitoring
updateIntakeTimeOffset(resp.Header.Get("Date"))

body, err := io.ReadAll(resp.Body)
if err != nil {
log.Errorf("Fail to read the response Body: %s", err)
Expand Down
21 changes: 20 additions & 1 deletion pkg/collector/corechecks/net/ntp/ntp.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,24 @@ func (c *NTPCheck) Run() error {
serviceCheckMessage := ""
offsetThreshold := c.cfg.instance.OffsetThreshold

// Submit intake offset first (captured from forwarder responses)
// This is independent of NTP check success
if intakeOffsetVar := expvar.Get("corechecks_net_ntp_intake_time_offset"); intakeOffsetVar != nil {
if floatVar, ok := intakeOffsetVar.(*expvar.Float); ok {
intakeOffset := floatVar.Value()
if !math.IsNaN(intakeOffset) {
// Calculate what the intake server's time would be by applying the offset to current time
// Using intake server's time as the metric timestamp ensures it appears correctly
// in Datadog even when the agent's clock is drifted
// (positive offset = agent behind, negative = agent ahead)
currentTime := time.Now()
intakeServerTime := currentTime.Add(time.Duration(intakeOffset * float64(time.Second)))
intakeTS := float64(intakeServerTime.UnixNano()) / 1e9
_ = sender.GaugeWithTimestamp("ntp.offset", intakeOffset, "", []string{"source:intake"}, intakeTS)
}
}
}

clockOffset, ts, err := c.queryOffset()
if err != nil {
log.Error(err)
Expand All @@ -230,9 +248,10 @@ func (c *NTPCheck) Run() error {
serviceCheckStatus = servicecheck.ServiceCheckOK
}

_ = sender.GaugeWithTimestamp("ntp.offset", clockOffset, "", nil, ts)
_ = sender.GaugeWithTimestamp("ntp.offset", clockOffset, "", []string{"source:ntp"}, ts)
ntpExpVar.Set(clockOffset)
tlmNtpOffset.Set(clockOffset)

sender.ServiceCheck("ntp.in_sync", serviceCheckStatus, "", nil, serviceCheckMessage)

c.lastCollection = time.Now()
Expand Down
55 changes: 49 additions & 6 deletions pkg/collector/corechecks/net/ntp/ntp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,16 @@ func TestNTPOK(t *testing.T) {
"ntp.offset",
float64(offset),
"",
[]string(nil),
[]string{"source:ntp"},
mock.AnythingOfType("float64"),
).Return().Times(1)
// Intake offset may or may not be submitted depending on whether it's been set
mockSender.On("GaugeWithTimestamp",
"ntp.offset",
mock.AnythingOfType("float64"),
"",
[]string{"source:intake"},
mock.AnythingOfType("float64")).Return().Maybe()
mockSender.On("ServiceCheck",
"ntp.in_sync",
servicecheck.ServiceCheckOK,
Expand Down Expand Up @@ -140,9 +147,16 @@ func TestNTPCritical(t *testing.T) {
"ntp.offset",
float64(offset),
"",
[]string(nil),
[]string{"source:ntp"},
mock.AnythingOfType("float64"),
).Return().Times(1)
// Intake offset may or may not be submitted depending on whether it's been set
mockSender.On("GaugeWithTimestamp",
"ntp.offset",
mock.AnythingOfType("float64"),
"",
[]string{"source:intake"},
mock.AnythingOfType("float64")).Return().Maybe()
mockSender.On("ServiceCheck",
"ntp.in_sync",
servicecheck.ServiceCheckCritical,
Expand Down Expand Up @@ -238,9 +252,16 @@ func TestNTPNegativeOffsetCritical(t *testing.T) {
"ntp.offset",
float64(offset),
"",
[]string(nil),
[]string{"source:ntp"},
mock.AnythingOfType("float64"),
).Return().Times(1)
// Intake offset may or may not be submitted depending on whether it's been set
mockSender.On("GaugeWithTimestamp",
"ntp.offset",
mock.AnythingOfType("float64"),
"",
[]string{"source:intake"},
mock.AnythingOfType("float64")).Return().Maybe()
mockSender.On("ServiceCheck",
"ntp.in_sync",
servicecheck.ServiceCheckCritical,
Expand Down Expand Up @@ -288,9 +309,16 @@ hosts:
"ntp.offset",
float64(2),
"",
[]string(nil),
[]string{"source:ntp"},
mock.AnythingOfType("float64"),
).Return().Times(1)
// Intake offset may or may not be submitted depending on whether it's been set
mockSender.On("GaugeWithTimestamp",
"ntp.offset",
mock.AnythingOfType("float64"),
"",
[]string{"source:intake"},
mock.AnythingOfType("float64")).Return().Maybe()
mockSender.On("ServiceCheck",
"ntp.in_sync",
servicecheck.ServiceCheckOK,
Expand Down Expand Up @@ -338,9 +366,16 @@ hosts:
"ntp.offset",
float64(offset),
"",
[]string(nil),
[]string{"source:ntp"},
mock.AnythingOfType("float64"),
).Return().Times(1)
// Intake offset may or may not be submitted depending on whether it's been set
mockSender.On("GaugeWithTimestamp",
"ntp.offset",
mock.AnythingOfType("float64"),
"",
[]string{"source:intake"},
mock.AnythingOfType("float64")).Return().Maybe()
mockSender.On("ServiceCheck",
"ntp.in_sync",
servicecheck.ServiceCheckCritical,
Expand Down Expand Up @@ -566,13 +601,21 @@ func TestNTPUsesResponseTimestamp(t *testing.T) {
"ntp.offset",
float64(offset),
"",
[]string(nil),
[]string{"source:ntp"},
mock.MatchedBy(func(ts float64) bool {
actualTS = ts
return true
}),
).Return().Once()

// Intake offset may or may not be submitted depending on whether it's been set
mockSender.On("GaugeWithTimestamp",
"ntp.offset",
mock.AnythingOfType("float64"),
"",
[]string{"source:intake"},
mock.AnythingOfType("float64")).Return().Maybe()

mockSender.
On("ServiceCheck",
"ntp.in_sync",
Expand Down
10 changes: 9 additions & 1 deletion pkg/collector/corechecks/net/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,19 @@ func (p Provider) getStatusInfo() map[string]interface{} {
func (Provider) populateStatus(stats map[string]interface{}) {
ntpOffset := expvar.Get("ntpOffset")
if ntpOffset != nil && ntpOffset.String() != "" {
float, err := strconv.ParseFloat(expvar.Get("ntpOffset").String(), 64)
float, err := strconv.ParseFloat(ntpOffset.String(), 64)
if err == nil {
stats["ntpOffset"] = float
}
}

intakeOffset := expvar.Get("corechecks_net_ntp_intake_time_offset")
if intakeOffset != nil && intakeOffset.String() != "" {
float, err := strconv.ParseFloat(intakeOffset.String(), 64)
if err == nil {
stats["intakeOffset"] = float
}
}
}

// JSON populates the status map
Expand Down
6 changes: 6 additions & 0 deletions pkg/collector/corechecks/net/status_templates/ntp.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,9 @@ NTP offset: {{ humanizeDuration .ntpOffset "s"}}
{{yellowText "NTP offset is high. Datadog may ignore metrics sent by this Agent."}}
{{- end }}
{{- end }}
{{- if .intakeOffset }}
Intake offset: {{ humanizeDuration .intakeOffset "s"}}
{{- if ntpWarning .intakeOffset}}
{{yellowText "Intake offset is high. Datadog may drop metrics from this Agent."}}
{{- end }}
{{- end }}
12 changes: 12 additions & 0 deletions pkg/collector/corechecks/net/status_templates/ntpHTML.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,15 @@
</div>
{{end}}

{{- if .intakeOffset}}
<div class="stat">
<span class="stat_title">Intake Offset</span>
<span class="stat_data">
<br>{{ humanizeDuration .intakeOffset "s"}}
{{- if ntpWarning .intakeOffset}}
<br><span class="warning">Intake offset is high. Datadog may drop metrics from this Agent.</span>
{{- end}}
</span>
</div>
{{end}}

Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
enhancements:
- |
Add ``ntp.offset`` metric with ``source:intake`` tag to monitor clock drift using
Datadog intake server timestamps. Original ``ntp.offset`` metric calculated from
an NTP server is now tagged ``source:ntp``.
Loading