Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo committed Oct 15, 2024
1 parent 4699547 commit 35fb7e7
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ mod tests {
.unwrap();
file.set_len(0).await.unwrap(); // Clear the file
file.write_all(b"malformed data").await.unwrap();
file.flush().await.unwrap();
}

// Attempt to pop from the stack with malformed data
Expand Down Expand Up @@ -542,7 +543,6 @@ mod tests {
}
}

// Add this new test
#[tokio::test]
async fn test_empty_file_removal() {
let envelope_store = setup_envelope_store(10).await;
Expand Down
116 changes: 72 additions & 44 deletions relay-server/src/services/buffer/envelope_store/file_backed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,36 @@ use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::fs::{read_dir, remove_file, DirBuilder, File, OpenOptions};
use tokio::fs::{create_dir_all, read_dir, remove_file, File, OpenOptions};
use tokio::io;
use tokio::time::sleep;

/// File extension for the files that are written by Relay and used for spooling purposes.
const FILE_EXTENSION: &str = "spool";
/// The delimiter used in the file name for the envelopes file for a given project key pair.
const PROJECT_KEYS_DELIMITER: &str = "_";

/// Generates a file name for the given project key pair.
fn get_envelopes_file_file_name(project_key_pair: &ProjectKeyPair) -> String {
format!(
"{}{}{}.{}",
project_key_pair.own_key,
PROJECT_KEYS_DELIMITER,
project_key_pair.sampling_key,
FILE_EXTENSION
)
}

/// Parses a filename of the envelopes file and returns the project key pair if valid.
fn parse_envelopes_file_file_name(file_name: &str) -> Option<ProjectKeyPair> {
let (stem, _) = file_name.strip_suffix(FILE_EXTENSION)?.split_once(".")?;
let (own_key, sampling_key) = stem.split_once(PROJECT_KEYS_DELIMITER)?;

Some(ProjectKeyPair {
own_key: ProjectKey::parse(own_key).ok()?,
sampling_key: ProjectKey::parse(sampling_key).ok()?,
})
}

/// An error returned when doing an operation on [`FileBackedEnvelopeStore`].
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -152,11 +176,15 @@ impl FileBackedEnvelopeStore {
///
/// This method initializes the envelope store with the provided configuration,
/// setting up the base path for envelope files and initializing the folder size tracker.
/// It also creates the spool directory if it doesn't exist.
pub async fn new(config: &Config) -> Result<Self, FileBackedEnvelopeStoreError> {
let Some(base_path) = config.spool_envelopes_path() else {
return Err(FileBackedEnvelopeStoreError::NoFilePath);
};

// Create the spool directory if it doesn't exist
create_dir_all(&base_path).await?;

let folder_size_tracker = FolderSizeTracker::prepare(
base_path.clone(),
config.spool_disk_usage_refresh_frequency_ms(),
Expand Down Expand Up @@ -196,14 +224,9 @@ impl FileBackedEnvelopeStore {
while let Some(entry) = dir.next_entry().await? {
let path = entry.path();

if path.is_file() && path.extension().and_then(|s| s.to_str()) == Some(FILE_EXTENSION) {
if let Some(file_name) = path.file_stem().and_then(|s| s.to_str()) {
if let Some((own_key, sampling_key)) = file_name.split_once('-') {
let project_key_pair = ProjectKeyPair {
own_key: ProjectKey::parse(own_key)?,
sampling_key: ProjectKey::parse(sampling_key)?,
};

if path.is_file() {
if let Some(file_name) = path.file_name().and_then(|s| s.to_str()) {
if let Some(project_key_pair) = parse_envelopes_file_file_name(file_name) {
let file = self.get_envelopes_file(project_key_pair).await?;
let total_count = get_total_count(file).await.unwrap_or(0);

Expand Down Expand Up @@ -245,6 +268,7 @@ impl EnvelopesFilesCache {
}
}

/// Returns the [`File`] from the cache, otherwise it will be loaded from the file system.
async fn get_file(
&mut self,
project_key_pair: ProjectKeyPair,
Expand All @@ -262,6 +286,7 @@ impl EnvelopesFilesCache {
.expect("file to be in the cache");

cache_entry.last_access = Instant::now();

Ok(&mut cache_entry.file)
}

Expand All @@ -275,11 +300,11 @@ impl EnvelopesFilesCache {
self.cache.remove(project_key_pair);

// Construct the file path
let filename = Self::filename(project_key_pair);
let filepath = base_path.join(filename);
let file_name = get_envelopes_file_file_name(project_key_pair);
let file_path = base_path.join(file_name);

// Remove the file from disk
match remove_file(&filepath).await {
match remove_file(&file_path).await {
Ok(_) => Ok(()),
Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()),
Err(e) => Err(FileBackedEnvelopeStoreError::FileError(e)),
Expand All @@ -291,35 +316,19 @@ impl EnvelopesFilesCache {
base_path: PathBuf,
project_key_pair: &ProjectKeyPair,
) -> Result<File, FileBackedEnvelopeStoreError> {
let filename = Self::filename(project_key_pair);

let filepath = base_path.join(filename);
Self::create_spool_directory(&filepath).await?;
let file_name = get_envelopes_file_file_name(project_key_pair);
let file_path = base_path.join(file_name);

OpenOptions::new()
.read(true)
.write(true)
.create(true)
.append(true)
.open(filepath)
.open(file_path)
.await
.map_err(FileBackedEnvelopeStoreError::FileError)
}

/// Creates the directory structure for the spool file if it doesn't exist.
async fn create_spool_directory(path: &Path) -> Result<(), FileBackedEnvelopeStoreError> {
let Some(parent) = path.parent() else {
return Ok(());
};

if !parent.as_os_str().is_empty() && !parent.exists() {
relay_log::debug!("creating directory for spooling file: {}", parent.display());
DirBuilder::new().recursive(true).create(&parent).await?;
}

Ok(())
}

/// Inserts a new file into the cache, evicting the least recently used entry if necessary.
fn insert_into_cache(&mut self, key_pair: ProjectKeyPair, file: File) {
if self.cache.len() >= self.max_opened_files {
Expand All @@ -346,14 +355,6 @@ impl EnvelopesFilesCache {
self.cache.remove(&lru_project_key_pair);
}
}

/// Generates a filename for the given project key pair.
fn filename(project_key_pair: &ProjectKeyPair) -> String {
format!(
"{}-{}.{}",
project_key_pair.own_key, project_key_pair.sampling_key, FILE_EXTENSION
)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -451,10 +452,16 @@ mod tests {
}

// List project key pairs with counts
let key_pairs_with_counts = store.project_key_pairs_with_counts().await.unwrap();
assert_eq!(key_pairs_with_counts.len(), 2);
assert_eq!(key_pairs_with_counts.get(&project_key_pair1), Some(&2));
assert_eq!(key_pairs_with_counts.get(&project_key_pair2), Some(&3));
let project_key_pairs_with_counts = store.project_key_pairs_with_counts().await.unwrap();
assert_eq!(project_key_pairs_with_counts.len(), 2);
assert_eq!(
project_key_pairs_with_counts.get(&project_key_pair1),
Some(&2)
);
assert_eq!(
project_key_pairs_with_counts.get(&project_key_pair2),
Some(&3)
);
}

#[tokio::test]
Expand Down Expand Up @@ -498,7 +505,7 @@ mod tests {
assert!(store.files_cache.cache.contains_key(&project_key_pair));
let file_path = store
.base_path
.join(EnvelopesFilesCache::filename(&project_key_pair));
.join(get_envelopes_file_file_name(&project_key_pair));
assert!(file_path.exists());

// Remove the file
Expand Down Expand Up @@ -537,4 +544,25 @@ mod tests {
// The total size should be 1000 + 2000 + 3000 = 6000 bytes
assert_eq!(folder_size, 6000);
}

#[test]
fn test_parse_filename() {
let valid_filename =
"a94ae32be2584e0bbd7a4cbb95971fee_b94ae32be2584e0bbd7a4cbb95971fee.spool";
let invalid_filename = "invalid_filename.txt";

assert!(parse_envelopes_file_file_name(valid_filename).is_some());
assert!(parse_envelopes_file_file_name(invalid_filename).is_none());

if let Some(project_key_pair) = parse_envelopes_file_file_name(valid_filename) {
assert_eq!(
project_key_pair.own_key.to_string(),
"a94ae32be2584e0bbd7a4cbb95971fee"
);
assert_eq!(
project_key_pair.sampling_key.to_string(),
"b94ae32be2584e0bbd7a4cbb95971fee"
);
}
}
}

0 comments on commit 35fb7e7

Please sign in to comment.