Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agent/crates/public/src/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub trait Sendable: Debug + Send + 'static {
fn to_kv_string(&self, _: &mut String) {}
}

#[derive(Debug, Clone, Copy, IntoPrimitive)]
#[derive(Debug, Clone, Copy, IntoPrimitive, PartialEq)]
#[repr(u8)]
pub enum SendMessageType {
Compress = 0,
Expand Down
4 changes: 3 additions & 1 deletion agent/src/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1932,7 +1932,7 @@ impl Default for Limits {
Self {
max_millicpus: 1000,
max_memory: 768 << 20,
max_log_backhaul_rate: 300,
max_log_backhaul_rate: 36000,
max_local_log_file_size: 1000 << 20,
local_log_retention: Duration::from_secs(300 * 24 * 3600),
max_sockets: 1024,
Expand Down Expand Up @@ -2104,6 +2104,7 @@ pub struct Communication {
pub ingester_port: u16,
#[serde(deserialize_with = "deser_usize_with_mega_unit")]
pub grpc_buffer_size: usize,
pub max_throughput_to_ingester: u64,
pub request_via_nat_ip: bool,
pub proxy_controller_ip: String,
pub proxy_controller_port: u16,
Expand All @@ -2119,6 +2120,7 @@ impl Default for Communication {
ingester_ip: "".to_string(),
ingester_port: 30033,
grpc_buffer_size: 5 << 20,
max_throughput_to_ingester: 100,
request_via_nat_ip: false,
}
}
Expand Down
11 changes: 11 additions & 0 deletions agent/src/config/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ pub struct SenderConfig {
pub npb_bps_threshold: u64,
pub npb_socket_type: agent::SocketType,
pub multiple_sockets_to_ingester: bool,
pub max_throughput_to_ingester: u64, // unit: Mbps
pub collector_socket_type: agent::SocketType,
pub standalone_data_file_size: u32,
pub standalone_data_file_dir: String,
Expand Down Expand Up @@ -1716,6 +1717,7 @@ impl TryFrom<(Config, UserConfig)> for ModuleConfig {
.tx_throughput
.throughput_monitoring_interval,
multiple_sockets_to_ingester: conf.outputs.socket.multiple_sockets_to_ingester,
max_throughput_to_ingester: conf.global.communication.max_throughput_to_ingester,
collector_socket_type: conf.outputs.socket.data_socket_type,
standalone_data_file_size: conf.global.standalone_mode.max_data_file_size,
standalone_data_file_dir: conf.global.standalone_mode.data_file_dir.clone(),
Expand Down Expand Up @@ -3749,6 +3751,15 @@ impl ConfigHandler {
communication.grpc_buffer_size = new_communication.grpc_buffer_size;
restart_agent = !first_run;
}
if communication.max_throughput_to_ingester != new_communication.max_throughput_to_ingester
{
info!(
"Update global.communication.max_throughput_to_ingester from {:?} to {:?}.",
communication.max_throughput_to_ingester,
new_communication.max_throughput_to_ingester
);
communication.max_throughput_to_ingester = new_communication.max_throughput_to_ingester;
}
if communication.ingester_ip != new_communication.ingester_ip {
info!(
"Update global.communication.ingester_ip from {:?} to {:?}.",
Expand Down
3 changes: 2 additions & 1 deletion agent/src/exception.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ impl ExceptionHandler {
| Exception::ControllerSocketError as u64
| Exception::AnalyzerSocketError as u64
| Exception::IntegrationSocketError as u64
| Exception::NpbSocketError as u64;
| Exception::NpbSocketError as u64
| Exception::DataBpsThresholdExceeded as u64;

pub fn set(&self, e: Exception) {
self.0.fetch_or(e as u64, Ordering::SeqCst);
Expand Down
49 changes: 48 additions & 1 deletion agent/src/sender/uniform_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,9 @@ impl<T: Sendable> UniformSenderThread<T> {

// Process-wide statics, initialised lazily on first access by `lazy_static!`.
// NOTE(review): being statics, the three throughput counters below appear to be
// shared by every `UniformSender` instance in the process, so the limit they
// enforce would be a global one rather than per sender — confirm this is intended.
lazy_static! {
// A single `Connection` behind a mutex.
static ref GLOBAL_CONNECTION: Arc<Mutex<Connection>> = Arc::new(Mutex::new(Connection::new()));
// Bytes sent since the throughput window was last reset. Incremented alongside
// the per-sender `tx_bytes` counter and zeroed by `is_exceed_max_throughput`.
static ref TOTAL_SENT_BYTES: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
// Start of the current throughput window, stored as nanoseconds since the UNIX
// epoch (0 until `is_exceed_max_throughput` performs its first reset).
static ref SENT_START_DURATION: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
// Time the "dropping message" warning was last emitted, stored as nanoseconds
// since the UNIX epoch; used to rate-limit that warning.
static ref LAST_LOGGING_DURATION: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -599,6 +602,7 @@ impl<T: Sendable> UniformSender<T> {
self.counter
.tx_bytes
.fetch_add(buffer.len() as u64, Ordering::Relaxed);
TOTAL_SENT_BYTES.fetch_add(buffer.len() as u64, Ordering::Relaxed);
break;
}
}
Expand All @@ -622,6 +626,39 @@ impl<T: Sendable> UniformSender<T> {
}
}

/// Reports whether the caller should drop its current batch because the
/// configured `max_throughput_to_ingester` limit has been exceeded.
///
/// `max_throughput_mbps` is the limit in Mbps; `0` disables the check.
///
/// Bytes are accumulated in `TOTAL_SENT_BYTES`. Once that total exceeds one
/// second's worth of the limit, the age of the current window decides:
/// * older than one second: the window and byte counter are reset and
///   `false` is returned (nothing is dropped);
/// * otherwise: `Exception::DataBpsThresholdExceeded` is raised, a warning is
///   logged (rate-limited) and `true` is returned.
///
/// Timestamps come from the wall clock, which may step backwards. A stored
/// stamp that lies in the future is therefore treated as expired instead of
/// being subtracted, which would panic on `Duration` underflow.
fn is_exceed_max_throughput(&mut self, max_throughput_mbps: u64) -> bool {
    // 1 Mbit = 2^20 bits = 2^17 bytes.
    const BYTES_PER_MBIT: u64 = (1 << 20) >> 3;
    const WINDOW: Duration = Duration::from_secs(1);
    const LOG_INTERVAL: Duration = Duration::from_secs(5);

    if max_throughput_mbps == 0 {
        return false;
    }
    // Saturate rather than silently losing high bits on absurdly large settings.
    let max_throughput_bytes = max_throughput_mbps.saturating_mul(BYTES_PER_MBIT);
    if TOTAL_SENT_BYTES.load(Ordering::Relaxed) <= max_throughput_bytes {
        return false;
    }

    let now = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(now) => now,
        // Clock is set before the epoch: the rate cannot be measured, so do not
        // drop data (and do not panic the sender thread).
        Err(_) => return false,
    };
    // Nanoseconds since the epoch fit in a u64 for several more centuries.
    let now_nanos = now.as_nanos() as u64;

    let window_start = Duration::from_nanos(SENT_START_DURATION.load(Ordering::Relaxed));
    // `None` means the stored start is later than `now` (clock stepped back).
    let window_expired = now
        .checked_sub(window_start)
        .map_or(true, |used| used > WINDOW);
    if window_expired {
        SENT_START_DURATION.store(now_nanos, Ordering::Relaxed);
        TOTAL_SENT_BYTES.store(0, Ordering::Relaxed);
        return false;
    }

    // Avoid flooding the log: emit the warning at most once every 5 seconds.
    let last_logged = Duration::from_nanos(LAST_LOGGING_DURATION.load(Ordering::Relaxed));
    let should_log = now
        .checked_sub(last_logged)
        .map_or(true, |quiet| quiet > LOG_INTERVAL);
    if should_log {
        warn!(
            "{} sender dropping message, throughput execeed setting value 'max_throughput_to_ingester' {}Mbps",
            self.name, max_throughput_mbps
        );
        LAST_LOGGING_DURATION.store(now_nanos, Ordering::Relaxed);
    }
    self.exception_handler
        .set(Exception::DataBpsThresholdExceeded);
    true
}

fn check_or_register_counterable(&mut self, message_type: SendMessageType) {
if self.stats_registered {
return;
Expand All @@ -638,7 +675,9 @@ impl<T: Sendable> UniformSender<T> {
let mut kv_string = String::with_capacity(2048);
let mut batch = Vec::with_capacity(QUEUE_BATCH_SIZE);
while self.running.load(Ordering::Relaxed) {
let socket_type = self.config.load().collector_socket_type;
let config = self.config.load();
let socket_type = config.collector_socket_type;
let max_throughput_mpbs = config.max_throughput_to_ingester;
match self.input.recv_all(
&mut batch,
Some(Duration::from_secs(Self::QUEUE_READ_TIMEOUT)),
Expand All @@ -649,6 +688,13 @@ impl<T: Sendable> UniformSender<T> {
start_cached = Instant::now();
self.cached = false;
}
if self.is_exceed_max_throughput(max_throughput_mpbs) {
self.counter
.dropped
.fetch_add(batch.len() as u64, Ordering::Relaxed);
batch.clear();
continue;
}
for send_item in batch.drain(..) {
if !self.running.load(Ordering::Relaxed) {
break;
Expand All @@ -659,6 +705,7 @@ impl<T: Sendable> UniformSender<T> {
"{} sender send item {}: {:?}",
self.name, message_type, send_item
);

let result = match socket_type {
SocketType::File => self.handle_target_file(send_item, &mut kv_string),
_ => self.handle_target_server(send_item),
Expand Down
1 change: 1 addition & 0 deletions message/agent.proto
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ enum Exception {
INTEGRATION_SOCKET_ERROR = 262144;
CGROUPS_CONFIG_ERROR = 524288;
SYSTEM_LOAD_CIRCUIT_BREAKER = 1048576;
DATA_BPS_THRESHOLD_EXCEEDED = 2097152;
// 2^31及以下由 agent ,agent 最大可用异常是2^31,顺序从前往后
// 2^32及以上由控制器使用,顺序从后往前
}
Expand Down
34 changes: 32 additions & 2 deletions server/agent_config/README-CH.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,15 @@ Upgrade from old version: `log_threshold`
```yaml
global:
limits:
max_log_backhaul_rate: 300
max_log_backhaul_rate: 36000
```

**模式**:
| Key | Value |
| ---- | ---------------------------- |
| Type | int |
| Unit | Lines/Hour |
| Range | [0, 10000] |
| Range | [0, 1000000] |

**详细描述**:

Expand Down Expand Up @@ -1023,6 +1023,36 @@ global:

设置 deepflow-agent 的 gRPC socket 缓冲区大小。

### 发送到 Ingester 的最大流量 {#global.communication.max_throughput_to_ingester}

**标签**:

`hot_update`

**FQCN**:

`global.communication.max_throughput_to_ingester`

**默认值**:
```yaml
global:
communication:
max_throughput_to_ingester: 100
```

**模式**:
| Key | Value |
| ---- | ---------------------------- |
| Type | int |
| Unit | Mbps |
| Range | [0, 10000] |

**详细描述**:

向 Server 端 Ingester 模块发送可观测性数据的最大允许流量,
超过此限速时数据将会主动丢弃、且采集器会标记为异常状态并触发告警。
配置为 0 表示不限速。

### 请求 NAT IP 地址 {#global.communication.request_via_nat_ip}

**标签**:
Expand Down
35 changes: 33 additions & 2 deletions server/agent_config/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,15 @@ Upgrade from old version: `log_threshold`
```yaml
global:
limits:
max_log_backhaul_rate: 300
max_log_backhaul_rate: 36000
```

**Schema**:
| Key | Value |
| ---- | ---------------------------- |
| Type | int |
| Unit | Lines/Hour |
| Range | [0, 10000] |
| Range | [0, 1000000] |

**Description**:

Expand Down Expand Up @@ -1047,6 +1047,37 @@ global:

gRPC socket buffer size.

### Max Throughput To Ingester {#global.communication.max_throughput_to_ingester}

**Tags**:

`hot_update`

**FQCN**:

`global.communication.max_throughput_to_ingester`

**Default value**:
```yaml
global:
communication:
max_throughput_to_ingester: 100
```

**Schema**:
| Key | Value |
| ---- | ---------------------------- |
| Type | int |
| Unit | Mbps |
| Range | [0, 10000] |

**Description**:

The maximum allowed flow rate for sending observability data to the server-side Ingester module.
When this rate limit is exceeded, the data will be actively discarded,
and the agent will be marked as abnormal and trigger an alarm.
Setting it to 0 means no speed limit.

### Request via NAT IP Address {#global.communication.request_via_nat_ip}

**Tags**:
Expand Down
24 changes: 22 additions & 2 deletions server/agent_config/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ global:
# en: Maximum Log Backhaul Rate
# ch: 日志每小时回传上限
# unit: Lines/Hour
# range: [0, 10000]
# range: [0, 1000000]
# enum_options: []
# modification: hot_update
# ee_feature: false
Expand All @@ -61,7 +61,7 @@ global:
# ch: |-
# 用于 deepflow-agent 控制自身运行日志的每小时回传数量,设置为 0 表示不设限制。
# upgrade_from: log_threshold
max_log_backhaul_rate: 300
max_log_backhaul_rate: 36000
# type: int
# name:
# en: Maximum Local Log File Size
Expand Down Expand Up @@ -699,6 +699,26 @@ global:
# 设置 deepflow-agent 的 gRPC socket 缓冲区大小。
# upgrade_from: static_config.grpc-buffer-size
grpc_buffer_size: 5
# type: int
# name:
# en: Max Throughput To Ingester
# ch: 发送到 Ingester 的最大流量
# unit: Mbps
# range: [0, 10000]
# enum_options: []
# modification: hot_update
# ee_feature: false
# description:
# en: |-
# The maximum allowed flow rate for sending observability data to the server-side Ingester module.
# When this rate limit is exceeded, the data will be actively discarded,
# and the agent will be marked as abnormal and trigger an alarm.
# Setting it to 0 means no speed limit.
# ch: |-
# 向 Server 端 Ingester 模块发送可观测性数据的最大允许流量,
# 超过此限速时数据将会主动丢弃、且采集器会标记为异常状态并触发告警。
# 配置为 0 表示不限速。
max_throughput_to_ingester: 100
# type: bool
# name:
# en: Request via NAT IP Address
Expand Down
2 changes: 2 additions & 0 deletions server/controller/common/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ var VTapExceptionChinese = map[int64]string{
1 << 17: "分发SOCKET错误",
1 << 18: "集成SOCKET错误",
1 << 19: "CGROUPS配置错误",
1 << 20: "系统负载超限触发熔断",
1 << 21: "数据流量达到限速",
VTAP_EXCEPTION_LICENSE_NOT_ENGOUTH: "采集器授权个数不足",
VTAP_EXCEPTION_ALLOC_ANALYZER_FAILED: "分配数据节点失败",
VTAP_EXCEPTION_ALLOC_CONTROLLER_FAILED: "分配控制器失败",
Expand Down
Loading