Skip to content

Commit 14b3565

Browse files
committed
Add intake offset metric for clock drift monitoring
Introduces ntp.offset metric with source:intake tag to monitor clock drift using Datadog intake server timestamps from HTTP responses. This provides clock monitoring even when NTP is blocked by firewalls. Changes: - Capture Date header from intake HTTP responses in forwarder - Store intake offset in expvar for global access - Submit ntp.offset metric with source:intake tag (independent of NTP check) - Display both NTP and Intake offsets in agent status Clocks section - Update tests to handle new metric submission The metric uses intake server time for accurate drift detection and is submitted even when NTP queries fail.
1 parent 8e1a748 commit 14b3565

File tree

6 files changed

+106
-9
lines changed

6 files changed

+106
-9
lines changed

comp/forwarder/defaultforwarder/transaction/transaction.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,10 @@ var (
7272
[]string{"domain", "endpoint", "error_type"}, "Count of transactions errored grouped by type of error")
7373
tlmTxHTTPErrors = telemetry.NewCounter("transactions", "http_errors",
7474
[]string{"domain", "endpoint", "code"}, "Count of transactions http errors per http code")
75+
76+
// intakeOffsetExpVar stores the time offset between the agent and Datadog intake
77+
// Captured from the Date header in successful HTTP responses
78+
intakeOffsetExpVar = expvar.NewFloat("intakeOffset")
7579
)
7680

7781
var trace *httptrace.ClientTrace
@@ -401,6 +405,16 @@ func (t *HTTPTransaction) internalProcess(ctx context.Context, config config.Com
401405
}
402406
defer func() { _ = resp.Body.Close() }()
403407

