Skip to content

Commit 3054c5a

Browse files
committed
perf: Use local histogram for processing time
1 parent 632ec6a commit 3054c5a

File tree

4 files changed

+57
-11
lines changed

4 files changed

+57
-11
lines changed

src/components/proxy/io_uring_shared.rs

+28-5
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,13 @@ use crate::{
2424
components::proxy::{self, PendingSends, PipelineError, SendPacket},
2525
metrics,
2626
pool::PoolBuffer,
27-
time::UtcTimestamp,
2827
};
2928
use io_uring::{squeue::Entry, types::Fd};
3029
use socket2::SockAddr;
3130
use std::{
3231
os::fd::{AsRawFd, FromRawFd},
3332
sync::Arc,
33+
time::Instant,
3434
};
3535

3636
/// A simple wrapper around [eventfd](https://man7.org/linux/man-pages/man2/eventfd.2.html)
@@ -227,7 +227,8 @@ pub enum PacketProcessorCtx {
227227
fn process_packet(
228228
ctx: &mut PacketProcessorCtx,
229229
packet: RecvPacket,
230-
last_received_at: &mut Option<UtcTimestamp>,
230+
last_received_at: &mut Option<Instant>,
231+
processing_time: &prometheus::local::LocalHistogram,
231232
) {
232233
match ctx {
233234
PacketProcessorCtx::Router {
@@ -237,10 +238,10 @@ fn process_packet(
237238
error_acc,
238239
destinations,
239240
} => {
240-
let received_at = UtcTimestamp::now();
241+
let received_at = Instant::now();
241242
if let Some(last_received_at) = last_received_at {
242243
metrics::packet_jitter(metrics::READ, &metrics::EMPTY)
243-
.set((received_at - *last_received_at).nanos());
244+
.set((received_at - *last_received_at).as_nanos() as _);
244245
}
245246
*last_received_at = Some(received_at);
246247

@@ -256,6 +257,7 @@ fn process_packet(
256257
sessions,
257258
error_acc,
258259
destinations,
260+
processing_time,
259261
);
260262
}
261263
PacketProcessorCtx::SessionPool { pool, port, .. } => {
@@ -453,6 +455,8 @@ impl IoUringLoop {
453455
// Just double buffer the pending writes for simplicity
454456
let mut double_pending_sends = Vec::with_capacity(pending_sends.capacity());
455457

458+
let processing_metrics = metrics::ProcessingMetrics::new();
459+
456460
// When sending packets, this is the direction used when updating metrics
457461
let send_dir = if matches!(ctx, PacketProcessorCtx::Router { .. }) {
458462
metrics::WRITE
@@ -478,6 +482,8 @@ impl IoUringLoop {
478482
// onto the submission queue for the loop to actually function (ie, similar to await on futures)
479483
loop_ctx.sync();
480484

485+
const FLUSH_INTERVAL: std::time::Duration = std::time::Duration::from_secs(15);
486+
let mut time_since_flush = std::time::Duration::default();
481487
let mut last_received_at = None;
482488

483489
// The core io uring loop
@@ -520,7 +526,24 @@ impl IoUringLoop {
520526
}
521527

522528
let packet = packet.finalize_recv(ret as usize);
523-
process_packet(&mut ctx, packet, &mut last_received_at);
529+
let old_received_at = last_received_at.clone();
530+
process_packet(
531+
&mut ctx,
532+
packet,
533+
&mut last_received_at,
534+
&processing_metrics.read_processing_time,
535+
);
536+
537+
if let (Some(old_received_at), Some(last_received_at)) =
538+
(&old_received_at, &last_received_at)
539+
{
540+
time_since_flush += *last_received_at - *old_received_at;
541+
542+
if time_since_flush >= FLUSH_INTERVAL {
543+
time_since_flush = <_>::default();
544+
processing_metrics.flush();
545+
}
546+
}
524547

525548
loop_ctx.enqueue_recv(buffer_pool.clone().alloc());
526549
}

src/components/proxy/packet_router.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ impl DownstreamReceiveWorkerConfig {
5757
sessions: &Arc<SessionPool>,
5858
error_acc: &mut super::error::ErrorAccumulator,
5959
destinations: &mut Vec<crate::net::EndpointAddress>,
60+
processing_time: &prometheus::local::LocalHistogram,
6061
) {
6162
tracing::trace!(
6263
id = worker_id,
@@ -65,7 +66,7 @@ impl DownstreamReceiveWorkerConfig {
6566
"received packet from downstream"
6667
);
6768

68-
let timer = metrics::processing_time(metrics::READ).start_timer();
69+
let timer = processing_time.start_timer();
6970
match Self::process_downstream_received_packet(packet, config, sessions, destinations) {
7071
Ok(()) => {
7172
error_acc.maybe_send();

src/components/proxy/packet_router/reference.rs

+7-3
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ impl super::DownstreamReceiveWorkerConfig {
3737
let (tx, mut rx) = tokio::sync::oneshot::channel();
3838

3939
let worker = uring_spawn!(thread_span, async move {
40-
let mut last_received_at = None;
40+
let mut last_received_at: Option<std::time::Instant> = None;
4141
let socket = crate::net::DualStackLocalSocket::new(port)
4242
.unwrap()
4343
.make_refcnt();
@@ -102,6 +102,7 @@ impl super::DownstreamReceiveWorkerConfig {
102102
let mut error_acc =
103103
crate::components::proxy::error::ErrorAccumulator::new(error_sender);
104104
let mut destinations = Vec::with_capacity(1);
105+
let processing_metrics = crate::metrics::ProcessingMetrics::new();
105106

106107
loop {
107108
// Initialize a buffer for the UDP packet. We use the maximum size of a UDP
@@ -110,7 +111,7 @@ impl super::DownstreamReceiveWorkerConfig {
110111

111112
tokio::select! {
112113
received = socket.recv_from(buffer) => {
113-
let received_at = crate::time::UtcTimestamp::now();
114+
let received_at = std::time::Instant::now();
114115
let (result, buffer) = received;
115116

116117
match result {
@@ -123,7 +124,7 @@ impl super::DownstreamReceiveWorkerConfig {
123124
crate::metrics::READ,
124125
&crate::metrics::EMPTY,
125126
)
126-
.set((received_at - last_received_at).nanos());
127+
.set((received_at - last_received_at).as_nanos() as _);
127128
}
128129
last_received_at = Some(received_at);
129130

@@ -134,7 +135,10 @@ impl super::DownstreamReceiveWorkerConfig {
134135
&sessions,
135136
&mut error_acc,
136137
&mut destinations,
138+
&processing_metrics.read_processing_time,
137139
);
140+
141+
processing_metrics.flush();
138142
}
139143
Err(error) => {
140144
tracing::error!(%error, "error receiving packet");

src/metrics.rs

+20-2
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
use crate::net::maxmind_db::MetricsIpNetEntry;
1818
use once_cell::sync::Lazy;
1919
use prometheus::{
20-
core::Collector, Histogram, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGauge,
21-
IntGaugeVec, Opts, Registry, DEFAULT_BUCKETS,
20+
core::Collector, local::LocalHistogram, Histogram, HistogramOpts, HistogramVec, IntCounter,
21+
IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, DEFAULT_BUCKETS,
2222
};
2323

2424
pub use prometheus::Result;
@@ -282,6 +282,24 @@ pub trait CollectorExt: Collector + Clone + Sized + 'static {
282282

283283
impl<C: Collector + Clone + 'static> CollectorExt for C {}
284284

285+
/// A local instance of all of the metrics related to packet processing.
286+
pub struct ProcessingMetrics {
287+
pub read_processing_time: LocalHistogram,
288+
}
289+
290+
impl ProcessingMetrics {
291+
pub fn new() -> Self {
292+
Self {
293+
read_processing_time: processing_time(READ).local(),
294+
}
295+
}
296+
297+
#[inline]
298+
pub fn flush(&self) {
299+
self.read_processing_time.flush();
300+
}
301+
}
302+
285303
#[cfg(test)]
286304
mod test {
287305
fn check(num: u64, exp: &str) {

0 commit comments

Comments
 (0)