Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

signed off by: Prashant prabhakar <[email protected]> #1636

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 102 additions & 20 deletions conmon-rs/server/src/container_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,52 @@ pub struct ContainerLog {
/// The set of log drivers a container can be configured with.
#[derive(Debug)]
enum LogDriver {
    /// CRI (Container Runtime Interface) based logging.
    ContainerRuntimeInterface(CriLogger),

    /// JSON file based logging.
    JsonLogger(JsonLogger),
}


#[derive(Debug)]
struct JsonLogger {
type @0 :Type;
path @1 :Text;
max_size @2 :UInt64;
}




impl ContainerLog {
/// Create a new default `SharedContainerLog` with no drivers configured.
pub fn new() -> SharedContainerLog {
    let log = Self::default();
    Arc::new(RwLock::new(log))
}

/// Create a new SharedContainerLog from an capnp owned reader.
pub fn from(reader: Reader<Owned>) -> Result<SharedContainerLog> {
let drivers = reader
.iter()
.flat_map(|x| -> Result<_> {
Ok(match x.get_type()? {
Type::ContainerRuntimeInterface => {
LogDriver::ContainerRuntimeInterface(CriLogger::new(
x.get_path()?,
if x.get_max_size() > 0 {
Some(x.get_max_size() as usize)
} else {
None
},
)?)
}
})
/// Create a new SharedContainerLog from an capnp owned reader.
pub fn from(reader: Reader<Owned>) -> Result<SharedContainerLog> {
let drivers = reader
.iter()
.flat_map(|x| -> Result<_> {
Ok(match x.get_type()? {
Type::ContainerRuntimeInterface => {
LogDriver::ContainerRuntimeInterface(CriLogger::new(
x.get_path()?,
if x.get_max_size() > 0 {
Some(x.get_max_size() as usize)
} else {
None
},
)?)
}
Type::JsonLogger => {
LogDriver::JsonLogger(JsonLogger::new(x.get_path()?)?)
}
})
.collect();
Ok(Arc::new(RwLock::new(Self { drivers })))
}
})
.collect();
Ok(Arc::new(RwLock::new(Self { drivers })))
}



/// Asynchronously initialize all loggers.
pub async fn init(&mut self) -> Result<()> {
Expand All @@ -62,6 +79,26 @@ impl ContainerLog {
Ok(())
}

/// Asynchronously initialize only the JSON log drivers; other driver kinds
/// are left untouched.
pub async fn init_jsonloggers(&mut self) -> Result<()> {
    let tasks: Vec<_> = self
        .drivers
        .iter_mut()
        .filter_map(|driver| match driver {
            LogDriver::JsonLogger(logger) => Some(logger.init()),
            _ => None,
        })
        .collect();
    // All initializations run concurrently; fail on the first error.
    for result in join_all(tasks).await {
        result?;
    }
    Ok(())
}

/// Reopen the container logs.
pub async fn reopen(&mut self) -> Result<()> {
join_all(
Expand All @@ -78,6 +115,27 @@ impl ContainerLog {
Ok(())
}


/// Reopen the log files of all JSON log drivers; other driver kinds are
/// left untouched.
pub async fn reopen_jsonloggers(&mut self) -> Result<()> {
    let tasks = self
        .drivers
        .iter_mut()
        .filter_map(|driver| {
            if let LogDriver::JsonLogger(logger) = driver {
                return Some(logger.reopen());
            }
            None
        })
        .collect::<Vec<_>>();
    // Await every reopen concurrently, surfacing the first failure.
    join_all(tasks)
        .await
        .into_iter()
        .collect::<Result<Vec<_>>>()
        .map(|_| ())
}

/// Write the contents of the provided reader into all loggers.
pub async fn write<T>(&mut self, pipe: Pipe, bytes: T) -> Result<()>
where
Expand All @@ -98,4 +156,28 @@ impl ContainerLog {
.collect::<Result<Vec<_>>>()?;
Ok(())
}

/// Write the contents of the provided reader into all JSON log drivers;
/// other driver kinds are left untouched.
pub async fn write_jsonlogs<T>(&mut self, pipe: Pipe, bytes: T) -> Result<()>
where
    T: AsyncBufRead + Unpin + Copy,
{
    let writes: Vec<_> = self
        .drivers
        .iter_mut()
        .filter_map(|driver| match driver {
            LogDriver::JsonLogger(logger) => Some(logger.write(pipe, bytes)),
            _ => None,
        })
        .collect();
    // Run all writes concurrently and propagate the first error, if any.
    for result in join_all(writes).await {
        result?;
    }
    Ok(())
}
}

129 changes: 129 additions & 0 deletions conmon-rs/server/src/json_logger.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
use crate::container_io::Pipe;
use anyhow::{Context, Result};
use getset::{CopyGetters, Getters, Setters};
use memchr::memchr;
use std::{
marker::Unpin,
path::{Path, PathBuf},
};
use tokio::{
fs::{File, OpenOptions},
io::{AsyncBufRead, AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter},
};
use tracing::{debug, trace};
use tz::{DateTime, TimeZone};

// File logger implementation.
/// JSON file logger: writes container output to a log file on disk.
#[derive(Debug, CopyGetters, Getters, Setters)]
pub struct JsonLogger {
    /// Path to the log file on disk.
    #[getset(get)]
    path: PathBuf,

    /// Open file handle of the `path`; `None` until `init()` has been called.
    #[getset(set)]
    file: Option<BufWriter<File>>,

    /// Maximum allowed log size in bytes; `None` disables the size check.
    #[getset(get_copy)]
    max_log_size: Option<usize>,

    /// Current bytes written to the log file.
    #[getset(get_copy, set)]
    bytes_written: usize,
}

impl JsonLogger {
const ERR_UNINITIALIZED: &'static str = "logger not initialized";

// Create a new file logger instance.
pub fn new<T: AsRef<Path>>(path: T, max_log_size: Option<usize>) -> Result<JsonLogger> {
Ok(Self {
path: path.as_ref().into(),
file: None,
max_log_size,
bytes_written: 0,
})
}

// Asynchronously initialize the CRI logger.
pub async fn init(&mut self) -> Result<()> {
debug!("Initializing JSON logger in path {}", self.path().display());
self.set_file(Self::open(self.path()).await?.into());
Ok(())
}

//Reopen the container log file.
pub async fn reopen(&mut self) -> Result<()> {
debug!("Reopen container log {}", self.path().display());
self.file
.as_mut()
.context(Self::ERR_UNINITIALIZED)?
.get_ref()
.sync_all()
.await?;
self.init().await
}

// Ensures that all content is written to disk.
pub async fn flush(&mut self) -> Result<()> {
self.file
.as_mut()
.context(Self::ERR_UNINITIALIZED)?
.flush()
.await
.context("flush file writer")
}


// Write the contents of the provided reader into the log file.
pub async fn write<T>(&mut self, pipe: Pipe, bytes: T) -> Result<()>
where
T: AsyncBufRead + Unpin + Copy,
{
let mut buf = Vec::new();
let mut partial = false;
let mut read = 0;
while !partial {
let (r, p) = Self::read_line(&mut buf, bytes).await?;
read += r;
partial = p;
}
self.bytes_written += read;
if let Some(max_log_size) = self.max_log_size {
if self.bytes_written > max_log_size {
self.reopen().await?;
}
}
self.file
.as_mut()
.context(Self::ERR_UNINITIALIZED)?
.write_all(&buf)
.await
.context("write to file")
}

async fn read_line<T>(r: &mut BufReader<T>, buf: &mut Vec<u8>) -> Result<(usize, bool)>
where
T: AsyncBufRead + Unpin,
{
let (partial, read) = {
let available = r.fill_buf().await?;
match memchr(b'\n', available) {
Some(i) => {
buf.extend_from_slice(&available[..=i]);
(false, i + 1)
}
None => {
buf.extend_from_slice(available);
(true, available.len())
}
}
};
r.consume(read);
Ok((read, partial))
}
}