Skip to content

Commit

Permalink
Cleanup connection handler, add session logs (#1216)
Browse files Browse the repository at this point in the history
  • Loading branch information
spetz authored Sep 4, 2024
1 parent f6fff5a commit 63b18c2
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 74 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.4.30"
version = "0.4.31"
edition = "2021"
build = "src/build.rs"

Expand Down
12 changes: 5 additions & 7 deletions server/src/quic/listener.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
use std::net::SocketAddr;
use std::sync::Arc;

use crate::binary::command;
use crate::command::ServerCommand;
use crate::quic::quic_sender::QuicSender;
Expand All @@ -13,6 +10,7 @@ use bytes::Bytes;
use iggy::validatable::Validatable;
use iggy::{bytes_serializable::BytesSerializable, messages::MAX_PAYLOAD_SIZE};
use quinn::{Connection, Endpoint, RecvStream, SendStream};
use std::sync::Arc;
use tracing::{debug, error, info};

const LISTENERS_COUNT: u32 = 10;
Expand Down Expand Up @@ -62,7 +60,7 @@ async fn handle_connection(
.await;
let session = Arc::new(Session::from_client_id(client_id, address));

while let Some(stream) = accept_stream(&connection, &system, &address).await? {
while let Some(stream) = accept_stream(&connection, &system, client_id).await? {
let system = system.clone();
let session = session.clone();

Expand All @@ -81,17 +79,17 @@ type BiStream = (SendStream, RecvStream);
async fn accept_stream(
connection: &Connection,
system: &SharedSystem,
address: &SocketAddr,
client_id: u32,
) -> Result<Option<BiStream>, ServerError> {
match connection.accept_bi().await {
Err(quinn::ConnectionError::ApplicationClosed { .. }) => {
info!("Connection closed");
system.read().await.delete_client(address).await;
system.read().await.delete_client(client_id).await;
Ok(None)
}
Err(error) => {
error!("Error when handling QUIC stream: {:?}", error);
system.read().await.delete_client(address).await;
system.read().await.delete_client(client_id).await;
Err(error.into())
}
Ok(stream) => Ok(Some(stream)),
Expand Down
15 changes: 3 additions & 12 deletions server/src/streaming/clients/client_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,7 @@ impl ClientManager {
Ok(())
}

pub fn get_client_by_address(
&self,
address: &SocketAddr,
) -> Result<IggySharedMut<Client>, IggyError> {
let id = hash::calculate_32(address.to_string().as_bytes());
self.get_client_by_id(id)
}

pub fn get_client_by_id(&self, client_id: u32) -> Result<IggySharedMut<Client>, IggyError> {
pub fn get_client(&self, client_id: u32) -> Result<IggySharedMut<Client>, IggyError> {
let client = self.clients.get(&client_id);
if client.is_none() {
return Err(IggyError::ClientNotFound(client_id));
Expand Down Expand Up @@ -119,9 +111,8 @@ impl ClientManager {
Ok(())
}

pub fn delete_client(&mut self, address: &SocketAddr) -> Option<IggySharedMut<Client>> {
let id = hash::calculate_32(address.to_string().as_bytes());
self.clients.remove(&id)
pub fn delete_client(&mut self, client_id: u32) -> Option<IggySharedMut<Client>> {
self.clients.remove(&client_id)
}

pub async fn join_consumer_group(
Expand Down
53 changes: 17 additions & 36 deletions server/src/streaming/systems/clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,60 +17,41 @@ impl System {
client_id
}

pub async fn delete_client(&self, address: &SocketAddr) {
pub async fn delete_client(&self, client_id: u32) {
let consumer_groups: Vec<(u32, u32, u32)>;
let client_id;

{
let client_manager = self.client_manager.read().await;
let client = client_manager.get_client_by_address(address);
if client.is_err() {
let mut client_manager = self.client_manager.write().await;
let client = client_manager.delete_client(client_id);
if client.is_none() {
error!("Client with ID: {client_id} was not found in the client manager.",);
return;
}

self.metrics.decrement_clients(1);
let client = client.unwrap();
let client = client.read().await;
client_id = client.client_id;

consumer_groups = client
.consumer_groups
.iter()
.map(|c| (c.stream_id, c.topic_id, c.group_id))
.collect();

info!(
"Deleted {} client with ID: {} for IP address: {}",
client.transport, client.client_id, client.address
);
}

for (stream_id, topic_id, consumer_group_id) in consumer_groups.iter() {
if let Err(error) = self
for (stream_id, topic_id, consumer_group_id) in consumer_groups.into_iter() {
_ = self
.leave_consumer_group_by_client(
&Identifier::numeric(*stream_id).unwrap(),
&Identifier::numeric(*topic_id).unwrap(),
&Identifier::numeric(*consumer_group_id).unwrap(),
&Identifier::numeric(stream_id).unwrap(),
&Identifier::numeric(topic_id).unwrap(),
&Identifier::numeric(consumer_group_id).unwrap(),
client_id,
)
.await
{
error!(
"Failed to leave consumer group with ID: {} by client with ID: {}. Error: {}",
consumer_group_id, client_id, error
);
}
}

{
let mut client_manager = self.client_manager.write().await;
let client = client_manager.delete_client(address);
if client.is_none() {
return;
}

self.metrics.decrement_clients(1);
let client = client.unwrap();
let client = client.read().await;

info!(
"Deleted {} client with ID: {} for IP address: {}",
client.transport, client.client_id, client.address
);
}
}

Expand All @@ -82,7 +63,7 @@ impl System {
self.ensure_authenticated(session)?;
self.permissioner.get_client(session.get_user_id())?;
let client_manager = self.client_manager.read().await;
client_manager.get_client_by_id(client_id)
client_manager.get_client(client_id)
}

pub async fn get_clients(
Expand Down
11 changes: 1 addition & 10 deletions server/src/tcp/connection_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,22 @@ use crate::binary::command;
use crate::binary::sender::Sender;
use crate::command::ServerCommand;
use crate::server_error::ServerError;
use crate::streaming::clients::client_manager::Transport;
use crate::streaming::session::Session;
use crate::streaming::systems::system::SharedSystem;
use bytes::{BufMut, BytesMut};
use iggy::bytes_serializable::BytesSerializable;
use iggy::error::IggyError;
use iggy::validatable::Validatable;
use std::io::ErrorKind;
use std::net::SocketAddr;
use tracing::{debug, error, info};

const INITIAL_BYTES_LENGTH: usize = 4;

pub(crate) async fn handle_connection(
address: SocketAddr,
session: Session,
sender: &mut dyn Sender,
system: SharedSystem,
) -> Result<(), ServerError> {
let client_id = system
.read()
.await
.add_client(&address, Transport::Tcp)
.await;

let session = Session::from_client_id(client_id, address);
let mut initial_buffer = [0u8; INITIAL_BYTES_LENGTH];
loop {
let read_length = match sender.read(&mut initial_buffer).await {
Expand Down
18 changes: 14 additions & 4 deletions server/src/tcp/tcp_listener.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::streaming::clients::client_manager::Transport;
use crate::streaming::session::Session;
use crate::streaming::systems::system::SharedSystem;
use crate::tcp::connection_handler::{handle_connection, handle_error};
use crate::tcp::tcp_sender::TcpSender;
Expand Down Expand Up @@ -28,19 +30,27 @@ pub async fn start(address: &str, system: SharedSystem) -> SocketAddr {
loop {
match listener.accept().await {
Ok((stream, address)) => {
info!("Accepted new TCP connection: {}", address);
info!("Accepted new TCP connection: {address}");
let client_id = system
.read()
.await
.add_client(&address, Transport::Tcp)
.await;

let session = Session::from_client_id(client_id, address);
info!("Created new session: {session}");
let system = system.clone();
let mut sender = TcpSender { stream };
tokio::spawn(async move {
if let Err(error) =
handle_connection(address, &mut sender, system.clone()).await
handle_connection(session, &mut sender, system.clone()).await
{
handle_error(error);
system.read().await.delete_client(&address).await;
system.read().await.delete_client(client_id).await;
}
});
}
Err(error) => error!("Unable to accept TCP socket, error: {}", error),
Err(error) => error!("Unable to accept TCP socket, error: {error}"),
}
}
});
Expand Down
17 changes: 14 additions & 3 deletions server/src/tcp/tcp_tls_listener.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::net::SocketAddr;

use crate::configs::tcp::TcpTlsConfig;
use crate::streaming::clients::client_manager::Transport;
use crate::streaming::session::Session;
use crate::streaming::systems::system::SharedSystem;
use crate::tcp::connection_handler::{handle_connection, handle_error};
use crate::tcp::tcp_tls_sender::TcpTlsSender;
Expand Down Expand Up @@ -49,20 +51,29 @@ pub(crate) async fn start(address: &str, config: TcpTlsConfig, system: SharedSys
match listener.accept().await {
Ok((stream, address)) => {
info!("Accepted new TCP TLS connection: {}", address);
let client_id = system
.read()
.await
.add_client(&address, Transport::Tcp)
.await;

let session = Session::from_client_id(client_id, address);
info!("Created new session: {session}");

let acceptor = acceptor.clone();
let stream = acceptor.accept(stream).await.unwrap();
let system = system.clone();
let mut sender = TcpTlsSender { stream };
tokio::spawn(async move {
if let Err(error) =
handle_connection(address, &mut sender, system.clone()).await
handle_connection(session, &mut sender, system.clone()).await
{
handle_error(error);
system.read().await.delete_client(&address).await;
system.read().await.delete_client(client_id).await;
}
});
}
Err(error) => error!("Unable to accept TCP TLS socket, error: {}", error),
Err(error) => error!("Unable to accept TCP TLS socket, error: {error}"),
}
}
});
Expand Down

0 comments on commit 63b18c2

Please sign in to comment.