408+
// Capture intake server time for clock offset monitoring
409+
// Parse the Date header to calculate the time difference between agent and intake
410+
if dateHeader := resp.Header.Get("Date"); dateHeader != "" {
411+
if serverTime, err := http.ParseTime(dateHeader); err == nil {
412+
// Calculate offset: positive means agent clock is ahead, negative means behind
413+
offset := time.Since(serverTime).Seconds()
414+
intakeOffsetExpVar.Set(offset)
415+
}
416+
}
417+
404418
body, err := io.ReadAll(resp.Body)
405419
if err != nil {
406420
log.Errorf("Fail to read the response Body: %s", err)

pkg/collector/corechecks/net/ntp/ntp.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ const (
3636
)
3737

3838
var (
39-
ntpExpVar = expvar.NewFloat("ntpOffset")
39+
ntpExpVar = expvar.NewFloat("ntpOffset")
40+
intakeOffsetExpVar = expvar.NewFloat("intakeOffset")
4041
// for testing purpose
4142
ntpQuery = ntp.QueryWithOptions
4243

@@ -213,6 +214,18 @@ func (c *NTPCheck) Run() error {
213214
serviceCheckMessage := ""
214215
offsetThreshold := c.cfg.instance.OffsetThreshold
215216

217+
// Submit intake offset first (captured from forwarder responses)
218+
// This is independent of NTP check success
219+
intakeOffset := intakeOffsetExpVar.Value()
220+
if !math.IsNaN(intakeOffset) {
221+
// Use server time as timestamp: offset is positive when agent is ahead
222+
// So server_time = agent_time - offset
223+
agentTime := time.Now()
224+
serverTime := agentTime.Add(-time.Duration(intakeOffset * float64(time.Second)))
225+
intakeTS := float64(serverTime.UnixNano()) / 1e9
226+
_ = sender.GaugeWithTimestamp("ntp.offset", intakeOffset, "", []string{"source:intake"}, intakeTS)
227+
}
228+
216229
clockOffset, ts, err := c.queryOffset()
217230
if err != nil {
218231
log.Error(err)
@@ -230,9 +243,10 @@ func (c *NTPCheck) Run() error {
230243
serviceCheckStatus = servicecheck.ServiceCheckOK
231244
}
232245

233-
_ = sender.GaugeWithTimestamp("ntp.offset", clockOffset, "", nil, ts)
246+
_ = sender.GaugeWithTimestamp("ntp.offset", clockOffset, "", []string{"source:ntp"}, ts)
234247
ntpExpVar.Set(clockOffset)
235248
tlmNtpOffset.Set(clockOffset)
249+
236250
sender.ServiceCheck("ntp.in_sync", serviceCheckStatus, "", nil, serviceCheckMessage)
237251

238252
c.lastCollection = time.Now()

pkg/collector/corechecks/net/ntp/ntp_test.go

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,16 @@ func TestNTPOK(t *testing.T) {
9898
"ntp.offset",
9999
float64(offset),
100100
"",
101-
[]string(nil),
101+
[]string{"source:ntp"},
102102
mock.AnythingOfType("float64"),
103103
).Return().Times(1)
104+
// Intake offset may or may not be submitted depending on whether it's been set
105+
mockSender.On("GaugeWithTimestamp",
106+
"ntp.offset",
107+
mock.AnythingOfType("float64"),
108+
"",
109+
[]string{"source:intake"},
110+
mock.AnythingOfType("float64")).Return().Maybe()
104111
mockSender.On("ServiceCheck",
105112
"ntp.in_sync",
106113
servicecheck.ServiceCheckOK,
@@ -140,9 +147,16 @@ func TestNTPCritical(t *testing.T) {
140147
"ntp.offset",
141148
float64(offset),
142149
"",
143-
[]string(nil),
150+
[]string{"source:ntp"},
144151
mock.AnythingOfType("float64"),
145152
).Return().Times(1)
153+
// Intake offset may or may not be submitted depending on whether it's been set
154+
mockSender.On("GaugeWithTimestamp",
155+
"ntp.offset",
156+
mock.AnythingOfType("float64"),
157+
"",
158+
[]string{"source:intake"},
159+
mock.AnythingOfType("float64")).Return().Maybe()
146160
mockSender.On("ServiceCheck",
147161
"ntp.in_sync",
148162
servicecheck.ServiceCheckCritical,
@@ -238,9 +252,16 @@ func TestNTPNegativeOffsetCritical(t *testing.T) {
238252
"ntp.offset",
239253
float64(offset),
240254
"",
241-
[]string(nil),
255+
[]string{"source:ntp"},
242256
mock.AnythingOfType("float64"),
243257
).Return().Times(1)
258+
// Intake offset may or may not be submitted depending on whether it's been set
259+
mockSender.On("GaugeWithTimestamp",
260+
"ntp.offset",
261+
mock.AnythingOfType("float64"),
262+
"",
263+
[]string{"source:intake"},
264+
mock.AnythingOfType("float64")).Return().Maybe()
244265
mockSender.On("ServiceCheck",
245266
"ntp.in_sync",
246267
servicecheck.ServiceCheckCritical,
@@ -288,9 +309,16 @@ hosts:
288309
"ntp.offset",
289310
float64(2),
290311
"",
291-
[]string(nil),
312+
[]string{"source:ntp"},
292313
mock.AnythingOfType("float64"),
293314
).Return().Times(1)
315+
// Intake offset may or may not be submitted depending on whether it's been set
316+
mockSender.On("GaugeWithTimestamp",
317+
"ntp.offset",
318+
mock.AnythingOfType("float64"),
319+
"",
320+
[]string{"source:intake"},
321+
mock.AnythingOfType("float64")).Return().Maybe()
294322
mockSender.On("ServiceCheck",
295323
"ntp.in_sync",
296324
servicecheck.ServiceCheckOK,
@@ -338,9 +366,16 @@ hosts:
338366
"ntp.offset",
339367
float64(offset),
340368
"",
341-
[]string(nil),
369+
[]string{"source:ntp"},
342370
mock.AnythingOfType("float64"),
343371
).Return().Times(1)
372+
// Intake offset may or may not be submitted depending on whether it's been set
373+
mockSender.On("GaugeWithTimestamp",
374+
"ntp.offset",
375+
mock.AnythingOfType("float64"),
376+
"",
377+
[]string{"source:intake"},
378+
mock.AnythingOfType("float64")).Return().Maybe()
344379
mockSender.On("ServiceCheck",
345380
"ntp.in_sync",
346381
servicecheck.ServiceCheckCritical,
@@ -566,13 +601,21 @@ func TestNTPUsesResponseTimestamp(t *testing.T) {
566601
"ntp.offset",
567602
float64(offset),
568603
"",
569-
[]string(nil),
604+
[]string{"source:ntp"},
570605
mock.MatchedBy(func(ts float64) bool {
571606
actualTS = ts
572607
return true
573608
}),
574609
).Return().Once()
575610

611+
// Intake offset may or may not be submitted depending on whether it's been set
612+
mockSender.On("GaugeWithTimestamp",
613+
"ntp.offset",
614+
mock.AnythingOfType("float64"),
615+
"",
616+
[]string{"source:intake"},
617+
mock.AnythingOfType("float64")).Return().Maybe()
618+
576619
mockSender.
577620
On("ServiceCheck",
578621
"ntp.in_sync",

pkg/collector/corechecks/net/status.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,19 @@ func (p Provider) getStatusInfo() map[string]interface{} {
4141
func (Provider) populateStatus(stats map[string]interface{}) {
4242
ntpOffset := expvar.Get("ntpOffset")
4343
if ntpOffset != nil && ntpOffset.String() != "" {
44-
float, err := strconv.ParseFloat(expvar.Get("ntpOffset").String(), 64)
44+
float, err := strconv.ParseFloat(ntpOffset.String(), 64)
4545
if err == nil {
4646
stats["ntpOffset"] = float
4747
}
4848
}
49+
50+
intakeOffset := expvar.Get("intakeOffset")
51+
if intakeOffset != nil && intakeOffset.String() != "" {
52+
float, err := strconv.ParseFloat(intakeOffset.String(), 64)
53+
if err == nil {
54+
stats["intakeOffset"] = float
55+
}
56+
}
4957
}
5058

5159
// JSON populates the status map

pkg/collector/corechecks/net/status_templates/ntp.tmpl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,9 @@ NTP offset: {{ humanizeDuration .ntpOffset "s"}}
44
{{yellowText "NTP offset is high. Datadog may ignore metrics sent by this Agent."}}
55
{{- end }}
66
{{- end }}
7+
{{- if .intakeOffset }}
8+
Intake offset: {{ humanizeDuration .intakeOffset "s"}}
9+
{{- if ntpWarning .intakeOffset}}
10+
{{yellowText "Intake offset is high. Datadog may drop metrics from this Agent."}}
11+
{{- end }}
12+
{{- end }}

pkg/collector/corechecks/net/status_templates/ntpHTML.tmpl

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,15 @@
1010
</div>
1111
{{end}}
1212

13+
{{- if .intakeOffset}}
14+
<div class="stat">
15+
<span class="stat_title">Intake Offset</span>
16+
<span class="stat_data">
17+
<br>{{ humanizeDuration .intakeOffset "s"}}
18+
{{- if ntpWarning .intakeOffset}}
19+
<br><span class="warning">Intake offset is high. Datadog may drop metrics from this Agent.</span>
20+
{{- end}}
21+
</span>
22+
</div>
23+
{{end}}
24+

0 commit comments

Comments
 (0)