-
Notifications
You must be signed in to change notification settings - Fork 0
feat: add more socket features #4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,182 @@ | ||
| use pree::socket::monitor::{SocketEvent, SocketMonitor}; | ||
| use pree::socket::platform::{CongestionControl, CongestionState, SocketStats}; | ||
| use std::thread; | ||
| use std::time::Duration; | ||
|
|
||
| fn main() -> pree::Result<()> { | ||
| // Create a socket monitor with a shorter interval for more frequent updates | ||
| let mut monitor = SocketMonitor::new().interval(Duration::from_millis(500)); | ||
|
|
||
| // Register callbacks for socket events with analysis | ||
| monitor.on_socket_change(|event| match event { | ||
| SocketEvent::Opened(socket) => { | ||
| println!("New socket opened:"); | ||
| print_socket_info(&socket); | ||
| if let Some(stats) = &socket.stats { | ||
| analyze_socket_stats(stats); | ||
| } | ||
| println!(); | ||
| } | ||
| SocketEvent::Closed(socket) => { | ||
| println!("Socket closed:"); | ||
| print_socket_info(&socket); | ||
| if let Some(stats) = &socket.stats { | ||
| analyze_socket_stats(stats); | ||
| } | ||
| println!(); | ||
| } | ||
| SocketEvent::StateChanged(socket) => { | ||
| println!("Socket state changed:"); | ||
| print_socket_info(&socket); | ||
| if let Some(stats) = &socket.stats { | ||
| analyze_socket_stats(stats); | ||
| } | ||
| println!(); | ||
| } | ||
| })?; | ||
|
|
||
| // Start monitoring | ||
| monitor.start()?; | ||
|
|
||
| // Monitor for 30 seconds | ||
| thread::sleep(Duration::from_secs(30)); | ||
|
|
||
| // Stop monitoring | ||
| monitor.stop(); | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| fn print_socket_info(socket: &pree::socket::platform::SocketInfo) { | ||
| println!(" Protocol: {}", socket.protocol); | ||
| println!(" Local: {}", socket.local_addr); | ||
| println!(" Remote: {}", socket.remote_addr); | ||
| println!(" State: {:?}", socket.state); | ||
| if let Some(pid) = socket.process_id { | ||
| println!( | ||
| " Process: {} (PID: {})", | ||
| socket.process_name.as_deref().unwrap_or("unknown"), | ||
| pid | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| fn analyze_socket_stats(stats: &SocketStats) { | ||
| println!(" Connection Analysis:"); | ||
|
|
||
| // Basic metrics | ||
| println!( | ||
| " Bytes sent/received: {}/{}", | ||
| stats.bytes_sent, stats.bytes_received | ||
| ); | ||
| println!( | ||
| " Packets sent/received: {}/{}", | ||
| stats.packets_sent, stats.packets_received | ||
| ); | ||
| println!(" Retransmits: {}", stats.retransmits); | ||
|
|
||
| // RTT analysis | ||
| if let Some(rtt) = stats.rtt { | ||
| println!(" RTT: {:.2}ms", rtt.as_secs_f32() * 1000.0); | ||
| if let Some(rtt_var) = stats.rtt_variance { | ||
| println!(" RTT variance: {:.2}ms", rtt_var.as_secs_f32() * 1000.0); | ||
| } | ||
| } | ||
|
|
||
| // Congestion control analysis | ||
| if let Some(cc) = &stats.congestion_control { | ||
| println!(" Congestion Control: {cc:?}"); | ||
| match cc { | ||
| CongestionControl::Cubic => { | ||
| println!(" Using CUBIC algorithm for high-speed networks"); | ||
| } | ||
| CongestionControl::Bbr => { | ||
| println!(" Using BBR for better throughput and latency"); | ||
| } | ||
| CongestionControl::Reno => { | ||
| println!(" Using Reno for basic congestion control"); | ||
| } | ||
| CongestionControl::Vegas => { | ||
| println!(" Using Vegas for proactive congestion avoidance"); | ||
| } | ||
| CongestionControl::Westwood => { | ||
| println!(" Using Westwood for wireless networks"); | ||
| } | ||
| CongestionControl::Other(s) => println!(" Using custom algorithm: {s}"), | ||
| } | ||
| } | ||
| if let Some(state) = &stats.congestion_state { | ||
| println!(" Congestion State: {state:?}"); | ||
| match state { | ||
| CongestionState::SlowStart => { | ||
| println!(" In slow start phase - growing window exponentially"); | ||
| } | ||
| CongestionState::CongestionAvoidance => { | ||
| println!(" In congestion avoidance - growing window linearly"); | ||
| } | ||
| CongestionState::FastRecovery => { | ||
| println!(" In fast recovery - recovering from packet loss"); | ||
| } | ||
| CongestionState::FastRetransmit => { | ||
| println!(" In fast retransmit - retransmitting lost packets"); | ||
| } | ||
| CongestionState::Other(s) => println!(" In custom state: {s}"), | ||
| } | ||
| } | ||
| if stats.is_congested() { | ||
| println!(" ⚠️ Connection is experiencing congestion"); | ||
| } | ||
|
|
||
| // Connection quality analysis | ||
| let quality_score = stats.calculate_quality_score(); | ||
| println!( | ||
| " Connection Quality Score: {:.1}%", | ||
| quality_score * 100.0 | ||
| ); | ||
|
|
||
| if let Some(util) = stats.bandwidth_utilization() { | ||
| println!(" Bandwidth Utilization: {util:.1}%"); | ||
| } | ||
|
|
||
| let loss_rate = stats.packet_loss_rate(); | ||
| if loss_rate > 0.0 { | ||
| println!(" Packet Loss Rate: {loss_rate:.1}%"); | ||
| } | ||
|
|
||
| if stats.has_buffer_bloat() { | ||
| println!(" ⚠️ Buffer bloat detected"); | ||
| } | ||
|
|
||
| // SACK and ECN analysis | ||
| if stats.sack_enabled { | ||
| println!(" SACK enabled"); | ||
| if let Some(blocks) = stats.sack_blocks { | ||
| println!(" SACK blocks: {blocks}"); | ||
| } | ||
| if let Some(reordering) = stats.sack_reordering { | ||
| println!(" SACK reordering events: {reordering}"); | ||
| } | ||
| } | ||
|
|
||
| if stats.ecn_enabled { | ||
| println!(" ECN enabled"); | ||
| if let Some(ce_count) = stats.ecn_ce_count { | ||
| println!(" ECN Congestion Experienced count: {ce_count}"); | ||
| } | ||
| } | ||
|
|
||
| // Window analysis | ||
| if let (Some(send_win), Some(recv_win)) = (stats.send_window, stats.receive_window) { | ||
| println!(" Send/Receive Window: {send_win}/{recv_win}"); | ||
| } | ||
| if let Some(zero_win) = stats.zero_window_events { | ||
| if zero_win > 0 { | ||
| println!(" ⚠️ Zero window events: {zero_win}"); | ||
| } | ||
| } | ||
|
|
||
| // Connection duration | ||
| if let Some(duration) = stats.connection_duration { | ||
| println!(" Connection Duration: {:.1}s", duration.as_secs_f32()); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -5,7 +5,7 @@ use std::sync::{Arc, Mutex}; | |||||||||||||||||||||||||||||
| use std::thread; | ||||||||||||||||||||||||||||||
| use std::time::Duration; | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| use crate::socket::platform::SocketInfo; | ||||||||||||||||||||||||||||||
| use crate::socket::platform::{SocketFamily, SocketInfo, SocketType}; | ||||||||||||||||||||||||||||||
| use crate::socket::socket::Socket; | ||||||||||||||||||||||||||||||
| use crate::types::{Protocol, SocketState}; | ||||||||||||||||||||||||||||||
| use crate::Result; | ||||||||||||||||||||||||||||||
|
|
@@ -126,6 +126,14 @@ impl SocketMonitor { | |||||||||||||||||||||||||||||
| process_id: socket.process_id, | ||||||||||||||||||||||||||||||
| process_name: socket.process_name.clone(), | ||||||||||||||||||||||||||||||
| stats: None, | ||||||||||||||||||||||||||||||
| socket_type: match socket.protocol { | ||||||||||||||||||||||||||||||
| Protocol::Tcp => Some(SocketType::Stream), | ||||||||||||||||||||||||||||||
| Protocol::Udp => Some(SocketType::Datagram), | ||||||||||||||||||||||||||||||
| _ => None, | ||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||
| socket_family: Some(SocketFamily::Inet), | ||||||||||||||||||||||||||||||
| socket_flags: None, | ||||||||||||||||||||||||||||||
| socket_options: None, | ||||||||||||||||||||||||||||||
|
Comment on lines
+129
to
+136
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Extract socket metadata creation to reduce code duplication. The socket metadata assignment logic is duplicated three times (here, lines 168-175, and 210-217). This violates the DRY principle and makes maintenance harder. Consider extracting this logic into a helper function: +fn create_socket_metadata(protocol: Protocol) -> (Option<SocketType>, Option<SocketFamily>, Option<SocketFlags>, Option<SocketOptions>) {
+ let socket_type = match protocol {
+ Protocol::Tcp => Some(SocketType::Stream),
+ Protocol::Udp => Some(SocketType::Datagram),
+ _ => None,
+ };
+ (socket_type, Some(SocketFamily::Inet), None, None)
+}Then use it in all three locations: + let (socket_type, socket_family, socket_flags, socket_options) = create_socket_metadata(socket.protocol);
socket_type: match socket.protocol {
Protocol::Tcp => Some(SocketType::Stream),
Protocol::Udp => Some(SocketType::Datagram),
_ => None,
},
- socket_family: Some(SocketFamily::Inet),
- socket_flags: None,
- socket_options: None,
+ socket_type,
+ socket_family,
+ socket_flags,
+ socket_options,📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||
| let _ = tx.send(SocketEvent::Opened(info)); | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
@@ -157,6 +165,14 @@ impl SocketMonitor { | |||||||||||||||||||||||||||||
| process_id: socket.process_id, | ||||||||||||||||||||||||||||||
| process_name: socket.process_name.clone(), | ||||||||||||||||||||||||||||||
| stats: None, | ||||||||||||||||||||||||||||||
| socket_type: match socket.protocol { | ||||||||||||||||||||||||||||||
| Protocol::Tcp => Some(SocketType::Stream), | ||||||||||||||||||||||||||||||
| Protocol::Udp => Some(SocketType::Datagram), | ||||||||||||||||||||||||||||||
| _ => None, | ||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||
| socket_family: Some(SocketFamily::Inet), | ||||||||||||||||||||||||||||||
| socket_flags: None, | ||||||||||||||||||||||||||||||
| socket_options: None, | ||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||
| let _ = tx.send(SocketEvent::Closed(info)); | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
@@ -191,6 +207,14 @@ impl SocketMonitor { | |||||||||||||||||||||||||||||
| process_id: socket.process_id, | ||||||||||||||||||||||||||||||
| process_name: socket.process_name.clone(), | ||||||||||||||||||||||||||||||
| stats: None, | ||||||||||||||||||||||||||||||
| socket_type: match socket.protocol { | ||||||||||||||||||||||||||||||
| Protocol::Tcp => Some(SocketType::Stream), | ||||||||||||||||||||||||||||||
| Protocol::Udp => Some(SocketType::Datagram), | ||||||||||||||||||||||||||||||
| _ => None, | ||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||
| socket_family: Some(SocketFamily::Inet), | ||||||||||||||||||||||||||||||
| socket_flags: None, | ||||||||||||||||||||||||||||||
| socket_options: None, | ||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||
| let _ = tx.send(SocketEvent::StateChanged(info)); | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Verification agent
🧩 Analysis chain
Verify bandwidth_utilization() method handles edge cases properly.
The code assumes
bandwidth_utilization()returns a valid percentage, but should verify the method handles cases where bandwidth calculation might fail or return invalid values.🏁 Script executed:
Length of output: 115
🏁 Script executed:
Length of output: 881
🏁 Script executed:
Length of output: 936
🏁 Script executed:
Length of output: 1955
Guard against zero
connection_durationto avoid division by zeroThe current
bandwidth_utilization()checksrtt_secs > 0.0but still divides byself.connection_duration?.as_secs_f32()without verifying that the duration is non-zero. Ifconnection_durationis zero, this yieldsInf/NaNwhich then gets clamped to 100%, masking the invalid case.• File:
src/socket/platform.rs, methodbandwidth_utilization• Add a check for
duration.as_secs_f32() > 0.0(similar to thertt_secsguard), returningNonewhen it’s zeroSuggested diff:
pub fn bandwidth_utilization(&self) -> Option<f32> { if let (Some(cwnd), Some(mss)) = (self.congestion_window, self.snd_mss) { if let Some(rtt) = self.rtt { let rtt_secs = rtt.as_secs_f32(); if rtt_secs > 0.0 { - let max_bandwidth = (cwnd * mss) as f32 / rtt_secs; - let actual_bandwidth = - self.bytes_sent as f32 / self.connection_duration?.as_secs_f32(); - Some((actual_bandwidth / max_bandwidth * 100.0).min(100.0)) + let max_bandwidth = (cwnd * mss) as f32 / rtt_secs; + // Ensure we don't divide by zero on connection_duration + if let Some(duration) = self.connection_duration { + let duration_secs = duration.as_secs_f32(); + if duration_secs > 0.0 { + let actual_bandwidth = self.bytes_sent as f32 / duration_secs; + return Some((actual_bandwidth / max_bandwidth * 100.0).min(100.0)); + } + } + None } else { None } } else { None } } else { None } }🤖 Prompt for AI Agents