Skip to content

Commit

Permalink
first review
Browse files Browse the repository at this point in the history
  • Loading branch information
jjbayer committed Oct 15, 2024
1 parent cfabee3 commit ce9fd0e
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 126 deletions.
163 changes: 57 additions & 106 deletions relay-server/src/services/buffer/envelope_stack/file_backed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::services::buffer::envelope_stack::EnvelopeStack;
use crate::services::buffer::envelope_store::file_backed::{
FileBackedEnvelopeStore, FileBackedEnvelopeStoreError,
};
use crate::statsd::RelayTimers;
use std::io;
use std::io::SeekFrom;
use std::sync::Arc;
Expand Down Expand Up @@ -68,55 +67,22 @@ impl FileBackedEnvelopeStack {
envelope_store,
}
}

/// Reads and removes the last envelope from the file.
///
/// If the file is empty when trying to read the last envelope, the file will be deleted and
/// `None` is returned.
///
/// If the file is corrupted or incomplete, it will be truncated and an error will be returned.
async fn read_and_remove_last_envelope(
&mut self,
) -> Result<Option<Box<Envelope>>, FileBackedEnvelopeStackError> {
let mut envelope_store = self.envelope_store.lock().await;
let file = envelope_store
.get_envelopes_file(self.project_key_pair)
.await?;

let envelope = read_and_remove_last_envelope(file).await?;
// In case we didn't get any envelope back, we will remove the file.
if envelope.is_none() {
envelope_store.remove_file(&self.project_key_pair).await?;
}

Ok(envelope)
}

/// Appends an envelope to the file.
///
/// The envelope is serialized and written to the end of the file along with its size,
/// total count, and version.
async fn append_envelope(
&mut self,
envelope: &Envelope,
) -> Result<(), FileBackedEnvelopeStackError> {
let mut envelope_store = self.envelope_store.lock().await;
let file = envelope_store
.get_envelopes_file(self.project_key_pair)
.await?;

append_envelope(file, envelope).await
}
}

impl EnvelopeStack for FileBackedEnvelopeStack {
type Error = FileBackedEnvelopeStackError;

async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), Self::Error> {
relay_statsd::metric!(timer(RelayTimers::BufferPush), {
self.append_envelope(&envelope).await?;
});
Ok(())
{
let this = &mut *self;
let envelope: &Envelope = &envelope;
async move {
let mut envelope_store = this.envelope_store.lock().await;
let file = envelope_store.get_file(this.project_key_pair).await?;
append_envelope(file, envelope).await
}
}
.await
}

