diff --git a/conmon-rs/server/src/container_log.rs b/conmon-rs/server/src/container_log.rs
index fff7f7ce20..0bd68865f2 100644
--- a/conmon-rs/server/src/container_log.rs
+++ b/conmon-rs/server/src/container_log.rs
@@ -16,35 +16,52 @@ pub struct ContainerLog {
 #[derive(Debug)]
 enum LogDriver {
     ContainerRuntimeInterface(CriLogger),
+    // JSON-lines driver. The JsonLogger type is defined in the new
+    // crate::json_logger module (requires `use crate::json_logger::JsonLogger;`
+    // at the top of this file). The original patch pasted the capnp schema
+    // fields (`type @0 :Type;` ...) here as a Rust struct, which is not valid
+    // Rust and duplicated the real definition.
+    JsonLogger(JsonLogger),
 }
 
 impl ContainerLog {
     /// Create a new default SharedContainerLog.
     pub fn new() -> SharedContainerLog {
         Arc::new(RwLock::new(Self::default()))
     }
 
-    /// Create a new SharedContainerLog from an capnp owned reader.
+    /// Create a new SharedContainerLog from a capnp owned reader.
     pub fn from(reader: Reader) -> Result<SharedContainerLog> {
         let drivers = reader
             .iter()
-            .flat_map(|x| -> Result<_> {
+            // map + collect::<Result<_>>() so that a failing driver aborts
+            // construction with its error; the original flat_map over Result
+            // silently dropped every Err item from the collection.
+            .map(|x| -> Result<_> {
+                // A max_size of 0 means "no size limit".
+                let max_size = if x.get_max_size() > 0 {
+                    Some(x.get_max_size() as usize)
+                } else {
+                    None
+                };
                 Ok(match x.get_type()? {
                     Type::ContainerRuntimeInterface => {
-                        LogDriver::ContainerRuntimeInterface(CriLogger::new(
-                            x.get_path()?,
-                            if x.get_max_size() > 0 {
-                                Some(x.get_max_size() as usize)
-                            } else {
-                                None
-                            },
-                        )?)
+                        LogDriver::ContainerRuntimeInterface(CriLogger::new(x.get_path()?, max_size)?)
+                    }
+                    Type::JsonLogger => {
+                        // JsonLogger::new takes (path, max_log_size); the
+                        // original patch passed only the path, which does not
+                        // match the constructor in json_logger.rs.
+                        LogDriver::JsonLogger(JsonLogger::new(x.get_path()?, max_size)?)
                     }
                 })
             })
-            .collect();
+            .collect::<Result<Vec<_>>>()?;
         Ok(Arc::new(RwLock::new(Self { drivers })))
     }
 
     /// Asynchronously initialize all loggers.
pub async fn init(&mut self) -> Result<()> { @@ -62,6 +79,26 @@ impl ContainerLog { Ok(()) } + // New method to initialize JSON loggers + pub async fn init_jsonloggers(&mut self) -> Result<()> { + join_all( + self.drivers + .iter_mut() + .filter_map(|x| { + if let LogDriver::JsonLogger(json_logger) = x { + Some(json_logger.init()) + } else { + None + } + }) + .collect::>(), + ) + .await + .into_iter() + .collect::>>()?; + Ok(()) + } + /// Reopen the container logs. pub async fn reopen(&mut self) -> Result<()> { join_all( @@ -78,6 +115,27 @@ impl ContainerLog { Ok(()) } + + // New method to reopen JSON loggers + pub async fn reopen_jsonloggers(&mut self) -> Result<()> { + join_all( + self.drivers + .iter_mut() + .filter_map(|x| { + if let LogDriver::JsonLogger(json_logger) = x { + Some(json_logger.reopen()) + } else { + None + } + }) + .collect::>(), + ) + .await + .into_iter() + .collect::>>()?; + Ok(()) + } + /// Write the contents of the provided reader into all loggers. pub async fn write(&mut self, pipe: Pipe, bytes: T) -> Result<()> where @@ -98,4 +156,28 @@ impl ContainerLog { .collect::>>()?; Ok(()) } + + // New method to write JSON logs + pub async fn write_jsonlogs(&mut self, pipe: Pipe, bytes: T) -> Result<()> + where + T: AsyncBufRead + Unpin + Copy, + { + join_all( + self.drivers + .iter_mut() + .filter_map(|x| { + if let LogDriver::JsonLogger(json_logger) = x { + Some(json_logger.write(pipe, bytes)) + } else { + None + } + }) + .collect::>(), + ) + .await + .into_iter() + .collect::>>()?; + Ok(()) + } } + diff --git a/conmon-rs/server/src/json_logger.rs b/conmon-rs/server/src/json_logger.rs new file mode 100644 index 0000000000..7ddc60b485 --- /dev/null +++ b/conmon-rs/server/src/json_logger.rs @@ -0,0 +1,129 @@ +use crate::container_io::Pipe; +use anyhow::{Context, Result}; +use getset::{CopyGetters, Getters, Setters}; +use memchr::memchr; +use std::{ + marker::Unpin, + path::{Path, PathBuf}, +}; +use tokio::{ + fs::{File, OpenOptions}, + 
+    io::{AsyncBufRead, AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter},
+};
+use tracing::{debug, trace};
+// NOTE(review): `trace`, `DateTime` and `TimeZone` are imported but unused in
+// this patch -- presumably needed for JSON timestamping in a follow-up;
+// confirm or drop before merge.
+use tz::{DateTime, TimeZone};
+
+/// JSON file logger implementation.
+///
+/// The main structure used for container JSON log handling.
+#[derive(Debug, CopyGetters, Getters, Setters)]
+pub struct JsonLogger {
+    /// Path to the file on disk.
+    #[getset(get)]
+    path: PathBuf,
+
+    /// Open file handle of the `path` (None until `init` ran).
+    /// Generics were stripped in the original patch (`Option>`); restored.
+    #[getset(set)]
+    file: Option<BufWriter<File>>,
+
+    /// Maximum allowed log size in bytes; `None` means unbounded.
+    #[getset(get_copy)]
+    max_log_size: Option<usize>,
+
+    /// Current bytes written to the log file.
+    #[getset(get_copy, set)]
+    bytes_written: usize,
+}
+
+impl JsonLogger {
+    const ERR_UNINITIALIZED: &'static str = "logger not initialized";
+
+    /// Create a new JSON file logger instance.
+    ///
+    /// Does not touch the filesystem; call [`init`](Self::init) to open the
+    /// file.
+    pub fn new<T: AsRef<Path>>(path: T, max_log_size: Option<usize>) -> Result<Self> {
+        Ok(Self {
+            path: path.as_ref().into(),
+            file: None,
+            max_log_size,
+            bytes_written: 0,
+        })
+    }
+
+    /// Asynchronously initialize the JSON logger by opening its file.
+    /// (The original comment said "CRI logger" -- copy-paste leftover.)
+    pub async fn init(&mut self) -> Result<()> {
+        debug!("Initializing JSON logger in path {}", self.path().display());
+        self.set_file(Self::open(self.path()).await?.into());
+        Ok(())
+    }
+
+    /// Reopen the container log file, syncing pending data to disk first.
+    pub async fn reopen(&mut self) -> Result<()> {
+        debug!("Reopen container log {}", self.path().display());
+        self.file
+            .as_mut()
+            .context(Self::ERR_UNINITIALIZED)?
+            .get_ref()
+            .sync_all()
+            .await?;
+        self.init().await
+    }
+
+    /// Ensures that all content is written to disk.
+    pub async fn flush(&mut self) -> Result<()> {
+        self.file
+            .as_mut()
+            .context(Self::ERR_UNINITIALIZED)?
+            .flush()
+            .await
+            .context("flush file writer")
+    }
+
+    /// Write the contents of the provided reader into the log file.
+    ///
+    /// NOTE(review): despite the type name, no JSON framing is applied yet
+    /// and `pipe` is unused -- presumably the stdout/stderr tag and a
+    /// timestamp are added in a follow-up; confirm before release.
+    pub async fn write<T>(&mut self, pipe: Pipe, bytes: T) -> Result<()>
+    where
+        T: AsyncBufRead + Unpin + Copy,
+    {
+        // Buffer the reader once. The original patch called
+        // `Self::read_line(&mut buf, bytes)` -- the arguments were swapped
+        // and `bytes` was never wrapped in a BufReader, so it did not match
+        // read_line's signature at all.
+        let mut reader = BufReader::new(bytes);
+        let mut buf = Vec::new();
+        let mut partial = false;
+        let mut read = 0;
+        // Keep pulling complete lines until a partial read signals that no
+        // further newline is available (end of input).
+        while !partial {
+            let (r, p) = Self::read_line(&mut reader, &mut buf).await?;
+            read += r;
+            partial = p;
+        }
+        self.bytes_written += read;
+        if let Some(max_log_size) = self.max_log_size {
+            if self.bytes_written > max_log_size {
+                self.reopen().await?;
+                // Reset the counter after rotation -- missing in the original
+                // patch, which would have rotated on every subsequent write.
+                self.bytes_written = 0;
+            }
+        }
+        self.file
+            .as_mut()
+            .context(Self::ERR_UNINITIALIZED)?
+            .write_all(&buf)
+            .await
+            .context("write to file")
+    }
+
+    /// Read one line (or the trailing partial line) from `r` into `buf`.
+    ///
+    /// Returns the number of bytes consumed and whether the read was partial
+    /// (no terminating newline found in the currently buffered data).
+    async fn read_line<T>(r: &mut BufReader<T>, buf: &mut Vec<u8>) -> Result<(usize, bool)>
+    where
+        T: AsyncBufRead + Unpin,
+    {
+        let (partial, read) = {
+            let available = r.fill_buf().await?;
+            match memchr(b'\n', available) {
+                Some(i) => {
+                    // Include the newline itself in the output.
+                    buf.extend_from_slice(&available[..=i]);
+                    (false, i + 1)
+                }
+                None => {
+                    buf.extend_from_slice(available);
+                    (true, available.len())
+                }
+            }
+        };
+        r.consume(read);
+        Ok((read, partial))
+    }
+
+    /// Open the log file for writing, creating or truncating it.
+    ///
+    /// Added in review: `init()` calls `Self::open` but the original patch
+    /// never defined it (and imported `OpenOptions` unused); this mirrors the
+    /// existing CRI logger's open helper.
+    async fn open<T: AsRef<Path>>(path: T) -> Result<BufWriter<File>> {
+        Ok(BufWriter::new(
+            OpenOptions::new()
+                .create(true)
+                .truncate(true)
+                .write(true)
+                .open(&path)
+                .await
+                .with_context(|| format!("open log file path '{}'", path.as_ref().display()))?,
+        ))
+    }
+}