From 958258e8fbdf3510dfebc60ed5806568002997e7 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Mon, 6 Jan 2025 00:12:28 +0800 Subject: [PATCH 1/4] - **Refactored SST File Handling**: - Introduced `FilePathProvider` trait and its implementations (`WriteCachePathProvider`, `RegionFilePathFactory`) to manage SST and index file paths. - Updated `AccessLayer`, `WriteCache`, and `ParquetWriter` to use `FilePathProvider` for path management. - Modified `SstWriteRequest` and `SstUploadRequest` to use path providers instead of direct paths. - Files affected: `access_layer.rs`, `write_cache.rs`, `parquet.rs`, `writer.rs`. - **Enhanced Indexer Management**: - Replaced `IndexerBuilder` with `IndexerBuilderImpl` and made it async to support dynamic indexer creation. - Updated `ParquetWriter` to handle multiple indexers and file IDs. - Files affected: `index.rs`, `parquet.rs`, `writer.rs`. - **Removed Redundant File ID Handling**: - Removed `file_id` from `SstWriteRequest` and `CompactionOutput`. - Updated related logic to dynamically generate file IDs where necessary. - Files affected: `compaction.rs`, `flush.rs`, `picker.rs`, `twcs.rs`, `window.rs`. - **Test Adjustments**: - Updated tests to align with new path and indexer management. - Introduced `FixedPathProvider` and `NoopIndexBuilder` for testing purposes. - Files affected: `sst_util.rs`, `version_util.rs`, `parquet.rs`. --- src/mito2/src/access_layer.rs | 94 +++++++++--- src/mito2/src/cache/write_cache.rs | 109 +++++++------- src/mito2/src/compaction.rs | 4 +- src/mito2/src/compaction/compactor.rs | 13 +- src/mito2/src/compaction/picker.rs | 5 - src/mito2/src/compaction/twcs.rs | 5 +- src/mito2/src/compaction/window.rs | 3 +- src/mito2/src/flush.rs | 42 +++--- src/mito2/src/sst/index.rs | 139 ++++++++---------- src/mito2/src/sst/parquet.rs | 107 +++++++++----- src/mito2/src/sst/parquet/writer.rs | 102 +++++++++---- src/mito2/src/test_util/sst_util.rs | 12 +- src/mito2/src/test_util/version_util.rs | 1 - .../src/puffin_manager/fs_puffin_manager.rs | 1 + 14 files changed, 383 insertions(+), 254 deletions(-) diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 16d1480a61ed..51dd7a962a7e 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -17,10 +17,12 @@ use std::sync::Arc; use object_store::services::Fs; use object_store::util::{join_dir, with_instrument_layers}; use object_store::ObjectStore; +use smallvec::SmallVec; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; -use store_api::storage::SequenceNumber; +use store_api::storage::{RegionId, SequenceNumber}; +use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey}; use crate::cache::write_cache::SstUploadRequest; use crate::cache::CacheManagerRef; use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig}; @@ -30,13 +32,15 @@ use crate::region::options::IndexOptions; use crate::sst::file::{FileHandle, FileId, FileMeta}; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; -use crate::sst::index::IndexerBuilder; +use crate::sst::index::IndexerBuilderImpl; use crate::sst::location; use crate::sst::parquet::reader::ParquetReaderBuilder; use crate::sst::parquet::writer::ParquetWriter; use crate::sst::parquet::{SstInfo, WriteOptions}; pub type AccessLayerRef = Arc; +/// SST write results. +pub type SstInfoArray = SmallVec<[SstInfo; 2]>; /// A layer to access SST files under the same directory. 
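The commit message above introduces the `FilePathProvider` trait; its definition and the two real implementations (`WriteCachePathProvider`, `RegionFilePathFactory`) appear further down in `access_layer.rs`. A minimal sketch of a third, purely hypothetical implementor, to show what the contract asks for (module paths taken from this patch):

```rust
use crate::access_layer::FilePathProvider;
use crate::sst::file::FileId;

/// Hypothetical provider that puts every file into one flat directory.
struct FlatDirProvider {
    dir: String,
}

impl FilePathProvider for FlatDirProvider {
    fn build_index_file_path(&self, file_id: FileId) -> String {
        // FileId implements Display, so it can be embedded into the path directly.
        format!("{}/{}.puffin", self.dir, file_id)
    }

    fn build_sst_file_path(&self, file_id: FileId) -> String {
        format!("{}/{}.parquet", self.dir, file_id)
    }
}
```

Anything that can map a `FileId` to concrete storage paths can be handed to `ParquetWriter` or `AccessLayer` this way, which is what lets the write cache and the region directory share one writer implementation.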
pub struct AccessLayer { @@ -121,11 +125,8 @@ impl AccessLayer { &self, request: SstWriteRequest, write_opts: &WriteOptions, - ) -> Result> { - let file_path = location::sst_file_path(&self.region_dir, request.file_id); - let index_file_path = location::index_file_path(&self.region_dir, request.file_id); + ) -> Result { let region_id = request.metadata.region_id; - let file_id = request.file_id; let cache_manager = request.cache_manager.clone(); let sst_info = if let Some(write_cache) = cache_manager.write_cache() { @@ -134,8 +135,9 @@ impl AccessLayer { .write_and_upload_sst( request, SstUploadRequest { - upload_path: file_path, - index_upload_path: index_file_path, + dest_path_provider: RegionFilePathFactory { + region_dir: self.region_dir.clone(), + }, remote_store: self.object_store.clone(), }, write_opts, @@ -144,11 +146,9 @@ impl AccessLayer { } else { // Write cache is disabled. let store = self.object_store.clone(); - let indexer = IndexerBuilder { + let indexer_builder = IndexerBuilderImpl { op_type: request.op_type, - file_id, - file_path: index_file_path, - metadata: &request.metadata, + metadata: request.metadata.clone(), row_group_size: write_opts.row_group_size, puffin_manager: self.puffin_manager_factory.build(store), intermediate_manager: self.intermediate_manager.clone(), @@ -156,24 +156,31 @@ impl AccessLayer { inverted_index_config: request.inverted_index_config, fulltext_index_config: request.fulltext_index_config, bloom_filter_index_config: request.bloom_filter_index_config, - } - .build() - .await; + }; let mut writer = ParquetWriter::new_with_object_store( self.object_store.clone(), - file_path, request.metadata, - indexer, - ); + indexer_builder, + RegionFilePathFactory { + region_dir: self.region_dir.clone(), + }, + ) + .await; writer .write_all(request.source, request.max_sequence, write_opts) .await? }; // Put parquet metadata to cache manager. - if let Some(sst_info) = &sst_info { - if let Some(parquet_metadata) = &sst_info.file_metadata { - cache_manager.put_parquet_meta_data(region_id, file_id, parquet_metadata.clone()) + if !sst_info.is_empty() { + for sst in &sst_info { + if let Some(parquet_metadata) = &sst.file_metadata { + cache_manager.put_parquet_meta_data( + region_id, + sst.file_id, + parquet_metadata.clone(), + ) + } } } @@ -191,7 +198,6 @@ pub(crate) enum OperationType { /// Contents to build a SST. pub(crate) struct SstWriteRequest { pub(crate) op_type: OperationType, - pub(crate) file_id: FileId, pub(crate) metadata: RegionMetadataRef, pub(crate) source: Source, pub(crate) cache_manager: CacheManagerRef, @@ -229,3 +235,47 @@ async fn clean_dir(dir: &str) -> Result<()> { Ok(()) } + +/// Path provider for SST file and index file. +pub trait FilePathProvider: Send + Sync { + /// Creates index file path of given file id. + fn build_index_file_path(&self, file_id: FileId) -> String; + + /// Creates SST file path of given file id. + fn build_sst_file_path(&self, file_id: FileId) -> String; +} + +/// Path provider that builds paths in local write cache. 
+#[derive(Clone)] +pub(crate) struct WriteCachePathProvider { + pub(crate) region_id: RegionId, + pub(crate) file_cache: FileCacheRef, +} + +impl FilePathProvider for WriteCachePathProvider { + fn build_index_file_path(&self, file_id: FileId) -> String { + let puffin_key = IndexKey::new(self.region_id, file_id, FileType::Puffin); + self.file_cache.cache_file_path(puffin_key) + } + + fn build_sst_file_path(&self, file_id: FileId) -> String { + let parquet_file_key = IndexKey::new(self.region_id, file_id, FileType::Parquet); + self.file_cache.cache_file_path(parquet_file_key) + } +} + +/// Path provider that builds paths in region storage path. +#[derive(Clone, Debug)] +pub(crate) struct RegionFilePathFactory { + pub(crate) region_dir: String, +} + +impl FilePathProvider for RegionFilePathFactory { + fn build_index_file_path(&self, file_id: FileId) -> String { + location::index_file_path(&self.region_dir, file_id) + } + + fn build_sst_file_path(&self, file_id: FileId) -> String { + location::sst_file_path(&self.region_dir, file_id) + } +} diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index 1e9dfb540093..0ae00b3c6cf2 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -23,7 +23,10 @@ use futures::AsyncWriteExt; use object_store::ObjectStore; use snafu::ResultExt; -use crate::access_layer::{new_fs_cache_store, SstWriteRequest}; +use crate::access_layer::{ + new_fs_cache_store, FilePathProvider, RegionFilePathFactory, SstInfoArray, SstWriteRequest, + WriteCachePathProvider, +}; use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue}; use crate::error::{self, Result}; use crate::metrics::{ @@ -32,9 +35,9 @@ use crate::metrics::{ }; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; -use crate::sst::index::IndexerBuilder; +use crate::sst::index::IndexerBuilderImpl; use crate::sst::parquet::writer::ParquetWriter; -use crate::sst::parquet::{SstInfo, WriteOptions}; +use crate::sst::parquet::WriteOptions; use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY}; /// A cache for uploading files to remote object stores. @@ -103,22 +106,21 @@ impl WriteCache { write_request: SstWriteRequest, upload_request: SstUploadRequest, write_opts: &WriteOptions, - ) -> Result> { + ) -> Result { let timer = FLUSH_ELAPSED .with_label_values(&["write_sst"]) .start_timer(); let region_id = write_request.metadata.region_id; - let file_id = write_request.file_id; - let parquet_key = IndexKey::new(region_id, file_id, FileType::Parquet); - let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin); let store = self.file_cache.local_store(); - let indexer = IndexerBuilder { + let path_provider = WriteCachePathProvider { + file_cache: self.file_cache.clone(), + region_id, + }; + let indexer = IndexerBuilderImpl { op_type: write_request.op_type, - file_id, - file_path: self.file_cache.cache_file_path(puffin_key), - metadata: &write_request.metadata, + metadata: write_request.metadata.clone(), row_group_size: write_opts.row_group_size, puffin_manager: self.puffin_manager_factory.build(store), intermediate_manager: self.intermediate_manager.clone(), @@ -126,17 +128,16 @@ impl WriteCache { inverted_index_config: write_request.inverted_index_config, fulltext_index_config: write_request.fulltext_index_config, bloom_filter_index_config: write_request.bloom_filter_index_config, - } - .build() - .await; + }; // Write to FileCache. 
let mut writer = ParquetWriter::new_with_object_store( self.file_cache.local_store(), - self.file_cache.cache_file_path(parquet_key), write_request.metadata, indexer, - ); + path_provider, + ) + .await; let sst_info = writer .write_all(write_request.source, write_request.max_sequence, write_opts) @@ -145,22 +146,29 @@ impl WriteCache { timer.stop_and_record(); // Upload sst file to remote object store. - let Some(sst_info) = sst_info else { - // No data need to upload. - return Ok(None); - }; + if sst_info.is_empty() { + return Ok(sst_info); + } - let parquet_path = &upload_request.upload_path; let remote_store = &upload_request.remote_store; - self.upload(parquet_key, parquet_path, remote_store).await?; - - if sst_info.index_metadata.file_size > 0 { - let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin); - let puffin_path = &upload_request.index_upload_path; - self.upload(puffin_key, puffin_path, remote_store).await?; + for sst in &sst_info { + let parquet_key = IndexKey::new(region_id, sst.file_id, FileType::Parquet); + let parquet_path = upload_request + .dest_path_provider + .build_sst_file_path(sst.file_id); + self.upload(parquet_key, &parquet_path, remote_store) + .await?; + + if sst.index_metadata.file_size > 0 { + let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin); + let puffin_path = &upload_request + .dest_path_provider + .build_index_file_path(sst.file_id); + self.upload(puffin_key, puffin_path, remote_store).await?; + } } - Ok(Some(sst_info)) + Ok(sst_info) } /// Removes a file from the cache by `index_key`. @@ -319,10 +327,8 @@ impl WriteCache { /// Request to write and upload a SST. pub struct SstUploadRequest { - /// Path to upload the file. - pub upload_path: String, - /// Path to upload the index file. - pub index_upload_path: String, + /// Destination path provider of which SST files in write cache should be uploaded to. + pub dest_path_provider: RegionFilePathFactory, /// Remote object store to upload. pub remote_store: ObjectStore, } @@ -336,11 +342,9 @@ mod tests { use crate::cache::test_util::new_fs_store; use crate::cache::{CacheManager, CacheStrategy}; use crate::region::options::IndexOptions; - use crate::sst::file::FileId; - use crate::sst::location::{index_file_path, sst_file_path}; use crate::sst::parquet::reader::ParquetReaderBuilder; use crate::test_util::sst_util::{ - assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle, + assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle_with_file_id, sst_region_metadata, }; use crate::test_util::TestEnv; @@ -351,9 +355,9 @@ mod tests { // and now just use local file system to mock. 
let mut env = TestEnv::new(); let mock_store = env.init_object_store_manager(); - let file_id = FileId::random(); - let upload_path = sst_file_path("test", file_id); - let index_upload_path = index_file_path("test", file_id); + let path_provider = RegionFilePathFactory { + region_dir: "test".to_string(), + }; let local_dir = create_temp_dir(""); let local_store = new_fs_store(local_dir.path().to_str().unwrap()); @@ -373,7 +377,6 @@ mod tests { let write_request = SstWriteRequest { op_type: OperationType::Flush, - file_id, metadata, source, storage: None, @@ -386,8 +389,7 @@ mod tests { }; let upload_request = SstUploadRequest { - upload_path: upload_path.clone(), - index_upload_path: index_upload_path.clone(), + dest_path_provider: path_provider.clone(), remote_store: mock_store.clone(), }; @@ -397,18 +399,22 @@ mod tests { }; // Write to cache and upload sst to mock remote store - write_cache + let sst_info = write_cache .write_and_upload_sst(write_request, upload_request, &write_opts) .await .unwrap() - .unwrap(); + .remove(0); //todo(hl): we assume it only creates one file. + + let file_id = sst_info.file_id; + let sst_upload_path = path_provider.build_sst_file_path(file_id); + let index_upload_path = path_provider.build_index_file_path(file_id); // Check write cache contains the key let key = IndexKey::new(region_id, file_id, FileType::Parquet); assert!(write_cache.file_cache.contains_key(&key)); // Check file data - let remote_data = mock_store.read(&upload_path).await.unwrap(); + let remote_data = mock_store.read(&sst_upload_path).await.unwrap(); let cache_data = local_store .read(&write_cache.file_cache.cache_file_path(key)) .await @@ -436,6 +442,7 @@ mod tests { #[tokio::test] async fn test_read_metadata_from_write_cache() { + common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new(); let data_home = env.data_home().display().to_string(); let mock_store = env.init_object_store_manager(); @@ -456,8 +463,7 @@ mod tests { // Create source let metadata = Arc::new(sst_region_metadata()); - let handle = sst_file_handle(0, 1000); - let file_id = handle.file_id(); + let source = new_source(&[ new_batch_by_range(&["a", "d"], 0, 60), new_batch_by_range(&["b", "f"], 0, 40), @@ -467,7 +473,6 @@ mod tests { // Write to local cache and upload sst to mock remote store let write_request = SstWriteRequest { op_type: OperationType::Flush, - file_id, metadata, source, storage: None, @@ -482,11 +487,10 @@ mod tests { row_group_size: 512, ..Default::default() }; - let upload_path = sst_file_path(&data_home, file_id); - let index_upload_path = index_file_path(&data_home, file_id); let upload_request = SstUploadRequest { - upload_path: upload_path.clone(), - index_upload_path: index_upload_path.clone(), + dest_path_provider: RegionFilePathFactory { + region_dir: data_home.clone(), + }, remote_store: mock_store.clone(), }; @@ -494,10 +498,11 @@ mod tests { .write_and_upload_sst(write_request, upload_request, &write_opts) .await .unwrap() - .unwrap(); + .remove(0); let write_parquet_metadata = sst_info.file_metadata.unwrap(); // Read metadata from write cache + let handle = sst_file_handle_with_file_id(sst_info.file_id, 0, 1000); let builder = ParquetReaderBuilder::new(data_home, handle.clone(), mock_store.clone()) .cache(CacheStrategy::EnableAll(cache_manager.clone())); let reader = builder.build().await.unwrap(); diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index bf8df5fcec7a..21237a340c04 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs 
@@ -66,7 +66,7 @@ use crate::schedule::remote_job_scheduler::{ CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef, }; use crate::schedule::scheduler::SchedulerRef; -use crate::sst::file::{FileHandle, FileId, FileMeta, Level}; +use crate::sst::file::{FileHandle, FileMeta, Level}; use crate::sst::version::LevelMeta; use crate::worker::WorkerListener; @@ -548,7 +548,6 @@ impl CompactionStatus { #[derive(Debug, Clone)] pub struct CompactionOutput { - pub output_file_id: FileId, /// Compaction output file level. pub output_level: Level, /// Compaction input files. @@ -562,7 +561,6 @@ pub struct CompactionOutput { /// SerializedCompactionOutput is a serialized version of [CompactionOutput] by replacing [FileHandle] with [FileMeta]. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SerializedCompactionOutput { - output_file_id: FileId, output_level: Level, inputs: Vec, filter_deleted: bool, diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index ceeb509bc17e..affbda0f003e 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -271,7 +271,7 @@ impl Compactor for DefaultCompactor { compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone())); info!( - "Compaction region {} output [{}]-> {}", + "Region {} compaction input: [{}]", compaction_region.region_id, output .inputs @@ -279,7 +279,6 @@ impl Compactor for DefaultCompactor { .map(|f| f.file_id().to_string()) .collect::>() .join(","), - output.output_file_id ); let write_opts = WriteOptions { @@ -290,7 +289,6 @@ impl Compactor for DefaultCompactor { let region_metadata = compaction_region.region_metadata.clone(); let sst_layer = compaction_region.access_layer.clone(); let region_id = compaction_region.region_id; - let file_id = output.output_file_id; let cache_manager = compaction_region.cache_manager.clone(); let storage = compaction_region.region_options.storage.clone(); let index_options = compaction_region @@ -327,7 +325,6 @@ impl Compactor for DefaultCompactor { .write_sst( SstWriteRequest { op_type: OperationType::Compact, - file_id, metadata: region_metadata, source: Source::Reader(reader), cache_manager, @@ -341,9 +338,10 @@ impl Compactor for DefaultCompactor { &write_opts, ) .await? + .into_iter() .map(|sst_info| FileMeta { region_id, - file_id, + file_id: sst_info.file_id, time_range: sst_info.time_range, level: output.output_level, file_size: sst_info.file_size, @@ -352,7 +350,8 @@ impl Compactor for DefaultCompactor { num_rows: sst_info.num_rows as u64, num_row_groups: sst_info.num_row_groups, sequence: max_sequence, - }); + }) + .collect::>(); Ok(file_meta_opt) }); } @@ -369,7 +368,7 @@ impl Compactor for DefaultCompactor { .await .context(JoinSnafu)? 
.into_iter() - .collect::>>()?; + .collect::>>>()?; output_files.extend(metas.into_iter().flatten()); } diff --git a/src/mito2/src/compaction/picker.rs b/src/mito2/src/compaction/picker.rs index 9397c2bf6470..431973c3b662 100644 --- a/src/mito2/src/compaction/picker.rs +++ b/src/mito2/src/compaction/picker.rs @@ -61,7 +61,6 @@ impl From<&PickerOutput> for SerializedPickerOutput { .outputs .iter() .map(|output| SerializedCompactionOutput { - output_file_id: output.output_file_id, output_level: output.output_level, inputs: output.inputs.iter().map(|s| s.meta_ref().clone()).collect(), filter_deleted: output.filter_deleted, @@ -91,7 +90,6 @@ impl PickerOutput { .outputs .into_iter() .map(|output| CompactionOutput { - output_file_id: output.output_file_id, output_level: output.output_level, inputs: output .inputs @@ -167,14 +165,12 @@ mod tests { let picker_output = PickerOutput { outputs: vec![ CompactionOutput { - output_file_id: FileId::random(), output_level: 0, inputs: inputs_file_handle.clone(), filter_deleted: false, output_time_range: None, }, CompactionOutput { - output_file_id: FileId::random(), output_level: 0, inputs: inputs_file_handle.clone(), filter_deleted: false, @@ -205,7 +201,6 @@ mod tests { .iter() .zip(picker_output_from_serialized.outputs.iter()) .for_each(|(expected, actual)| { - assert_eq!(expected.output_file_id, actual.output_file_id); assert_eq!(expected.output_level, actual.output_level); expected .inputs diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 8efaa6c65fb6..a4e8913eeffb 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -26,7 +26,7 @@ use crate::compaction::compactor::CompactionRegion; use crate::compaction::picker::{Picker, PickerOutput}; use crate::compaction::run::{find_sorted_runs, reduce_runs, Item}; use crate::compaction::{get_expired_ssts, CompactionOutput}; -use crate::sst::file::{overlaps, FileHandle, FileId, Level}; +use crate::sst::file::{overlaps, FileHandle, Level}; use crate::sst::version::LevelMeta; const LEVEL_COMPACTED: Level = 1; @@ -134,7 +134,6 @@ impl TwcsPicker { for input in split_inputs { debug_assert!(input.len() > 1); output.push(CompactionOutput { - output_file_id: FileId::random(), output_level: LEVEL_COMPACTED, // always compact to l1 inputs: input, filter_deleted, @@ -373,7 +372,7 @@ mod tests { use super::*; use crate::compaction::test_util::{new_file_handle, new_file_handles}; - use crate::sst::file::{FileMeta, Level}; + use crate::sst::file::{FileId, FileMeta, Level}; use crate::test_util::NoopFilePurger; #[test] diff --git a/src/mito2/src/compaction/window.rs b/src/mito2/src/compaction/window.rs index 10bdb47297d5..f7ad4af893ee 100644 --- a/src/mito2/src/compaction/window.rs +++ b/src/mito2/src/compaction/window.rs @@ -26,7 +26,7 @@ use crate::compaction::buckets::infer_time_bucket; use crate::compaction::compactor::{CompactionRegion, CompactionVersion}; use crate::compaction::picker::{Picker, PickerOutput}; use crate::compaction::{get_expired_ssts, CompactionOutput}; -use crate::sst::file::{FileHandle, FileId}; +use crate::sst::file::FileHandle; /// Compaction picker that splits the time range of all involved files to windows, and merges /// the data segments intersects with those windows of files together so that the output files @@ -132,7 +132,6 @@ fn build_output(windows: BTreeMap)>) -> Vec, inverted_indexer: Option, last_mem_inverted_index: usize, @@ -168,11 +167,15 @@ impl Indexer { } } -pub(crate) struct IndexerBuilder<'a> { 
+#[async_trait::async_trait] +pub trait IndexerBuilder { + /// Builds indexer of given file id to [index_file_path]. + async fn build(&self, file_id: FileId, index_file_path: String) -> Indexer; +} + +pub(crate) struct IndexerBuilderImpl { pub(crate) op_type: OperationType, - pub(crate) file_id: FileId, - pub(crate) file_path: String, - pub(crate) metadata: &'a RegionMetadataRef, + pub(crate) metadata: RegionMetadataRef, pub(crate) row_group_size: usize, pub(crate) puffin_manager: SstPuffinManager, pub(crate) intermediate_manager: IntermediateManager, @@ -182,20 +185,20 @@ pub(crate) struct IndexerBuilder<'a> { pub(crate) bloom_filter_index_config: BloomFilterConfig, } -impl<'a> IndexerBuilder<'a> { +#[async_trait::async_trait] +impl IndexerBuilder for IndexerBuilderImpl { /// Sanity check for arguments and create a new [Indexer] if arguments are valid. - pub(crate) async fn build(self) -> Indexer { + async fn build(&self, file_id: FileId, index_file_path: String) -> Indexer { let mut indexer = Indexer { - file_id: self.file_id, - file_path: self.file_path.clone(), + file_id, + file_path: index_file_path, region_id: self.metadata.region_id, - ..Default::default() }; - indexer.inverted_indexer = self.build_inverted_indexer(); - indexer.fulltext_indexer = self.build_fulltext_indexer().await; - indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(); + indexer.inverted_indexer = self.build_inverted_indexer(file_id); + indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await; + indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id); if indexer.inverted_indexer.is_none() && indexer.fulltext_indexer.is_none() && indexer.bloom_filter_indexer.is_none() @@ -204,11 +207,13 @@ impl<'a> IndexerBuilder<'a> { return Indexer::default(); } - indexer.puffin_manager = Some(self.puffin_manager); + indexer.puffin_manager = Some(self.puffin_manager.clone()); indexer } +} - fn build_inverted_indexer(&self) -> Option { +impl IndexerBuilderImpl { + fn build_inverted_indexer(&self, file_id: FileId) -> Option { let create = match self.op_type { OperationType::Flush => self.inverted_index_config.create_on_flush.auto(), OperationType::Compact => self.inverted_index_config.create_on_compaction.auto(), @@ -217,7 +222,7 @@ impl<'a> IndexerBuilder<'a> { if !create { debug!( "Skip creating inverted index due to config, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); return None; } @@ -225,7 +230,7 @@ impl<'a> IndexerBuilder<'a> { if self.metadata.primary_key.is_empty() { debug!( "No tag columns, skip creating index, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); return None; } @@ -235,7 +240,7 @@ impl<'a> IndexerBuilder<'a> { else { warn!( "Segment row count is 0, skip creating index, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); return None; }; @@ -243,7 +248,7 @@ impl<'a> IndexerBuilder<'a> { let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else { warn!( "Row group size is 0, skip creating index, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); return None; }; @@ -254,8 +259,8 @@ impl<'a> IndexerBuilder<'a> { } let indexer = InvertedIndexer::new( - self.file_id, - self.metadata, + file_id, + &self.metadata, self.intermediate_manager.clone(), self.inverted_index_config.mem_threshold_on_create(), segment_row_count, @@ -267,7 
+272,7 @@ impl<'a> IndexerBuilder<'a> { Some(indexer) } - async fn build_fulltext_indexer(&self) -> Option { + async fn build_fulltext_indexer(&self, file_id: FileId) -> Option { let create = match self.op_type { OperationType::Flush => self.fulltext_index_config.create_on_flush.auto(), OperationType::Compact => self.fulltext_index_config.create_on_compaction.auto(), @@ -276,7 +281,7 @@ impl<'a> IndexerBuilder<'a> { if !create { debug!( "Skip creating full-text index due to config, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); return None; } @@ -284,9 +289,9 @@ impl<'a> IndexerBuilder<'a> { let mem_limit = self.fulltext_index_config.mem_threshold_on_create(); let creator = FulltextIndexer::new( &self.metadata.region_id, - &self.file_id, + &file_id, &self.intermediate_manager, - self.metadata, + &self.metadata, self.fulltext_index_config.compress, mem_limit, ) @@ -297,7 +302,7 @@ impl<'a> IndexerBuilder<'a> { if creator.is_none() { debug!( "Skip creating full-text index due to no columns require indexing, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); } return creator; @@ -308,19 +313,19 @@ impl<'a> IndexerBuilder<'a> { if cfg!(any(test, feature = "test")) { panic!( "Failed to create full-text indexer, region_id: {}, file_id: {}, err: {:?}", - self.metadata.region_id, self.file_id, err + self.metadata.region_id, file_id, err ); } else { warn!( err; "Failed to create full-text indexer, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); } None } - fn build_bloom_filter_indexer(&self) -> Option { + fn build_bloom_filter_indexer(&self, file_id: FileId) -> Option { let create = match self.op_type { OperationType::Flush => self.bloom_filter_index_config.create_on_flush.auto(), OperationType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(), @@ -329,15 +334,15 @@ impl<'a> IndexerBuilder<'a> { if !create { debug!( "Skip creating bloom filter due to config, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); return None; } let mem_limit = self.bloom_filter_index_config.mem_threshold_on_create(); let indexer = BloomFilterIndexer::new( - self.file_id, - self.metadata, + file_id, + &self.metadata, self.intermediate_manager.clone(), mem_limit, ); @@ -347,7 +352,7 @@ impl<'a> IndexerBuilder<'a> { if indexer.is_none() { debug!( "Skip creating bloom filter due to no columns require indexing, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); } return indexer; @@ -358,12 +363,12 @@ impl<'a> IndexerBuilder<'a> { if cfg!(any(test, feature = "test")) { panic!( "Failed to create bloom filter, region_id: {}, file_id: {}, err: {:?}", - self.metadata.region_id, self.file_id, err + self.metadata.region_id, file_id, err ); } else { warn!( err; "Failed to create bloom filter, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); } @@ -489,11 +494,9 @@ mod tests { with_fulltext: true, with_skipping_bloom: true, }); - let indexer = IndexerBuilder { + let indexer = IndexerBuilderImpl { op_type: OperationType::Flush, - file_id: FileId::random(), - file_path: "test".to_string(), - metadata: &metadata, + metadata, row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), intermediate_manager: intm_manager, @@ -502,7 +505,7 @@ mod tests { 
fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build() + .build(FileId::random(), "test".to_string()) .await; assert!(indexer.inverted_indexer.is_some()); @@ -521,11 +524,9 @@ mod tests { with_fulltext: true, with_skipping_bloom: true, }); - let indexer = IndexerBuilder { + let indexer = IndexerBuilderImpl { op_type: OperationType::Flush, - file_id: FileId::random(), - file_path: "test".to_string(), - metadata: &metadata, + metadata: metadata.clone(), row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), intermediate_manager: intm_manager.clone(), @@ -537,18 +538,16 @@ mod tests { fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build() + .build(FileId::random(), "test".to_string()) .await; assert!(indexer.inverted_indexer.is_none()); assert!(indexer.fulltext_indexer.is_some()); assert!(indexer.bloom_filter_indexer.is_some()); - let indexer = IndexerBuilder { + let indexer = IndexerBuilderImpl { op_type: OperationType::Compact, - file_id: FileId::random(), - file_path: "test".to_string(), - metadata: &metadata, + metadata: metadata.clone(), row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), intermediate_manager: intm_manager.clone(), @@ -560,18 +559,16 @@ mod tests { }, bloom_filter_index_config: BloomFilterConfig::default(), } - .build() + .build(FileId::random(), "test".to_string()) .await; assert!(indexer.inverted_indexer.is_some()); assert!(indexer.fulltext_indexer.is_none()); assert!(indexer.bloom_filter_indexer.is_some()); - let indexer = IndexerBuilder { + let indexer = IndexerBuilderImpl { op_type: OperationType::Compact, - file_id: FileId::random(), - file_path: "test".to_string(), - metadata: &metadata, + metadata, row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), intermediate_manager: intm_manager, @@ -583,7 +580,7 @@ mod tests { ..Default::default() }, } - .build() + .build(FileId::random(), "test".to_string()) .await; assert!(indexer.inverted_indexer.is_some()); @@ -602,11 +599,9 @@ mod tests { with_fulltext: true, with_skipping_bloom: true, }); - let indexer = IndexerBuilder { + let indexer = IndexerBuilderImpl { op_type: OperationType::Flush, - file_id: FileId::random(), - file_path: "test".to_string(), - metadata: &metadata, + metadata: metadata.clone(), row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), intermediate_manager: intm_manager.clone(), @@ -615,7 +610,7 @@ mod tests { fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build() + .build(FileId::random(), "test".to_string()) .await; assert!(indexer.inverted_indexer.is_none()); @@ -627,11 +622,9 @@ mod tests { with_fulltext: false, with_skipping_bloom: true, }); - let indexer = IndexerBuilder { + let indexer = IndexerBuilderImpl { op_type: OperationType::Flush, - file_id: FileId::random(), - file_path: "test".to_string(), - metadata: &metadata, + metadata: metadata.clone(), row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), intermediate_manager: intm_manager.clone(), @@ -640,7 +633,7 @@ mod tests { fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build() + .build(FileId::random(), "test".to_string()) .await; assert!(indexer.inverted_indexer.is_some()); @@ -652,11 +645,9 @@ mod tests { with_fulltext: true, with_skipping_bloom: false, }); - 
let indexer = IndexerBuilder { + let indexer = IndexerBuilderImpl { op_type: OperationType::Flush, - file_id: FileId::random(), - file_path: "test".to_string(), - metadata: &metadata, + metadata: metadata.clone(), row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), intermediate_manager: intm_manager, @@ -665,7 +656,7 @@ mod tests { fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build() + .build(FileId::random(), "test".to_string()) .await; assert!(indexer.inverted_indexer.is_some()); @@ -684,11 +675,9 @@ mod tests { with_fulltext: true, with_skipping_bloom: true, }); - let indexer = IndexerBuilder { + let indexer = IndexerBuilderImpl { op_type: OperationType::Flush, - file_id: FileId::random(), - file_path: "test".to_string(), - metadata: &metadata, + metadata, row_group_size: 0, puffin_manager: factory.build(mock_object_store()), intermediate_manager: intm_manager, @@ -697,7 +686,7 @@ mod tests { fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build() + .build(FileId::random(), "test".to_string()) .await; assert!(indexer.inverted_indexer.is_none()); diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 05dafb0edfc3..12d16b7cda3e 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use common_base::readable_size::ReadableSize; use parquet::file::metadata::ParquetMetaData; -use crate::sst::file::FileTimeRange; +use crate::sst::file::{FileId, FileTimeRange}; use crate::sst::index::IndexOutput; use crate::sst::DEFAULT_WRITE_BUFFER_SIZE; @@ -62,6 +62,8 @@ impl Default for WriteOptions { /// Parquet SST info returned by the writer. pub struct SstInfo { + /// SST file id. + pub file_id: FileId, /// Time range of the SST. The timestamps have the same time unit as the /// data in the SST. 
pub time_range: FileTimeRange, @@ -95,12 +97,13 @@ mod tests { use tokio_util::compat::FuturesAsyncWriteCompatExt; use super::*; + use crate::access_layer::FilePathProvider; use crate::cache::{CacheManager, CacheStrategy, PageKey}; - use crate::sst::index::Indexer; + use crate::sst::index::{Indexer, IndexerBuilder}; use crate::sst::parquet::format::WriteFormat; use crate::sst::parquet::reader::ParquetReaderBuilder; use crate::sst::parquet::writer::ParquetWriter; - use crate::sst::DEFAULT_WRITE_CONCURRENCY; + use crate::sst::{location, DEFAULT_WRITE_CONCURRENCY}; use crate::test_util::sst_util::{ assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range, new_batch_with_binary, new_source, sst_file_handle, sst_region_metadata, @@ -109,12 +112,38 @@ mod tests { const FILE_DIR: &str = "/"; + #[derive(Clone)] + struct FixedPathProvider { + file_id: FileId, + } + + impl FilePathProvider for FixedPathProvider { + fn build_index_file_path(&self, _file_id: FileId) -> String { + location::index_file_path(FILE_DIR, self.file_id) + } + + fn build_sst_file_path(&self, _file_id: FileId) -> String { + location::sst_file_path(FILE_DIR, self.file_id) + } + } + + struct NoopIndexBuilder; + + #[async_trait::async_trait] + impl IndexerBuilder for NoopIndexBuilder { + async fn build(&self, _file_id: FileId, _path: String) -> Indexer { + Indexer::default() + } + } + #[tokio::test] async fn test_write_read() { let mut env = TestEnv::new(); let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); - let file_path = handle.file_path(FILE_DIR); + let file_path = FixedPathProvider { + file_id: handle.file_id(), + }; let metadata = Arc::new(sst_region_metadata()); let source = new_source(&[ new_batch_by_range(&["a", "d"], 0, 60), @@ -126,18 +155,20 @@ mod tests { row_group_size: 50, ..Default::default() }; + let mut writer = ParquetWriter::new_with_object_store( object_store.clone(), + metadata.clone(), + NoopIndexBuilder, file_path, - metadata, - Indexer::default(), - ); + ) + .await; let info = writer .write_all(source, None, &write_opts) .await .unwrap() - .unwrap(); + .remove(0); assert_eq!(200, info.num_rows); assert!(info.file_size > 0); assert_eq!( @@ -168,7 +199,6 @@ mod tests { let mut env = TestEnv::new(); let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); - let file_path = handle.file_path(FILE_DIR); let metadata = Arc::new(sst_region_metadata()); let source = new_source(&[ new_batch_by_range(&["a", "d"], 0, 60), @@ -183,16 +213,19 @@ mod tests { // Prepare data. let mut writer = ParquetWriter::new_with_object_store( object_store.clone(), - file_path, metadata.clone(), - Indexer::default(), - ); + NoopIndexBuilder, + FixedPathProvider { + file_id: handle.file_id(), + }, + ) + .await; writer .write_all(source, None, &write_opts) .await .unwrap() - .unwrap(); + .remove(0); // Enable page cache. 
let cache = CacheStrategy::EnableAll(Arc::new( @@ -236,7 +269,6 @@ mod tests { let mut env = crate::test_util::TestEnv::new(); let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); - let file_path = handle.file_path(FILE_DIR); let metadata = Arc::new(sst_region_metadata()); let source = new_source(&[ new_batch_by_range(&["a", "d"], 0, 60), @@ -252,16 +284,19 @@ mod tests { // sst info contains the parquet metadata, which is converted from FileMetaData let mut writer = ParquetWriter::new_with_object_store( object_store.clone(), - file_path, metadata.clone(), - Indexer::default(), - ); + NoopIndexBuilder, + FixedPathProvider { + file_id: handle.file_id(), + }, + ) + .await; let sst_info = writer .write_all(source, None, &write_opts) .await .unwrap() - .expect("write_all should return sst info"); + .remove(0); let writer_metadata = sst_info.file_metadata.unwrap(); // read the sst file metadata @@ -277,7 +312,6 @@ mod tests { let mut env = TestEnv::new(); let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); - let file_path = handle.file_path(FILE_DIR); let metadata = Arc::new(sst_region_metadata()); let source = new_source(&[ new_batch_by_range(&["a", "d"], 0, 60), @@ -292,15 +326,18 @@ mod tests { // Prepare data. let mut writer = ParquetWriter::new_with_object_store( object_store.clone(), - file_path, metadata.clone(), - Indexer::default(), - ); + NoopIndexBuilder, + FixedPathProvider { + file_id: handle.file_id(), + }, + ) + .await; writer .write_all(source, None, &write_opts) .await .unwrap() - .unwrap(); + .remove(0); // Predicate let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr { @@ -330,7 +367,6 @@ mod tests { let mut env = TestEnv::new(); let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); - let file_path = handle.file_path(FILE_DIR); let metadata = Arc::new(sst_region_metadata()); let source = new_source(&[ new_batch_by_range(&["a", "z"], 0, 0), @@ -345,15 +381,18 @@ mod tests { // Prepare data. let mut writer = ParquetWriter::new_with_object_store( object_store.clone(), - file_path, metadata.clone(), - Indexer::default(), - ); + NoopIndexBuilder, + FixedPathProvider { + file_id: handle.file_id(), + }, + ) + .await; writer .write_all(source, None, &write_opts) .await .unwrap() - .unwrap(); + .remove(0); let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store); let mut reader = builder.build().await.unwrap(); @@ -365,7 +404,6 @@ mod tests { let mut env = TestEnv::new(); let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); - let file_path = handle.file_path(FILE_DIR); let metadata = Arc::new(sst_region_metadata()); let source = new_source(&[ new_batch_by_range(&["a", "d"], 0, 60), @@ -380,16 +418,19 @@ mod tests { // Prepare data. 
let mut writer = ParquetWriter::new_with_object_store( object_store.clone(), - file_path, metadata.clone(), - Indexer::default(), - ); + NoopIndexBuilder, + FixedPathProvider { + file_id: handle.file_id(), + }, + ) + .await; writer .write_all(source, None, &write_opts) .await .unwrap() - .unwrap(); + .remove(0); // Predicate let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr { diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index 13f7cfb3ec91..0e4eabc96ed1 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -28,6 +28,7 @@ use parquet::basic::{Compression, Encoding, ZstdLevel}; use parquet::file::metadata::KeyValue; use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder}; use parquet::schema::types::ColumnPath; +use smallvec::smallvec; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; use store_api::storage::consts::SEQUENCE_COLUMN_NAME; @@ -35,40 +36,48 @@ use store_api::storage::SequenceNumber; use tokio::io::AsyncWrite; use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt}; +use crate::access_layer::{FilePathProvider, SstInfoArray}; use crate::error::{InvalidMetadataSnafu, OpenDalSnafu, Result, WriteParquetSnafu}; use crate::read::{Batch, Source}; -use crate::sst::index::Indexer; +use crate::sst::file::FileId; +use crate::sst::index::{Indexer, IndexerBuilder}; use crate::sst::parquet::format::WriteFormat; use crate::sst::parquet::helper::parse_parquet_metadata; use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY}; use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY}; /// Parquet SST writer. -pub struct ParquetWriter { +pub struct ParquetWriter { + /// Path provider that creates SST and index file paths according to file id. + path_provider: P, writer: Option>>, + /// Current active file id. + current_file: FileId, writer_factory: F, /// Region metadata of the source and the target SST. metadata: RegionMetadataRef, - indexer: Indexer, + /// Indexer build that can create indexer for multiple files. + indexer_builder: I, + /// Current active indexer. 
+ current_indexer: Option, bytes_written: Arc, } pub trait WriterFactory { type Writer: AsyncWrite + Send + Unpin; - fn create(&mut self) -> impl Future>; + fn create(&mut self, file_path: &str) -> impl Future>; } pub struct ObjectStoreWriterFactory { - path: String, object_store: ObjectStore, } impl WriterFactory for ObjectStoreWriterFactory { type Writer = Compat; - async fn create(&mut self) -> Result { + async fn create(&mut self, file_path: &str) -> Result { self.object_store - .writer_with(&self.path) + .writer_with(file_path) .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize) .concurrent(DEFAULT_WRITE_CONCURRENCY) .await @@ -77,36 +86,73 @@ impl WriterFactory for ObjectStoreWriterFactory { } } -impl ParquetWriter { - pub fn new_with_object_store( +impl ParquetWriter +where + P: FilePathProvider, + I: IndexerBuilder, +{ + pub async fn new_with_object_store( object_store: ObjectStore, - path: String, metadata: RegionMetadataRef, - indexer: Indexer, - ) -> ParquetWriter { + indexer_builder: I, + path_provider: P, + ) -> ParquetWriter { ParquetWriter::new( - ObjectStoreWriterFactory { path, object_store }, + ObjectStoreWriterFactory { object_store }, metadata, - indexer, + indexer_builder, + path_provider, ) + .await } } -impl ParquetWriter +impl ParquetWriter where F: WriterFactory, + I: IndexerBuilder, + P: FilePathProvider, { /// Creates a new parquet SST writer. - pub fn new(factory: F, metadata: RegionMetadataRef, indexer: Indexer) -> ParquetWriter { + pub async fn new( + factory: F, + metadata: RegionMetadataRef, + indexer_builder: I, + path_provider: P, + ) -> ParquetWriter { + let init_file = FileId::random(); + let index_file_path = path_provider.build_index_file_path(init_file); + let indexer = indexer_builder.build(init_file, index_file_path).await; + ParquetWriter { + path_provider, writer: None, + current_file: init_file, writer_factory: factory, metadata, - indexer, + indexer_builder, + current_indexer: Some(indexer), bytes_written: Arc::new(AtomicUsize::new(0)), } } + async fn get_or_create_indexer(&mut self) -> &mut Indexer { + match self.current_indexer { + None => { + self.current_file = FileId::random(); + let index_file_path = self.path_provider.build_index_file_path(self.current_file); + let indexer = self + .indexer_builder + .build(self.current_file, index_file_path) + .await; + self.current_indexer = Some(indexer); + // safety: self.current_indexer already set above. + self.current_indexer.as_mut().unwrap() + } + Some(ref mut indexer) => indexer, + } + } + /// Iterates source and writes all rows to Parquet file. /// /// Returns the [SstInfo] if the SST is written. @@ -115,7 +161,7 @@ where mut source: Source, override_sequence: Option, // override the `sequence` field from `Source` opts: &WriteOptions, - ) -> Result> { + ) -> Result { let write_format = WriteFormat::new(self.metadata.clone()).with_override_sequence(override_sequence); let mut stats = SourceStats::default(); @@ -128,24 +174,24 @@ where match res { Ok(batch) => { stats.update(&batch); - self.indexer.update(&batch).await; + self.get_or_create_indexer().await.update(&batch).await; } Err(e) => { - self.indexer.abort().await; + self.get_or_create_indexer().await.abort().await; return Err(e); } } } - let index_output = self.indexer.finish().await; + let index_output = self.get_or_create_indexer().await.finish().await; if stats.num_rows == 0 { - return Ok(None); + return Ok(smallvec![]); } let Some(mut arrow_writer) = self.writer.take() else { // No batch actually written. 
- return Ok(None); + return Ok(smallvec![]); }; arrow_writer.flush().await.context(WriteParquetSnafu)?; @@ -159,15 +205,18 @@ where // convert FileMetaData to ParquetMetaData let parquet_metadata = parse_parquet_metadata(file_meta)?; + let file_id = self.current_file; + // object_store.write will make sure all bytes are written or an error is raised. - Ok(Some(SstInfo { + Ok(smallvec![SstInfo { + file_id, time_range, file_size, num_rows: stats.num_rows, num_row_groups: parquet_metadata.num_row_groups() as u64, file_metadata: Some(Arc::new(parquet_metadata)), index_metadata: index_output, - })) + }]) } /// Customizes per-column config according to schema and maybe column cardinality. @@ -229,8 +278,9 @@ where let props_builder = Self::customize_column_config(props_builder, &self.metadata); let writer_props = props_builder.build(); + let sst_file_path = self.path_provider.build_sst_file_path(self.current_file); let writer = SizeAwareWriter::new( - self.writer_factory.create().await?, + self.writer_factory.create(&sst_file_path).await?, self.bytes_written.clone(), ); let arrow_writer = diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs index 63c3fc09d621..63b1d2e7a0c0 100644 --- a/src/mito2/src/test_util/sst_util.rs +++ b/src/mito2/src/test_util/sst_util.rs @@ -14,7 +14,6 @@ //! Utilities for testing SSTs. -use std::num::NonZeroU64; use std::sync::Arc; use api::v1::{OpType, SemanticType}; @@ -100,13 +99,13 @@ pub fn new_source(batches: &[Batch]) -> Source { Source::Reader(Box::new(reader)) } -/// Creates a new [FileHandle] for a SST. -pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle { +/// Creates a SST file handle with provided file id +pub fn sst_file_handle_with_file_id(file_id: FileId, start_ms: i64, end_ms: i64) -> FileHandle { let file_purger = new_noop_file_purger(); FileHandle::new( FileMeta { region_id: REGION_ID, - file_id: FileId::random(), + file_id, time_range: ( Timestamp::new_millisecond(start_ms), Timestamp::new_millisecond(end_ms), @@ -123,6 +122,11 @@ pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle { ) } +/// Creates a new [FileHandle] for a SST. +pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle { + sst_file_handle_with_file_id(FileId::random(), start_ms, end_ms) +} + pub fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch { assert!(end >= start); let pk = new_primary_key(tags); diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs index 68534d34eeb8..9b98fbf026f4 100644 --- a/src/mito2/src/test_util/version_util.rs +++ b/src/mito2/src/test_util/version_util.rs @@ -15,7 +15,6 @@ //! Utilities to mock version. use std::collections::HashMap; -use std::num::NonZeroU64; use std::sync::Arc; use api::v1::value::ValueData; diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager.rs b/src/puffin/src/puffin_manager/fs_puffin_manager.rs index 52190f92fb28..c03a86aaf672 100644 --- a/src/puffin/src/puffin_manager/fs_puffin_manager.rs +++ b/src/puffin/src/puffin_manager/fs_puffin_manager.rs @@ -27,6 +27,7 @@ use crate::puffin_manager::stager::Stager; use crate::puffin_manager::PuffinManager; /// `FsPuffinManager` is a `PuffinManager` that provides readers and writers for puffin data in filesystem. +#[derive(Clone)] pub struct FsPuffinManager { /// The stager. 
stager: S, From c344c6aaa13bd32174bed100531a2506e8d85975 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Mon, 6 Jan 2025 11:43:46 +0800 Subject: [PATCH 2/4] - **Removed Output Size Enforcement in `twcs.rs`:** - Deleted the `enforce_max_output_size` function and related logic to simplify compaction input handling. - **Added Max File Size Option in `parquet.rs`:** - Introduced `max_file_size` in `WriteOptions` to control the maximum size of output files. - **Refactored Indexer Management in `parquet/writer.rs`:** - Changed `current_indexer` from an `Option` to a direct `Indexer` type. - Implemented `roll_to_next_file` to handle file transitions when exceeding `max_file_size`. - Simplified indexer initialization and management logic. --- src/mito2/src/compaction/twcs.rs | 95 +---------------------------- src/mito2/src/sst/parquet.rs | 3 + src/mito2/src/sst/parquet/writer.rs | 39 ++++++------ 3 files changed, 24 insertions(+), 113 deletions(-) diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index a4e8913eeffb..2cf2f730e2cf 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -118,20 +118,7 @@ impl TwcsPicker { continue; }; - let split_inputs = if !filter_deleted - && let Some(max_output_file_size) = self.max_output_file_size - { - let len_before_split = inputs.len(); - let maybe_split = enforce_max_output_size(inputs, max_output_file_size); - if maybe_split.len() != len_before_split { - info!("Compaction output file size exceeds threshold {}, split compaction inputs to: {:?}", max_output_file_size, maybe_split); - } - maybe_split - } else { - inputs - }; - - for input in split_inputs { + for input in inputs { debug_assert!(input.len() > 1); output.push(CompactionOutput { output_level: LEVEL_COMPACTED, // always compact to l1 @@ -145,43 +132,6 @@ impl TwcsPicker { } } -/// Limits the size of compaction output in a naive manner. -/// todo(hl): we can find the output file size more precisely by checking the time range -/// of each row group and adding the sizes of those non-overlapping row groups. But now -/// we'd better not to expose the SST details in this level. -fn enforce_max_output_size( - inputs: Vec>, - max_output_file_size: u64, -) -> Vec> { - inputs - .into_iter() - .flat_map(|input| { - debug_assert!(input.len() > 1); - let estimated_output_size = input.iter().map(|f| f.size()).sum::(); - if estimated_output_size < max_output_file_size { - // total file size does not exceed the threshold, just return the original input. - return vec![input]; - } - let mut splits = vec![]; - let mut new_input = vec![]; - let mut new_input_size = 0; - for f in input { - if new_input_size + f.size() > max_output_file_size { - splits.push(std::mem::take(&mut new_input)); - new_input_size = 0; - } - new_input_size += f.size(); - new_input.push(f); - } - if !new_input.is_empty() { - splits.push(new_input); - } - splits - }) - .filter(|p| p.len() > 1) - .collect() -} - /// Merges consecutive files so that file num does not exceed `max_file_num`, and chooses /// the solution with minimum overhead according to files sizes to be merged. /// `enforce_file_num` only merges consecutive files so that it won't create overlapping outputs. 
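With the up-front input splitting removed above, the output size bound moves into the writer itself: once the bytes written to the current file exceed `WriteOptions::max_file_size`, the writer rolls to a new file id (the field is added to `WriteOptions` in the `parquet.rs` hunk below; at this point in the series it is a plain byte count defaulting to `usize::MAX`). A minimal configuration sketch with an illustrative threshold:

```rust
use crate::sst::parquet::WriteOptions;

/// Illustrative options: cap each output SST at roughly 512 MiB and let the
/// writer roll to a fresh file id whenever the running size crosses the cap.
fn compaction_write_opts() -> WriteOptions {
    WriteOptions {
        max_file_size: 512 * 1024 * 1024,
        ..Default::default()
    }
}
```

Note the cap is a soft limit: the size check runs after a batch has been written, so a file can overshoot the cap by roughly the last buffered row group.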
@@ -368,12 +318,10 @@ fn find_latest_window_in_seconds<'a>( #[cfg(test)] mod tests { use std::collections::HashSet; - use std::sync::Arc; use super::*; use crate::compaction::test_util::{new_file_handle, new_file_handles}; - use crate::sst::file::{FileId, FileMeta, Level}; - use crate::test_util::NoopFilePurger; + use crate::sst::file::{FileId, Level}; #[test] fn test_get_latest_window_in_seconds() { @@ -741,44 +689,5 @@ mod tests { .check(); } - fn make_file_handles(inputs: &[(i64, i64, u64)]) -> Vec { - inputs - .iter() - .map(|(start, end, size)| { - FileHandle::new( - FileMeta { - region_id: Default::default(), - file_id: Default::default(), - time_range: ( - Timestamp::new_millisecond(*start), - Timestamp::new_millisecond(*end), - ), - level: 0, - file_size: *size, - available_indexes: Default::default(), - index_file_size: 0, - num_rows: 0, - num_row_groups: 0, - sequence: None, - }, - Arc::new(NoopFilePurger), - ) - }) - .collect() - } - - #[test] - fn test_limit_output_size() { - let mut files = make_file_handles(&[(1, 1, 1)].repeat(6)); - let runs = find_sorted_runs(&mut files); - assert_eq!(6, runs.len()); - let files_to_merge = reduce_runs(runs, 2); - - let enforced = enforce_max_output_size(files_to_merge, 2); - assert_eq!(2, enforced.len()); - assert_eq!(2, enforced[0].len()); - assert_eq!(2, enforced[1].len()); - } - // TODO(hl): TTL tester that checks if get_expired_ssts function works as expected. } diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 12d16b7cda3e..85b1a789dba1 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -49,6 +49,8 @@ pub struct WriteOptions { pub write_buffer_size: ReadableSize, /// Row group size. pub row_group_size: usize, + /// Max single output file size. + pub max_file_size: usize, } impl Default for WriteOptions { @@ -56,6 +58,7 @@ impl Default for WriteOptions { WriteOptions { write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE, row_group_size: DEFAULT_ROW_GROUP_SIZE, + max_file_size: usize::MAX, } } } diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index 0e4eabc96ed1..03aeee807dd4 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -59,7 +59,7 @@ pub struct ParquetWriter, + current_indexer: Indexer, bytes_written: Arc, } @@ -131,26 +131,22 @@ where writer_factory: factory, metadata, indexer_builder, - current_indexer: Some(indexer), + current_indexer: indexer, bytes_written: Arc::new(AtomicUsize::new(0)), } } - async fn get_or_create_indexer(&mut self) -> &mut Indexer { - match self.current_indexer { - None => { - self.current_file = FileId::random(); - let index_file_path = self.path_provider.build_index_file_path(self.current_file); - let indexer = self - .indexer_builder - .build(self.current_file, index_file_path) - .await; - self.current_indexer = Some(indexer); - // safety: self.current_indexer already set above. - self.current_indexer.as_mut().unwrap() - } - Some(ref mut indexer) => indexer, - } + async fn roll_to_next_file(&mut self) { + self.current_file = FileId::random(); + let index_file_path = self.path_provider.build_index_file_path(self.current_file); + let indexer = self + .indexer_builder + .build(self.current_file, index_file_path) + .await; + self.current_indexer = indexer; + + // maybe_init_writer will re-create a new file. + self.writer = None; } /// Iterates source and writes all rows to Parquet file. 
@@ -174,16 +170,19 @@ where match res { Ok(batch) => { stats.update(&batch); - self.get_or_create_indexer().await.update(&batch).await; + self.current_indexer.update(&batch).await; + if self.bytes_written.load(Ordering::Relaxed) > opts.max_file_size { + self.roll_to_next_file().await; + } } Err(e) => { - self.get_or_create_indexer().await.abort().await; + self.current_indexer.abort().await; return Err(e); } } } - let index_output = self.get_or_create_indexer().await.finish().await; + let index_output = self.current_indexer.finish().await; if stats.num_rows == 0 { return Ok(smallvec![]); From 2c5ace52ddcfc42b1b62a3cf308ce246da900142 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Mon, 6 Jan 2025 14:34:12 +0800 Subject: [PATCH 3/4] **Refactor Parquet Writer Initialization and File Handling** - Updated `ParquetWriter` in `writer.rs` to handle `current_indexer` as an `Option`, allowing for more flexible initialization and management. - Introduced `finish_current_file` method to encapsulate logic for completing and transitioning between SST files, improving code clarity and maintainability. - Enhanced error handling and logging with `debug` statements for better traceability during file operations. --- src/mito2/src/sst/parquet/writer.rs | 114 ++++++++++++++++------------ 1 file changed, 65 insertions(+), 49 deletions(-) diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index 03aeee807dd4..55189bc9e7a0 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -15,11 +15,13 @@ //! Parquet writer. use std::future::Future; +use std::mem; use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::task::{Context, Poll}; +use common_telemetry::debug; use common_time::Timestamp; use datatypes::arrow::datatypes::SchemaRef; use object_store::{FuturesAsyncWriter, ObjectStore}; @@ -59,7 +61,7 @@ pub struct ParquetWriter, bytes_written: Arc, } @@ -121,8 +123,6 @@ where path_provider: P, ) -> ParquetWriter { let init_file = FileId::random(); - let index_file_path = path_provider.build_index_file_path(init_file); - let indexer = indexer_builder.build(init_file, index_file_path).await; ParquetWriter { path_provider, @@ -131,22 +131,50 @@ where writer_factory: factory, metadata, indexer_builder, - current_indexer: indexer, + current_indexer: None, bytes_written: Arc::new(AtomicUsize::new(0)), } } - async fn roll_to_next_file(&mut self) { - self.current_file = FileId::random(); - let index_file_path = self.path_provider.build_index_file_path(self.current_file); - let indexer = self - .indexer_builder - .build(self.current_file, index_file_path) - .await; - self.current_indexer = indexer; - + /// Finishes current SST file and index file. + async fn finish_current_file( + &mut self, + ssts: &mut SstInfoArray, + stats: &mut SourceStats, + ) -> Result<()> { // maybe_init_writer will re-create a new file. - self.writer = None; + if let Some(mut current_writer) = mem::take(&mut self.writer) { + let stats = mem::take(stats); + // At least one row has been written. + assert!(stats.num_rows > 0); + + // Finish indexer and writer. + // safety: writer and index can only be both present or not. 
+ let index_output = self.current_indexer.as_mut().unwrap().finish().await; + current_writer.flush().await.context(WriteParquetSnafu)?; + + let file_meta = current_writer.close().await.context(WriteParquetSnafu)?; + let file_size = self.bytes_written.load(Ordering::Relaxed) as u64; + + // Safety: num rows > 0 so we must have min/max. + let time_range = stats.time_range.unwrap(); + + // convert FileMetaData to ParquetMetaData + let parquet_metadata = parse_parquet_metadata(file_meta)?; + ssts.push(SstInfo { + file_id: self.current_file, + time_range, + file_size, + num_rows: stats.num_rows, + num_row_groups: parquet_metadata.num_row_groups() as u64, + file_metadata: Some(Arc::new(parquet_metadata)), + index_metadata: index_output, + }); + self.current_file = FileId::random(); + self.bytes_written.store(0, Ordering::Relaxed) + }; + + Ok(()) } /// Iterates source and writes all rows to Parquet file. @@ -158,6 +186,7 @@ where override_sequence: Option, // override the `sequence` field from `Source` opts: &WriteOptions, ) -> Result { + let mut results = smallvec![]; let write_format = WriteFormat::new(self.metadata.clone()).with_override_sequence(override_sequence); let mut stats = SourceStats::default(); @@ -170,52 +199,31 @@ where match res { Ok(batch) => { stats.update(&batch); - self.current_indexer.update(&batch).await; + // safety: self.current_indexer must be set when first batch has been written. + self.current_indexer.as_mut().unwrap().update(&batch).await; if self.bytes_written.load(Ordering::Relaxed) > opts.max_file_size { - self.roll_to_next_file().await; + debug!( + "Finishing current file {}, file size: {}, max file size: {}", + self.current_file, + self.bytes_written.load(Ordering::Relaxed), + opts.max_file_size + ); + self.finish_current_file(&mut results, &mut stats).await?; } } Err(e) => { - self.current_indexer.abort().await; + if let Some(indexer) = &mut self.current_indexer { + indexer.abort().await; + } return Err(e); } } } - let index_output = self.current_indexer.finish().await; - - if stats.num_rows == 0 { - return Ok(smallvec![]); - } - - let Some(mut arrow_writer) = self.writer.take() else { - // No batch actually written. - return Ok(smallvec![]); - }; - - arrow_writer.flush().await.context(WriteParquetSnafu)?; - - let file_meta = arrow_writer.close().await.context(WriteParquetSnafu)?; - let file_size = self.bytes_written.load(Ordering::Relaxed) as u64; - - // Safety: num rows > 0 so we must have min/max. - let time_range = stats.time_range.unwrap(); - - // convert FileMetaData to ParquetMetaData - let parquet_metadata = parse_parquet_metadata(file_meta)?; - - let file_id = self.current_file; + self.finish_current_file(&mut results, &mut stats).await?; // object_store.write will make sure all bytes are written or an error is raised. - Ok(smallvec![SstInfo { - file_id, - time_range, - file_size, - num_rows: stats.num_rows, - num_row_groups: parquet_metadata.num_row_groups() as u64, - file_metadata: Some(Arc::new(parquet_metadata)), - index_metadata: index_output, - }]) + Ok(results) } /// Customizes per-column config according to schema and maybe column cardinality. 
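The next hunk moves indexer creation into the lazy writer-initialization path. As a rough illustration of the invariant the comments rely on ("writer and indexer can only be both present or not"), here is a minimal sketch with simplified stand-in types; it is not the real ParquetWriter implementation.

// Simplified stand-ins; illustration only, not the real ParquetWriter.
struct SstSink {
    writer: Option<Vec<u8>>, // stands in for the lazily created Arrow writer
    indexer: Option<String>, // stands in for the indexer built per file id
    current_file: u32,
}

impl SstSink {
    /// Creates the writer and the indexer together on first use for the current file.
    fn maybe_init(&mut self) -> &mut Vec<u8> {
        if self.writer.is_none() {
            self.writer = Some(Vec::new());
            self.indexer = Some(format!("index-for-file-{}", self.current_file));
        }
        self.writer.as_mut().unwrap()
    }

    /// Finishes the current file; the next write re-creates both lazily.
    fn finish_current_file(&mut self) -> Option<(usize, String)> {
        let buf = self.writer.take()?;
        let index = self
            .indexer
            .take()
            .expect("writer and indexer are created together");
        self.current_file += 1;
        Some((buf.len(), index))
    }
}

fn main() {
    let mut sink = SstSink { writer: None, indexer: None, current_file: 0 };
    sink.maybe_init().extend_from_slice(b"batch");
    let finished = sink.finish_current_file();
    assert!(finished.is_some());
    assert!(sink.writer.is_none() && sink.indexer.is_none());
}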
@@ -286,6 +294,14 @@ where AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props)) .context(WriteParquetSnafu)?; self.writer = Some(arrow_writer); + + let index_file_path = self.path_provider.build_index_file_path(self.current_file); + let indexer = self + .indexer_builder + .build(self.current_file, index_file_path) + .await; + self.current_indexer = Some(indexer); + // safety: self.writer is assigned above Ok(self.writer.as_mut().unwrap()) } From d410f7fcbad8907cb4c5d36b01dfcc5faa5b1f15 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Mon, 6 Jan 2025 19:52:52 +0800 Subject: [PATCH 4/4] - **Refactor `RegionFilePathFactory` to `RegionFilePathProvider`:** Updated references and implementations in `access_layer.rs`, `write_cache.rs`, and related test files to use the new struct name. - **Add `max_file_size` support in compaction:** Introduced `max_file_size` option in `PickerOutput`, `SerializedPickerOutput`, and `WriteOptions` in `compactor.rs`, `picker.rs`, `twcs.rs`, and `window.rs`. - **Enhance Parquet writing logic:** Modified `parquet.rs` and `parquet/writer.rs` to support optional `max_file_size` and added a test case `test_write_multiple_files` to verify writing multiple files based on size constraints. --- src/mito2/src/access_layer.rs | 8 ++-- src/mito2/src/cache/write_cache.rs | 8 ++-- src/mito2/src/compaction/compactor.rs | 1 + src/mito2/src/compaction/picker.rs | 6 +++ src/mito2/src/compaction/twcs.rs | 2 + src/mito2/src/compaction/window.rs | 1 + src/mito2/src/sst/parquet.rs | 66 +++++++++++++++++++++++++-- src/mito2/src/sst/parquet/writer.rs | 17 ++++--- 8 files changed, 90 insertions(+), 19 deletions(-) diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 51dd7a962a7e..f82c15858b4b 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -135,7 +135,7 @@ impl AccessLayer { .write_and_upload_sst( request, SstUploadRequest { - dest_path_provider: RegionFilePathFactory { + dest_path_provider: RegionFilePathProvider { region_dir: self.region_dir.clone(), }, remote_store: self.object_store.clone(), @@ -161,7 +161,7 @@ impl AccessLayer { self.object_store.clone(), request.metadata, indexer_builder, - RegionFilePathFactory { + RegionFilePathProvider { region_dir: self.region_dir.clone(), }, ) @@ -266,11 +266,11 @@ impl FilePathProvider for WriteCachePathProvider { /// Path provider that builds paths in region storage path. #[derive(Clone, Debug)] -pub(crate) struct RegionFilePathFactory { +pub(crate) struct RegionFilePathProvider { pub(crate) region_dir: String, } -impl FilePathProvider for RegionFilePathFactory { +impl FilePathProvider for RegionFilePathProvider { fn build_index_file_path(&self, file_id: FileId) -> String { location::index_file_path(&self.region_dir, file_id) } diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index 0ae00b3c6cf2..a0f068399ff7 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -24,7 +24,7 @@ use object_store::ObjectStore; use snafu::ResultExt; use crate::access_layer::{ - new_fs_cache_store, FilePathProvider, RegionFilePathFactory, SstInfoArray, SstWriteRequest, + new_fs_cache_store, FilePathProvider, RegionFilePathProvider, SstInfoArray, SstWriteRequest, WriteCachePathProvider, }; use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue}; @@ -328,7 +328,7 @@ impl WriteCache { /// Request to write and upload a SST. 
pub struct SstUploadRequest { /// Destination path provider of which SST files in write cache should be uploaded to. - pub dest_path_provider: RegionFilePathFactory, + pub dest_path_provider: RegionFilePathProvider, /// Remote object store to upload. pub remote_store: ObjectStore, } @@ -355,7 +355,7 @@ mod tests { // and now just use local file system to mock. let mut env = TestEnv::new(); let mock_store = env.init_object_store_manager(); - let path_provider = RegionFilePathFactory { + let path_provider = RegionFilePathProvider { region_dir: "test".to_string(), }; @@ -488,7 +488,7 @@ mod tests { ..Default::default() }; let upload_request = SstUploadRequest { - dest_path_provider: RegionFilePathFactory { + dest_path_provider: RegionFilePathProvider { region_dir: data_home.clone(), }, remote_store: mock_store.clone(), diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index affbda0f003e..a3929fb9c227 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -283,6 +283,7 @@ impl Compactor for DefaultCompactor { let write_opts = WriteOptions { write_buffer_size: compaction_region.engine_config.sst_write_buffer_size, + max_file_size: picker_output.max_file_size, ..Default::default() }; diff --git a/src/mito2/src/compaction/picker.rs b/src/mito2/src/compaction/picker.rs index 431973c3b662..0ceafefcd861 100644 --- a/src/mito2/src/compaction/picker.rs +++ b/src/mito2/src/compaction/picker.rs @@ -45,6 +45,8 @@ pub struct PickerOutput { pub outputs: Vec, pub expired_ssts: Vec, pub time_window_size: i64, + /// Max single output file size in bytes. + pub max_file_size: Option, } /// SerializedPickerOutput is a serialized version of PickerOutput by replacing [CompactionOutput] and [FileHandle] with [SerializedCompactionOutput] and [FileMeta]. @@ -53,6 +55,7 @@ pub struct SerializedPickerOutput { pub outputs: Vec, pub expired_ssts: Vec, pub time_window_size: i64, + pub max_file_size: Option, } impl From<&PickerOutput> for SerializedPickerOutput { @@ -76,6 +79,7 @@ impl From<&PickerOutput> for SerializedPickerOutput { outputs, expired_ssts, time_window_size: input.time_window_size, + max_file_size: input.max_file_size, } } } @@ -111,6 +115,7 @@ impl PickerOutput { outputs, expired_ssts, time_window_size: input.time_window_size, + max_file_size: input.max_file_size, } } } @@ -179,6 +184,7 @@ mod tests { ], expired_ssts: expired_ssts_file_handle.clone(), time_window_size: 1000, + max_file_size: None, }; let picker_output_str = diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 2cf2f730e2cf..dddfb7934905 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -196,10 +196,12 @@ impl Picker for TwcsPicker { return None; } + let max_file_size = self.max_output_file_size.map(|v| v as usize); Some(PickerOutput { outputs, expired_ssts, time_window_size, + max_file_size, }) } } diff --git a/src/mito2/src/compaction/window.rs b/src/mito2/src/compaction/window.rs index f7ad4af893ee..06212cb6d513 100644 --- a/src/mito2/src/compaction/window.rs +++ b/src/mito2/src/compaction/window.rs @@ -115,6 +115,7 @@ impl Picker for WindowedCompactionPicker { outputs, expired_ssts, time_window_size: time_window, + max_file_size: None, // todo (hl): we may need to support `max_file_size` parameter in manual compaction. 
}) } } diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 85b1a789dba1..2df6ee70f863 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -50,7 +50,9 @@ pub struct WriteOptions { /// Row group size. pub row_group_size: usize, /// Max single output file size. - pub max_file_size: usize, + /// Note: This is not a hard limit as we can only observe the file size when + /// ArrowWrite writes to underlying writers. + pub max_file_size: Option, } impl Default for WriteOptions { @@ -58,7 +60,7 @@ impl Default for WriteOptions { WriteOptions { write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE, row_group_size: DEFAULT_ROW_GROUP_SIZE, - max_file_size: usize::MAX, + max_file_size: None, } } } @@ -100,8 +102,9 @@ mod tests { use tokio_util::compat::FuturesAsyncWriteCompatExt; use super::*; - use crate::access_layer::FilePathProvider; + use crate::access_layer::{FilePathProvider, RegionFilePathProvider}; use crate::cache::{CacheManager, CacheStrategy, PageKey}; + use crate::read::BatchReader; use crate::sst::index::{Indexer, IndexerBuilder}; use crate::sst::parquet::format::WriteFormat; use crate::sst::parquet::reader::ParquetReaderBuilder; @@ -109,7 +112,8 @@ mod tests { use crate::sst::{location, DEFAULT_WRITE_CONCURRENCY}; use crate::test_util::sst_util::{ assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range, - new_batch_with_binary, new_source, sst_file_handle, sst_region_metadata, + new_batch_with_binary, new_source, sst_file_handle, sst_file_handle_with_file_id, + sst_region_metadata, }; use crate::test_util::{check_reader_result, TestEnv}; @@ -539,4 +543,58 @@ mod tests { ) .await; } + + #[tokio::test] + async fn test_write_multiple_files() { + common_telemetry::init_default_ut_logging(); + // create test env + let mut env = TestEnv::new(); + let object_store = env.init_object_store_manager(); + let metadata = Arc::new(sst_region_metadata()); + let batches = &[ + new_batch_by_range(&["a", "d"], 0, 1000), + new_batch_by_range(&["b", "f"], 0, 1000), + new_batch_by_range(&["b", "h"], 100, 200), + new_batch_by_range(&["b", "h"], 200, 300), + new_batch_by_range(&["b", "h"], 300, 1000), + ]; + let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum(); + + let source = new_source(batches); + let write_opts = WriteOptions { + row_group_size: 50, + max_file_size: Some(1024 * 16), + ..Default::default() + }; + + let path_provider = RegionFilePathProvider { + region_dir: "test".to_string(), + }; + let mut writer = ParquetWriter::new_with_object_store( + object_store.clone(), + metadata.clone(), + NoopIndexBuilder, + path_provider, + ) + .await; + + let files = writer.write_all(source, None, &write_opts).await.unwrap(); + assert_eq!(2, files.len()); + + let mut rows_read = 0; + for f in &files { + let file_handle = sst_file_handle_with_file_id( + f.file_id, + f.time_range.0.value(), + f.time_range.1.value(), + ); + let builder = + ParquetReaderBuilder::new("test".to_string(), file_handle, object_store.clone()); + let mut reader = builder.build().await.unwrap(); + while let Some(batch) = reader.next_batch().await.unwrap() { + rows_read += batch.num_rows(); + } + } + assert_eq!(total_rows, rows_read); + } } diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index 55189bc9e7a0..52764e21c85e 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -148,6 +148,13 @@ where // At least one row has been written. 
assert!(stats.num_rows > 0); + debug!( + "Finishing current file {}, file size: {}, num rows: {}", + self.current_file, + self.bytes_written.load(Ordering::Relaxed), + stats.num_rows + ); + // Finish indexer and writer. // safety: writer and index can only be both present or not. let index_output = self.current_indexer.as_mut().unwrap().finish().await; @@ -201,13 +208,9 @@ where stats.update(&batch); // safety: self.current_indexer must be set when first batch has been written. self.current_indexer.as_mut().unwrap().update(&batch).await; - if self.bytes_written.load(Ordering::Relaxed) > opts.max_file_size { - debug!( - "Finishing current file {}, file size: {}, max file size: {}", - self.current_file, - self.bytes_written.load(Ordering::Relaxed), - opts.max_file_size - ); + if let Some(max_file_size) = opts.max_file_size + && self.bytes_written.load(Ordering::Relaxed) > max_file_size + { self.finish_current_file(&mut results, &mut stats).await?; } }
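To summarize how the new option is expected to flow end to end, the following is a hypothetical, simplified sketch of a picker handing an optional max output file size to the write options. Only the `max_file_size` field name follows the patch; the surrounding structs and the helper function are stand-ins, not the real compaction types.

// Hypothetical, simplified stand-ins; only `max_file_size` mirrors the patch.
struct WriteOptions {
    max_file_size: Option<usize>, // None keeps the previous single-file behavior
}

struct PickerOutput {
    max_file_size: Option<usize>,
}

fn write_opts_from_picker(picker_output: &PickerOutput) -> WriteOptions {
    WriteOptions {
        max_file_size: picker_output.max_file_size,
    }
}

fn main() {
    // TWCS-style picker: cap each compaction output SST at 16 MiB.
    let twcs = PickerOutput { max_file_size: Some(16 * 1024 * 1024) };
    assert_eq!(write_opts_from_picker(&twcs).max_file_size, Some(16 * 1024 * 1024));

    // Windowed picker: no cap yet (see the TODO in window.rs above).
    let windowed = PickerOutput { max_file_size: None };
    assert_eq!(write_opts_from_picker(&windowed).max_file_size, None);
}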