async fn peek(&mut self) -> Result<Option<&Envelope>, Self::Error> {
Expand All @@ -126,9 +92,15 @@ impl EnvelopeStack for FileBackedEnvelopeStack {
}

async fn pop(&mut self) -> Result<Option<Box<Envelope>>, Self::Error> {
relay_statsd::metric!(timer(RelayTimers::BufferPop), {
self.read_and_remove_last_envelope().await
})
let mut envelope_store = self.envelope_store.lock().await;
let file = envelope_store.get_file(self.project_key_pair).await?;
let envelope = pop_envelope(file).await?;

// TODO: move cleanup to LRU cache so we can keep the file alive?
if envelope.is_none() {
envelope_store.remove_file(&self.project_key_pair).await?;
}
Ok(envelope)
}

fn flush(self) -> Vec<Box<Envelope>> {
Expand All @@ -141,19 +113,16 @@ impl EnvelopeStack for FileBackedEnvelopeStack {
///
/// If the file is empty or doesn't contain a total count field, it returns 0.
pub async fn get_total_count(file: &mut File) -> Result<u32, FileBackedEnvelopeStackError> {
let file_size = file.metadata().await?.len();
if file_size < VERSION_FIELD_BYTES + TOTAL_COUNT_FIELD_BYTES {
return Ok(0);
let from_back = (VERSION_FIELD_BYTES + TOTAL_COUNT_FIELD_BYTES) as i64;
match file.seek(SeekFrom::End(-from_back)).await {
Ok(_) => Ok(file.read_u32_le().await?),
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
relay_log::error!("Truncating envelope stack"); // TODO: add project key pair as tag
truncate_file(file, 0).await?;
Ok(0)
}
Err(e) => Err(e.into()),
}

let mut total_count_buf = [0u8; TOTAL_COUNT_FIELD_BYTES as usize];
file.seek(SeekFrom::End(
-((VERSION_FIELD_BYTES + TOTAL_COUNT_FIELD_BYTES) as i64),
))
.await?;
file.read_exact(&mut total_count_buf).await?;

Ok(u32::from_le_bytes(total_count_buf))
}

/// Helper method to truncate the file to a given size.
Expand All @@ -173,7 +142,7 @@ pub async fn truncate_file(file: &File, new_size: u64) -> Result<(), FileBackedE
/// If the file is empty when trying to read the last envelope `None` is returned.
///
/// If the file is corrupted or incomplete, it will be truncated and an error will be returned.
pub async fn read_and_remove_last_envelope(
pub async fn pop_envelope(
file: &mut File,
) -> Result<Option<Box<Envelope>>, FileBackedEnvelopeStackError> {
// Get the file size
Expand All @@ -195,6 +164,12 @@ pub async fn read_and_remove_last_envelope(
));
}

// TODO: metrics for read and deserialize times
// TODO: don't use vector
// TODO: make seek helper method read a generic number of bytes
// TODO: don't check metadata
// TODO:

// Skip version and total count fields, then read the envelope size
let mut envelope_size_buf = [0u8; ENVELOPE_SIZE_FIELD_BYTES as usize];
file.seek(SeekFrom::End(-(header_size as i64))).await?;
Expand All @@ -209,6 +184,10 @@ pub async fn read_and_remove_last_envelope(
));
}

// TODO: check if envelope_size is sane (e.g. < 2*envelope_size_limit).
// If not, truncate entire file.
// TODO: consider not allocating and (de)serializing directly from/to Writer

// Read the envelope data
let mut envelope_buf = vec![0; envelope_size as usize];
file.seek(SeekFrom::End(-((envelope_size + header_size) as i64)))
Expand Down Expand Up @@ -242,26 +221,22 @@ pub async fn append_envelope(
file: &mut File,
envelope: &Envelope,
) -> Result<(), FileBackedEnvelopeStackError> {
// TODO: check version here

// Get the current total count
let total_count = get_total_count(file).await?;

// Serialize envelope
// TODO: Measure serialization time.
let envelope_bytes = envelope.to_vec()?;

// Construct buffer to write
let envelope_size = envelope_bytes.len();
let mut buffer = Vec::with_capacity(
envelope_size
+ (VERSION_FIELD_BYTES + TOTAL_COUNT_FIELD_BYTES + ENVELOPE_SIZE_FIELD_BYTES) as usize,
);
buffer.extend_from_slice(&envelope_bytes);
buffer.extend_from_slice(&envelope_size.to_le_bytes());
buffer.extend_from_slice(&(total_count + 1).to_le_bytes());
buffer.push(VERSION);

// Write data
// TODO: Measure write time.
file.seek(SeekFrom::End(0)).await?;
file.write_all(&buffer).await?;
file.write_all(&envelope_bytes).await?;
file.write_u64_le(envelope_bytes.len() as u64).await?;
file.write_u32_le(total_count + 1).await?;
file.write_u8(VERSION).await?;

file.flush().await?;

Ok(())
Expand Down Expand Up @@ -353,10 +328,7 @@ mod tests {
stack.push(envelope.clone()).await.unwrap();

let mut store = envelope_store.lock().await;
let file = store
.get_envelopes_file(stack.project_key_pair)
.await
.unwrap();
let file = store.get_file(stack.project_key_pair).await.unwrap();
let total_count = get_total_count(file).await.unwrap();
assert_eq!(
total_count,
Expand All @@ -370,10 +342,7 @@ mod tests {
stack.pop().await.unwrap();

let mut store = envelope_store.lock().await;
let file = store
.get_envelopes_file(stack.project_key_pair)
.await
.unwrap();
let file = store.get_file(stack.project_key_pair).await.unwrap();
let total_count = get_total_count(file).await.unwrap();
assert_eq!(total_count, i as u32, "Total count mismatch after pop");
}
Expand All @@ -395,10 +364,7 @@ mod tests {

// Verify version field
let mut store = envelope_store.lock().await;
let file = store
.get_envelopes_file(stack.project_key_pair)
.await
.unwrap();
let file = store.get_file(stack.project_key_pair).await.unwrap();

let mut version_buf = [0u8; VERSION_FIELD_BYTES as usize];
file.seek(SeekFrom::End(-(VERSION_FIELD_BYTES as i64)))
Expand All @@ -425,10 +391,7 @@ mod tests {

// Verify file structure
let mut store = envelope_store.lock().await;
let file = store
.get_envelopes_file(stack.project_key_pair)
.await
.unwrap();
let file = store.get_file(stack.project_key_pair).await.unwrap();
let file_size = file.metadata().await.unwrap().len();

// Read fields from the end of the file
Expand Down Expand Up @@ -482,10 +445,7 @@ mod tests {
// Write malformed data directly to the file
{
let mut store = envelope_store.lock().await;
let file = store
.get_envelopes_file(stack.project_key_pair)
.await
.unwrap();
let file = store.get_file(stack.project_key_pair).await.unwrap();
file.set_len(0).await.unwrap(); // Clear the file
file.write_all(b"malformed data").await.unwrap();
file.flush().await.unwrap();
Expand All @@ -500,10 +460,7 @@ mod tests {
// Verify the file is truncated
{
let mut store = envelope_store.lock().await;
let file = store
.get_envelopes_file(stack.project_key_pair)
.await
.unwrap();
let file = store.get_file(stack.project_key_pair).await.unwrap();
assert_eq!(file.metadata().await.unwrap().len(), 0);
}
}
Expand All @@ -520,10 +477,7 @@ mod tests {
// Write incomplete envelope data directly to the file
{
let mut store = envelope_store.lock().await;
let file = store
.get_envelopes_file(stack.project_key_pair)
.await
.unwrap();
let file = store.get_file(stack.project_key_pair).await.unwrap();
file.set_len(0).await.unwrap();
file.write_all(&[0u8; 4]).await.unwrap();
file.flush().await.unwrap();
Expand All @@ -535,10 +489,7 @@ mod tests {
// Verify the file is truncated
{
let mut store = envelope_store.lock().await;
let file = store
.get_envelopes_file(stack.project_key_pair)
.await
.unwrap();
let file = store.get_file(stack.project_key_pair).await.unwrap();
assert_eq!(file.metadata().await.unwrap().len(), 0);
}
}
Expand Down Expand Up @@ -569,7 +520,7 @@ mod tests {
// Check that the file still exists after the first pop
{
let mut store = envelope_store.lock().await;
let result = store.get_envelopes_file(stack.project_key_pair).await;
let result = store.get_file(stack.project_key_pair).await;
assert!(
result.is_ok(),
"Expected file to still exist after first pop"
Expand Down
Loading

0 comments on commit ce9fd0e

Please sign in to comment.