From 35fb7e766584b063bb3e862e9a8a00631d865540 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Tue, 15 Oct 2024 14:43:15 +0200 Subject: [PATCH] Fix --- .../buffer/envelope_stack/file_backed.rs | 2 +- .../buffer/envelope_store/file_backed.rs | 116 +++++++++++------- 2 files changed, 73 insertions(+), 45 deletions(-) diff --git a/relay-server/src/services/buffer/envelope_stack/file_backed.rs b/relay-server/src/services/buffer/envelope_stack/file_backed.rs index 112448e4ec..8ca556158f 100644 --- a/relay-server/src/services/buffer/envelope_stack/file_backed.rs +++ b/relay-server/src/services/buffer/envelope_stack/file_backed.rs @@ -488,6 +488,7 @@ mod tests { .unwrap(); file.set_len(0).await.unwrap(); // Clear the file file.write_all(b"malformed data").await.unwrap(); + file.flush().await.unwrap(); } // Attempt to pop from the stack with malformed data @@ -542,7 +543,6 @@ mod tests { } } - // Add this new test #[tokio::test] async fn test_empty_file_removal() { let envelope_store = setup_envelope_store(10).await; diff --git a/relay-server/src/services/buffer/envelope_store/file_backed.rs b/relay-server/src/services/buffer/envelope_store/file_backed.rs index 21002ef29b..34e666e5ee 100644 --- a/relay-server/src/services/buffer/envelope_store/file_backed.rs +++ b/relay-server/src/services/buffer/envelope_store/file_backed.rs @@ -8,12 +8,36 @@ use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; -use tokio::fs::{read_dir, remove_file, DirBuilder, File, OpenOptions}; +use tokio::fs::{create_dir_all, read_dir, remove_file, File, OpenOptions}; use tokio::io; use tokio::time::sleep; /// File extension for the files that are written by Relay and used for spooling purposes. const FILE_EXTENSION: &str = "spool"; +/// The delimiter used in the file name for the envelopes file for a given project key pair. +const PROJECT_KEYS_DELIMITER: &str = "_"; + +/// Generates a file name for the given project key pair. +fn get_envelopes_file_file_name(project_key_pair: &ProjectKeyPair) -> String { + format!( + "{}{}{}.{}", + project_key_pair.own_key, + PROJECT_KEYS_DELIMITER, + project_key_pair.sampling_key, + FILE_EXTENSION + ) +} + +/// Parses a filename of the envelopes file and returns the project key pair if valid. +fn parse_envelopes_file_file_name(file_name: &str) -> Option { + let (stem, _) = file_name.strip_suffix(FILE_EXTENSION)?.split_once(".")?; + let (own_key, sampling_key) = stem.split_once(PROJECT_KEYS_DELIMITER)?; + + Some(ProjectKeyPair { + own_key: ProjectKey::parse(own_key).ok()?, + sampling_key: ProjectKey::parse(sampling_key).ok()?, + }) +} /// An error returned when doing an operation on [`FileBackedEnvelopeStore`]. #[derive(Debug, thiserror::Error)] @@ -152,11 +176,15 @@ impl FileBackedEnvelopeStore { /// /// This method initializes the envelope store with the provided configuration, /// setting up the base path for envelope files and initializing the folder size tracker. + /// It also creates the spool directory if it doesn't exist. pub async fn new(config: &Config) -> Result { let Some(base_path) = config.spool_envelopes_path() else { return Err(FileBackedEnvelopeStoreError::NoFilePath); }; + // Create the spool directory if it doesn't exist + create_dir_all(&base_path).await?; + let folder_size_tracker = FolderSizeTracker::prepare( base_path.clone(), config.spool_disk_usage_refresh_frequency_ms(), @@ -196,14 +224,9 @@ impl FileBackedEnvelopeStore { while let Some(entry) = dir.next_entry().await? { let path = entry.path(); - if path.is_file() && path.extension().and_then(|s| s.to_str()) == Some(FILE_EXTENSION) { - if let Some(file_name) = path.file_stem().and_then(|s| s.to_str()) { - if let Some((own_key, sampling_key)) = file_name.split_once('-') { - let project_key_pair = ProjectKeyPair { - own_key: ProjectKey::parse(own_key)?, - sampling_key: ProjectKey::parse(sampling_key)?, - }; - + if path.is_file() { + if let Some(file_name) = path.file_name().and_then(|s| s.to_str()) { + if let Some(project_key_pair) = parse_envelopes_file_file_name(file_name) { let file = self.get_envelopes_file(project_key_pair).await?; let total_count = get_total_count(file).await.unwrap_or(0); @@ -245,6 +268,7 @@ impl EnvelopesFilesCache { } } + /// Returns the [`File`] from the cache, otherwise it will be loaded from the file system. async fn get_file( &mut self, project_key_pair: ProjectKeyPair, @@ -262,6 +286,7 @@ impl EnvelopesFilesCache { .expect("file to be in the cache"); cache_entry.last_access = Instant::now(); + Ok(&mut cache_entry.file) } @@ -275,11 +300,11 @@ impl EnvelopesFilesCache { self.cache.remove(project_key_pair); // Construct the file path - let filename = Self::filename(project_key_pair); - let filepath = base_path.join(filename); + let file_name = get_envelopes_file_file_name(project_key_pair); + let file_path = base_path.join(file_name); // Remove the file from disk - match remove_file(&filepath).await { + match remove_file(&file_path).await { Ok(_) => Ok(()), Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()), Err(e) => Err(FileBackedEnvelopeStoreError::FileError(e)), @@ -291,35 +316,19 @@ impl EnvelopesFilesCache { base_path: PathBuf, project_key_pair: &ProjectKeyPair, ) -> Result { - let filename = Self::filename(project_key_pair); - - let filepath = base_path.join(filename); - Self::create_spool_directory(&filepath).await?; + let file_name = get_envelopes_file_file_name(project_key_pair); + let file_path = base_path.join(file_name); OpenOptions::new() .read(true) .write(true) .create(true) .append(true) - .open(filepath) + .open(file_path) .await .map_err(FileBackedEnvelopeStoreError::FileError) } - /// Creates the directory structure for the spool file if it doesn't exist. - async fn create_spool_directory(path: &Path) -> Result<(), FileBackedEnvelopeStoreError> { - let Some(parent) = path.parent() else { - return Ok(()); - }; - - if !parent.as_os_str().is_empty() && !parent.exists() { - relay_log::debug!("creating directory for spooling file: {}", parent.display()); - DirBuilder::new().recursive(true).create(&parent).await?; - } - - Ok(()) - } - /// Inserts a new file into the cache, evicting the least recently used entry if necessary. fn insert_into_cache(&mut self, key_pair: ProjectKeyPair, file: File) { if self.cache.len() >= self.max_opened_files { @@ -346,14 +355,6 @@ impl EnvelopesFilesCache { self.cache.remove(&lru_project_key_pair); } } - - /// Generates a filename for the given project key pair. - fn filename(project_key_pair: &ProjectKeyPair) -> String { - format!( - "{}-{}.{}", - project_key_pair.own_key, project_key_pair.sampling_key, FILE_EXTENSION - ) - } } #[cfg(test)] @@ -451,10 +452,16 @@ mod tests { } // List project key pairs with counts - let key_pairs_with_counts = store.project_key_pairs_with_counts().await.unwrap(); - assert_eq!(key_pairs_with_counts.len(), 2); - assert_eq!(key_pairs_with_counts.get(&project_key_pair1), Some(&2)); - assert_eq!(key_pairs_with_counts.get(&project_key_pair2), Some(&3)); + let project_key_pairs_with_counts = store.project_key_pairs_with_counts().await.unwrap(); + assert_eq!(project_key_pairs_with_counts.len(), 2); + assert_eq!( + project_key_pairs_with_counts.get(&project_key_pair1), + Some(&2) + ); + assert_eq!( + project_key_pairs_with_counts.get(&project_key_pair2), + Some(&3) + ); } #[tokio::test] @@ -498,7 +505,7 @@ mod tests { assert!(store.files_cache.cache.contains_key(&project_key_pair)); let file_path = store .base_path - .join(EnvelopesFilesCache::filename(&project_key_pair)); + .join(get_envelopes_file_file_name(&project_key_pair)); assert!(file_path.exists()); // Remove the file @@ -537,4 +544,25 @@ mod tests { // The total size should be 1000 + 2000 + 3000 = 6000 bytes assert_eq!(folder_size, 6000); } + + #[test] + fn test_parse_filename() { + let valid_filename = + "a94ae32be2584e0bbd7a4cbb95971fee_b94ae32be2584e0bbd7a4cbb95971fee.spool"; + let invalid_filename = "invalid_filename.txt"; + + assert!(parse_envelopes_file_file_name(valid_filename).is_some()); + assert!(parse_envelopes_file_file_name(invalid_filename).is_none()); + + if let Some(project_key_pair) = parse_envelopes_file_file_name(valid_filename) { + assert_eq!( + project_key_pair.own_key.to_string(), + "a94ae32be2584e0bbd7a4cbb95971fee" + ); + assert_eq!( + project_key_pair.sampling_key.to_string(), + "b94ae32be2584e0bbd7a4cbb95971fee" + ); + } + } }