Skip to content

Commit

Permalink
Add tracing spans
Browse files Browse the repository at this point in the history
  • Loading branch information
ndelvalle committed Apr 20, 2024
1 parent f33c544 commit 022320c
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 11 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ tokio = { version = "1.35.0", features = ['full'] }
tokio-util = { version = "0.7.10", features = ["codec"] }
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
uuid = { version = "1.8.0", features = ["v4"] }
15 changes: 13 additions & 2 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,33 @@ use futures::stream::StreamExt; // Use the correct StreamExt trait
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::TcpStream;
use tokio_util::codec::FramedRead;
use uuid::Uuid;

use std::net::SocketAddr;

use crate::codec::FrameCodec;
use crate::frame::Frame;
use crate::Result;

pub struct Connection {
pub id: Uuid,
pub client_address: SocketAddr,
pub writer: OwnedWriteHalf,
reader: FramedRead<OwnedReadHalf, FrameCodec>,
}

impl Connection {
pub fn new(stream: TcpStream) -> Connection {
pub fn new(stream: TcpStream, client_address: SocketAddr) -> Connection {
let (reader, writer) = stream.into_split();
let reader = FramedRead::new(reader, FrameCodec);
let id = Uuid::new_v4();

Connection { writer, reader }
Connection {
id,
writer,
reader,
client_address,
}
}

pub async fn read_frame(&mut self) -> Result<Option<Frame>> {
Expand Down
28 changes: 19 additions & 9 deletions src/server.rs
Original file line number Diff line number Diff line change
@@ -1,46 +1,56 @@
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use tokio::io::AsyncWriteExt;
use tokio::net::{TcpListener, TcpStream};
use tracing::info;
use tracing::{info, instrument};

use crate::commands::executable::Executable;
use crate::commands::Command;
use crate::connection::Connection;
use crate::store::Store;
use crate::Error;

const PORT: u16 = 6379;

pub async fn run() -> Result<(), Error> {
let subscriber = tracing_subscriber::FmtSubscriber::new();
tracing::subscriber::set_global_default(subscriber)?;

let listener = TcpListener::bind("127.0.0.1:6379").await?;
let listener = TcpListener::bind(("127.0.0.1", PORT)).await?;
let store = Arc::new(Mutex::new(Store::new()));

info!("Redis server listening on {}", listener.local_addr()?);

loop {
let (socket, _) = listener.accept().await?;
let (socket, client_address) = listener.accept().await?;
let store = store.clone();
info!("Accepted connection from {:?}", socket.peer_addr()?);
info!("Accepted connection from {:?}", client_address);

tokio::spawn(async move {
if let Err(e) = handle_connection(socket, store).await {
if let Err(e) = handle_connection(socket, client_address, store).await {
println!("Error: {}", e);
}
});
}
}

async fn handle_connection(stream: TcpStream, store: Arc<Mutex<Store>>) -> Result<(), Error> {
let mut con = Connection::new(stream);
#[instrument(name = "connection", skip(stream, store), fields(connection_id))]
async fn handle_connection(
stream: TcpStream,
client_address: SocketAddr,
store: Arc<Mutex<Store>>,
) -> Result<(), Error> {
let mut conn = Connection::new(stream, client_address);

tracing::Span::current().record("connection_id", conn.id.to_string());

while let Some(frame) = con.read_frame().await? {
while let Some(frame) = conn.read_frame().await? {
info!("Received frame: {:?}", frame);
let cmd = Command::try_from(frame)?;
let res = cmd.exec(store.clone())?;
let res: Vec<u8> = res.into();

con.writer.write_all(&res).await?;
conn.writer.write_all(&res).await?;
}

Ok(())
Expand Down

0 comments on commit 022320c

Please sign in to comment.