diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 16d1480a61ed..f82c15858b4b 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -17,10 +17,12 @@ use std::sync::Arc; use object_store::services::Fs; use object_store::util::{join_dir, with_instrument_layers}; use object_store::ObjectStore; +use smallvec::SmallVec; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; -use store_api::storage::SequenceNumber; +use store_api::storage::{RegionId, SequenceNumber}; +use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey}; use crate::cache::write_cache::SstUploadRequest; use crate::cache::CacheManagerRef; use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig}; @@ -30,13 +32,15 @@ use crate::region::options::IndexOptions; use crate::sst::file::{FileHandle, FileId, FileMeta}; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; -use crate::sst::index::IndexerBuilder; +use crate::sst::index::IndexerBuilderImpl; use crate::sst::location; use crate::sst::parquet::reader::ParquetReaderBuilder; use crate::sst::parquet::writer::ParquetWriter; use crate::sst::parquet::{SstInfo, WriteOptions}; pub type AccessLayerRef = Arc; +/// SST write results. +pub type SstInfoArray = SmallVec<[SstInfo; 2]>; /// A layer to access SST files under the same directory. pub struct AccessLayer { @@ -121,11 +125,8 @@ impl AccessLayer { &self, request: SstWriteRequest, write_opts: &WriteOptions, - ) -> Result> { - let file_path = location::sst_file_path(&self.region_dir, request.file_id); - let index_file_path = location::index_file_path(&self.region_dir, request.file_id); + ) -> Result { let region_id = request.metadata.region_id; - let file_id = request.file_id; let cache_manager = request.cache_manager.clone(); let sst_info = if let Some(write_cache) = cache_manager.write_cache() { @@ -134,8 +135,9 @@ impl AccessLayer { .write_and_upload_sst( request, SstUploadRequest { - upload_path: file_path, - index_upload_path: index_file_path, + dest_path_provider: RegionFilePathProvider { + region_dir: self.region_dir.clone(), + }, remote_store: self.object_store.clone(), }, write_opts, @@ -144,11 +146,9 @@ impl AccessLayer { } else { // Write cache is disabled. let store = self.object_store.clone(); - let indexer = IndexerBuilder { + let indexer_builder = IndexerBuilderImpl { op_type: request.op_type, - file_id, - file_path: index_file_path, - metadata: &request.metadata, + metadata: request.metadata.clone(), row_group_size: write_opts.row_group_size, puffin_manager: self.puffin_manager_factory.build(store), intermediate_manager: self.intermediate_manager.clone(), @@ -156,24 +156,31 @@ impl AccessLayer { inverted_index_config: request.inverted_index_config, fulltext_index_config: request.fulltext_index_config, bloom_filter_index_config: request.bloom_filter_index_config, - } - .build() - .await; + }; let mut writer = ParquetWriter::new_with_object_store( self.object_store.clone(), - file_path, request.metadata, - indexer, - ); + indexer_builder, + RegionFilePathProvider { + region_dir: self.region_dir.clone(), + }, + ) + .await; writer .write_all(request.source, request.max_sequence, write_opts) .await? }; // Put parquet metadata to cache manager. 
- if let Some(sst_info) = &sst_info { - if let Some(parquet_metadata) = &sst_info.file_metadata { - cache_manager.put_parquet_meta_data(region_id, file_id, parquet_metadata.clone()) + if !sst_info.is_empty() { + for sst in &sst_info { + if let Some(parquet_metadata) = &sst.file_metadata { + cache_manager.put_parquet_meta_data( + region_id, + sst.file_id, + parquet_metadata.clone(), + ) + } } } @@ -191,7 +198,6 @@ pub(crate) enum OperationType { /// Contents to build a SST. pub(crate) struct SstWriteRequest { pub(crate) op_type: OperationType, - pub(crate) file_id: FileId, pub(crate) metadata: RegionMetadataRef, pub(crate) source: Source, pub(crate) cache_manager: CacheManagerRef, @@ -229,3 +235,47 @@ async fn clean_dir(dir: &str) -> Result<()> { Ok(()) } + +/// Path provider for SST file and index file. +pub trait FilePathProvider: Send + Sync { + /// Creates index file path of given file id. + fn build_index_file_path(&self, file_id: FileId) -> String; + + /// Creates SST file path of given file id. + fn build_sst_file_path(&self, file_id: FileId) -> String; +} + +/// Path provider that builds paths in local write cache. +#[derive(Clone)] +pub(crate) struct WriteCachePathProvider { + pub(crate) region_id: RegionId, + pub(crate) file_cache: FileCacheRef, +} + +impl FilePathProvider for WriteCachePathProvider { + fn build_index_file_path(&self, file_id: FileId) -> String { + let puffin_key = IndexKey::new(self.region_id, file_id, FileType::Puffin); + self.file_cache.cache_file_path(puffin_key) + } + + fn build_sst_file_path(&self, file_id: FileId) -> String { + let parquet_file_key = IndexKey::new(self.region_id, file_id, FileType::Parquet); + self.file_cache.cache_file_path(parquet_file_key) + } +} + +/// Path provider that builds paths in region storage path. +#[derive(Clone, Debug)] +pub(crate) struct RegionFilePathProvider { + pub(crate) region_dir: String, +} + +impl FilePathProvider for RegionFilePathProvider { + fn build_index_file_path(&self, file_id: FileId) -> String { + location::index_file_path(&self.region_dir, file_id) + } + + fn build_sst_file_path(&self, file_id: FileId) -> String { + location::sst_file_path(&self.region_dir, file_id) + } +} diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index 1e9dfb540093..a0f068399ff7 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -23,7 +23,10 @@ use futures::AsyncWriteExt; use object_store::ObjectStore; use snafu::ResultExt; -use crate::access_layer::{new_fs_cache_store, SstWriteRequest}; +use crate::access_layer::{ + new_fs_cache_store, FilePathProvider, RegionFilePathProvider, SstInfoArray, SstWriteRequest, + WriteCachePathProvider, +}; use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue}; use crate::error::{self, Result}; use crate::metrics::{ @@ -32,9 +35,9 @@ use crate::metrics::{ }; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; -use crate::sst::index::IndexerBuilder; +use crate::sst::index::IndexerBuilderImpl; use crate::sst::parquet::writer::ParquetWriter; -use crate::sst::parquet::{SstInfo, WriteOptions}; +use crate::sst::parquet::WriteOptions; use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY}; /// A cache for uploading files to remote object stores. 
@@ -103,22 +106,21 @@ impl WriteCache { write_request: SstWriteRequest, upload_request: SstUploadRequest, write_opts: &WriteOptions, - ) -> Result> { + ) -> Result { let timer = FLUSH_ELAPSED .with_label_values(&["write_sst"]) .start_timer(); let region_id = write_request.metadata.region_id; - let file_id = write_request.file_id; - let parquet_key = IndexKey::new(region_id, file_id, FileType::Parquet); - let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin); let store = self.file_cache.local_store(); - let indexer = IndexerBuilder { + let path_provider = WriteCachePathProvider { + file_cache: self.file_cache.clone(), + region_id, + }; + let indexer = IndexerBuilderImpl { op_type: write_request.op_type, - file_id, - file_path: self.file_cache.cache_file_path(puffin_key), - metadata: &write_request.metadata, + metadata: write_request.metadata.clone(), row_group_size: write_opts.row_group_size, puffin_manager: self.puffin_manager_factory.build(store), intermediate_manager: self.intermediate_manager.clone(), @@ -126,17 +128,16 @@ impl WriteCache { inverted_index_config: write_request.inverted_index_config, fulltext_index_config: write_request.fulltext_index_config, bloom_filter_index_config: write_request.bloom_filter_index_config, - } - .build() - .await; + }; // Write to FileCache. let mut writer = ParquetWriter::new_with_object_store( self.file_cache.local_store(), - self.file_cache.cache_file_path(parquet_key), write_request.metadata, indexer, - ); + path_provider, + ) + .await; let sst_info = writer .write_all(write_request.source, write_request.max_sequence, write_opts) @@ -145,22 +146,29 @@ impl WriteCache { timer.stop_and_record(); // Upload sst file to remote object store. - let Some(sst_info) = sst_info else { - // No data need to upload. - return Ok(None); - }; + if sst_info.is_empty() { + return Ok(sst_info); + } - let parquet_path = &upload_request.upload_path; let remote_store = &upload_request.remote_store; - self.upload(parquet_key, parquet_path, remote_store).await?; - - if sst_info.index_metadata.file_size > 0 { - let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin); - let puffin_path = &upload_request.index_upload_path; - self.upload(puffin_key, puffin_path, remote_store).await?; + for sst in &sst_info { + let parquet_key = IndexKey::new(region_id, sst.file_id, FileType::Parquet); + let parquet_path = upload_request + .dest_path_provider + .build_sst_file_path(sst.file_id); + self.upload(parquet_key, &parquet_path, remote_store) + .await?; + + if sst.index_metadata.file_size > 0 { + let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin); + let puffin_path = &upload_request + .dest_path_provider + .build_index_file_path(sst.file_id); + self.upload(puffin_key, puffin_path, remote_store).await?; + } } - Ok(Some(sst_info)) + Ok(sst_info) } /// Removes a file from the cache by `index_key`. @@ -319,10 +327,8 @@ impl WriteCache { /// Request to write and upload a SST. pub struct SstUploadRequest { - /// Path to upload the file. - pub upload_path: String, - /// Path to upload the index file. - pub index_upload_path: String, + /// Destination path provider of which SST files in write cache should be uploaded to. + pub dest_path_provider: RegionFilePathProvider, /// Remote object store to upload. 
pub remote_store: ObjectStore, } @@ -336,11 +342,9 @@ mod tests { use crate::cache::test_util::new_fs_store; use crate::cache::{CacheManager, CacheStrategy}; use crate::region::options::IndexOptions; - use crate::sst::file::FileId; - use crate::sst::location::{index_file_path, sst_file_path}; use crate::sst::parquet::reader::ParquetReaderBuilder; use crate::test_util::sst_util::{ - assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle, + assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle_with_file_id, sst_region_metadata, }; use crate::test_util::TestEnv; @@ -351,9 +355,9 @@ mod tests { // and now just use local file system to mock. let mut env = TestEnv::new(); let mock_store = env.init_object_store_manager(); - let file_id = FileId::random(); - let upload_path = sst_file_path("test", file_id); - let index_upload_path = index_file_path("test", file_id); + let path_provider = RegionFilePathProvider { + region_dir: "test".to_string(), + }; let local_dir = create_temp_dir(""); let local_store = new_fs_store(local_dir.path().to_str().unwrap()); @@ -373,7 +377,6 @@ mod tests { let write_request = SstWriteRequest { op_type: OperationType::Flush, - file_id, metadata, source, storage: None, @@ -386,8 +389,7 @@ mod tests { }; let upload_request = SstUploadRequest { - upload_path: upload_path.clone(), - index_upload_path: index_upload_path.clone(), + dest_path_provider: path_provider.clone(), remote_store: mock_store.clone(), }; @@ -397,18 +399,22 @@ mod tests { }; // Write to cache and upload sst to mock remote store - write_cache + let sst_info = write_cache .write_and_upload_sst(write_request, upload_request, &write_opts) .await .unwrap() - .unwrap(); + .remove(0); //todo(hl): we assume it only creates one file. 
+ + let file_id = sst_info.file_id; + let sst_upload_path = path_provider.build_sst_file_path(file_id); + let index_upload_path = path_provider.build_index_file_path(file_id); // Check write cache contains the key let key = IndexKey::new(region_id, file_id, FileType::Parquet); assert!(write_cache.file_cache.contains_key(&key)); // Check file data - let remote_data = mock_store.read(&upload_path).await.unwrap(); + let remote_data = mock_store.read(&sst_upload_path).await.unwrap(); let cache_data = local_store .read(&write_cache.file_cache.cache_file_path(key)) .await @@ -436,6 +442,7 @@ mod tests { #[tokio::test] async fn test_read_metadata_from_write_cache() { + common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new(); let data_home = env.data_home().display().to_string(); let mock_store = env.init_object_store_manager(); @@ -456,8 +463,7 @@ mod tests { // Create source let metadata = Arc::new(sst_region_metadata()); - let handle = sst_file_handle(0, 1000); - let file_id = handle.file_id(); + let source = new_source(&[ new_batch_by_range(&["a", "d"], 0, 60), new_batch_by_range(&["b", "f"], 0, 40), @@ -467,7 +473,6 @@ mod tests { // Write to local cache and upload sst to mock remote store let write_request = SstWriteRequest { op_type: OperationType::Flush, - file_id, metadata, source, storage: None, @@ -482,11 +487,10 @@ mod tests { row_group_size: 512, ..Default::default() }; - let upload_path = sst_file_path(&data_home, file_id); - let index_upload_path = index_file_path(&data_home, file_id); let upload_request = SstUploadRequest { - upload_path: upload_path.clone(), - index_upload_path: index_upload_path.clone(), + dest_path_provider: RegionFilePathProvider { + region_dir: data_home.clone(), + }, remote_store: mock_store.clone(), }; @@ -494,10 +498,11 @@ mod tests { .write_and_upload_sst(write_request, upload_request, &write_opts) .await .unwrap() - .unwrap(); + .remove(0); let write_parquet_metadata = sst_info.file_metadata.unwrap(); // Read metadata from write cache + let handle = sst_file_handle_with_file_id(sst_info.file_id, 0, 1000); let builder = ParquetReaderBuilder::new(data_home, handle.clone(), mock_store.clone()) .cache(CacheStrategy::EnableAll(cache_manager.clone())); let reader = builder.build().await.unwrap(); diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index bf8df5fcec7a..21237a340c04 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -66,7 +66,7 @@ use crate::schedule::remote_job_scheduler::{ CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef, }; use crate::schedule::scheduler::SchedulerRef; -use crate::sst::file::{FileHandle, FileId, FileMeta, Level}; +use crate::sst::file::{FileHandle, FileMeta, Level}; use crate::sst::version::LevelMeta; use crate::worker::WorkerListener; @@ -548,7 +548,6 @@ impl CompactionStatus { #[derive(Debug, Clone)] pub struct CompactionOutput { - pub output_file_id: FileId, /// Compaction output file level. pub output_level: Level, /// Compaction input files. @@ -562,7 +561,6 @@ pub struct CompactionOutput { /// SerializedCompactionOutput is a serialized version of [CompactionOutput] by replacing [FileHandle] with [FileMeta]. 
#[derive(Debug, Clone, Serialize, Deserialize)] pub struct SerializedCompactionOutput { - output_file_id: FileId, output_level: Level, inputs: Vec, filter_deleted: bool, diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index ceeb509bc17e..a3929fb9c227 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -271,7 +271,7 @@ impl Compactor for DefaultCompactor { compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone())); info!( - "Compaction region {} output [{}]-> {}", + "Region {} compaction input: [{}]", compaction_region.region_id, output .inputs @@ -279,18 +279,17 @@ impl Compactor for DefaultCompactor { .map(|f| f.file_id().to_string()) .collect::>() .join(","), - output.output_file_id ); let write_opts = WriteOptions { write_buffer_size: compaction_region.engine_config.sst_write_buffer_size, + max_file_size: picker_output.max_file_size, ..Default::default() }; let region_metadata = compaction_region.region_metadata.clone(); let sst_layer = compaction_region.access_layer.clone(); let region_id = compaction_region.region_id; - let file_id = output.output_file_id; let cache_manager = compaction_region.cache_manager.clone(); let storage = compaction_region.region_options.storage.clone(); let index_options = compaction_region @@ -327,7 +326,6 @@ impl Compactor for DefaultCompactor { .write_sst( SstWriteRequest { op_type: OperationType::Compact, - file_id, metadata: region_metadata, source: Source::Reader(reader), cache_manager, @@ -341,9 +339,10 @@ impl Compactor for DefaultCompactor { &write_opts, ) .await? + .into_iter() .map(|sst_info| FileMeta { region_id, - file_id, + file_id: sst_info.file_id, time_range: sst_info.time_range, level: output.output_level, file_size: sst_info.file_size, @@ -352,7 +351,8 @@ impl Compactor for DefaultCompactor { num_rows: sst_info.num_rows as u64, num_row_groups: sst_info.num_row_groups, sequence: max_sequence, - }); + }) + .collect::>(); Ok(file_meta_opt) }); } @@ -369,7 +369,7 @@ impl Compactor for DefaultCompactor { .await .context(JoinSnafu)? .into_iter() - .collect::>>()?; + .collect::>>>()?; output_files.extend(metas.into_iter().flatten()); } diff --git a/src/mito2/src/compaction/picker.rs b/src/mito2/src/compaction/picker.rs index 9397c2bf6470..0ceafefcd861 100644 --- a/src/mito2/src/compaction/picker.rs +++ b/src/mito2/src/compaction/picker.rs @@ -45,6 +45,8 @@ pub struct PickerOutput { pub outputs: Vec, pub expired_ssts: Vec, pub time_window_size: i64, + /// Max single output file size in bytes. + pub max_file_size: Option, } /// SerializedPickerOutput is a serialized version of PickerOutput by replacing [CompactionOutput] and [FileHandle] with [SerializedCompactionOutput] and [FileMeta]. 
@@ -53,6 +55,7 @@ pub struct SerializedPickerOutput { pub outputs: Vec, pub expired_ssts: Vec, pub time_window_size: i64, + pub max_file_size: Option, } impl From<&PickerOutput> for SerializedPickerOutput { @@ -61,7 +64,6 @@ impl From<&PickerOutput> for SerializedPickerOutput { .outputs .iter() .map(|output| SerializedCompactionOutput { - output_file_id: output.output_file_id, output_level: output.output_level, inputs: output.inputs.iter().map(|s| s.meta_ref().clone()).collect(), filter_deleted: output.filter_deleted, @@ -77,6 +79,7 @@ impl From<&PickerOutput> for SerializedPickerOutput { outputs, expired_ssts, time_window_size: input.time_window_size, + max_file_size: input.max_file_size, } } } @@ -91,7 +94,6 @@ impl PickerOutput { .outputs .into_iter() .map(|output| CompactionOutput { - output_file_id: output.output_file_id, output_level: output.output_level, inputs: output .inputs @@ -113,6 +115,7 @@ impl PickerOutput { outputs, expired_ssts, time_window_size: input.time_window_size, + max_file_size: input.max_file_size, } } } @@ -167,14 +170,12 @@ mod tests { let picker_output = PickerOutput { outputs: vec![ CompactionOutput { - output_file_id: FileId::random(), output_level: 0, inputs: inputs_file_handle.clone(), filter_deleted: false, output_time_range: None, }, CompactionOutput { - output_file_id: FileId::random(), output_level: 0, inputs: inputs_file_handle.clone(), filter_deleted: false, @@ -183,6 +184,7 @@ mod tests { ], expired_ssts: expired_ssts_file_handle.clone(), time_window_size: 1000, + max_file_size: None, }; let picker_output_str = @@ -205,7 +207,6 @@ mod tests { .iter() .zip(picker_output_from_serialized.outputs.iter()) .for_each(|(expected, actual)| { - assert_eq!(expected.output_file_id, actual.output_file_id); assert_eq!(expected.output_level, actual.output_level); expected .inputs diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 8efaa6c65fb6..dddfb7934905 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -26,7 +26,7 @@ use crate::compaction::compactor::CompactionRegion; use crate::compaction::picker::{Picker, PickerOutput}; use crate::compaction::run::{find_sorted_runs, reduce_runs, Item}; use crate::compaction::{get_expired_ssts, CompactionOutput}; -use crate::sst::file::{overlaps, FileHandle, FileId, Level}; +use crate::sst::file::{overlaps, FileHandle, Level}; use crate::sst::version::LevelMeta; const LEVEL_COMPACTED: Level = 1; @@ -118,23 +118,9 @@ impl TwcsPicker { continue; }; - let split_inputs = if !filter_deleted - && let Some(max_output_file_size) = self.max_output_file_size - { - let len_before_split = inputs.len(); - let maybe_split = enforce_max_output_size(inputs, max_output_file_size); - if maybe_split.len() != len_before_split { - info!("Compaction output file size exceeds threshold {}, split compaction inputs to: {:?}", max_output_file_size, maybe_split); - } - maybe_split - } else { - inputs - }; - - for input in split_inputs { + for input in inputs { debug_assert!(input.len() > 1); output.push(CompactionOutput { - output_file_id: FileId::random(), output_level: LEVEL_COMPACTED, // always compact to l1 inputs: input, filter_deleted, @@ -146,43 +132,6 @@ impl TwcsPicker { } } -/// Limits the size of compaction output in a naive manner. -/// todo(hl): we can find the output file size more precisely by checking the time range -/// of each row group and adding the sizes of those non-overlapping row groups. But now -/// we'd better not to expose the SST details in this level. 
-fn enforce_max_output_size( - inputs: Vec>, - max_output_file_size: u64, -) -> Vec> { - inputs - .into_iter() - .flat_map(|input| { - debug_assert!(input.len() > 1); - let estimated_output_size = input.iter().map(|f| f.size()).sum::(); - if estimated_output_size < max_output_file_size { - // total file size does not exceed the threshold, just return the original input. - return vec![input]; - } - let mut splits = vec![]; - let mut new_input = vec![]; - let mut new_input_size = 0; - for f in input { - if new_input_size + f.size() > max_output_file_size { - splits.push(std::mem::take(&mut new_input)); - new_input_size = 0; - } - new_input_size += f.size(); - new_input.push(f); - } - if !new_input.is_empty() { - splits.push(new_input); - } - splits - }) - .filter(|p| p.len() > 1) - .collect() -} - /// Merges consecutive files so that file num does not exceed `max_file_num`, and chooses /// the solution with minimum overhead according to files sizes to be merged. /// `enforce_file_num` only merges consecutive files so that it won't create overlapping outputs. @@ -247,10 +196,12 @@ impl Picker for TwcsPicker { return None; } + let max_file_size = self.max_output_file_size.map(|v| v as usize); Some(PickerOutput { outputs, expired_ssts, time_window_size, + max_file_size, }) } } @@ -369,12 +320,10 @@ fn find_latest_window_in_seconds<'a>( #[cfg(test)] mod tests { use std::collections::HashSet; - use std::sync::Arc; use super::*; use crate::compaction::test_util::{new_file_handle, new_file_handles}; - use crate::sst::file::{FileMeta, Level}; - use crate::test_util::NoopFilePurger; + use crate::sst::file::{FileId, Level}; #[test] fn test_get_latest_window_in_seconds() { @@ -742,44 +691,5 @@ mod tests { .check(); } - fn make_file_handles(inputs: &[(i64, i64, u64)]) -> Vec { - inputs - .iter() - .map(|(start, end, size)| { - FileHandle::new( - FileMeta { - region_id: Default::default(), - file_id: Default::default(), - time_range: ( - Timestamp::new_millisecond(*start), - Timestamp::new_millisecond(*end), - ), - level: 0, - file_size: *size, - available_indexes: Default::default(), - index_file_size: 0, - num_rows: 0, - num_row_groups: 0, - sequence: None, - }, - Arc::new(NoopFilePurger), - ) - }) - .collect() - } - - #[test] - fn test_limit_output_size() { - let mut files = make_file_handles(&[(1, 1, 1)].repeat(6)); - let runs = find_sorted_runs(&mut files); - assert_eq!(6, runs.len()); - let files_to_merge = reduce_runs(runs, 2); - - let enforced = enforce_max_output_size(files_to_merge, 2); - assert_eq!(2, enforced.len()); - assert_eq!(2, enforced[0].len()); - assert_eq!(2, enforced[1].len()); - } - // TODO(hl): TTL tester that checks if get_expired_ssts function works as expected. 
} diff --git a/src/mito2/src/compaction/window.rs b/src/mito2/src/compaction/window.rs index 10bdb47297d5..06212cb6d513 100644 --- a/src/mito2/src/compaction/window.rs +++ b/src/mito2/src/compaction/window.rs @@ -26,7 +26,7 @@ use crate::compaction::buckets::infer_time_bucket; use crate::compaction::compactor::{CompactionRegion, CompactionVersion}; use crate::compaction::picker::{Picker, PickerOutput}; use crate::compaction::{get_expired_ssts, CompactionOutput}; -use crate::sst::file::{FileHandle, FileId}; +use crate::sst::file::FileHandle; /// Compaction picker that splits the time range of all involved files to windows, and merges /// the data segments intersects with those windows of files together so that the output files @@ -115,6 +115,7 @@ impl Picker for WindowedCompactionPicker { outputs, expired_ssts, time_window_size: time_window, + max_file_size: None, // todo (hl): we may need to support `max_file_size` parameter in manual compaction. }) } } @@ -132,7 +133,6 @@ fn build_output(windows: BTreeMap)>) -> Vec, inverted_indexer: Option, last_mem_inverted_index: usize, @@ -168,11 +167,15 @@ impl Indexer { } } -pub(crate) struct IndexerBuilder<'a> { +#[async_trait::async_trait] +pub trait IndexerBuilder { + /// Builds indexer of given file id to [index_file_path]. + async fn build(&self, file_id: FileId, index_file_path: String) -> Indexer; +} + +pub(crate) struct IndexerBuilderImpl { pub(crate) op_type: OperationType, - pub(crate) file_id: FileId, - pub(crate) file_path: String, - pub(crate) metadata: &'a RegionMetadataRef, + pub(crate) metadata: RegionMetadataRef, pub(crate) row_group_size: usize, pub(crate) puffin_manager: SstPuffinManager, pub(crate) intermediate_manager: IntermediateManager, @@ -182,20 +185,20 @@ pub(crate) struct IndexerBuilder<'a> { pub(crate) bloom_filter_index_config: BloomFilterConfig, } -impl<'a> IndexerBuilder<'a> { +#[async_trait::async_trait] +impl IndexerBuilder for IndexerBuilderImpl { /// Sanity check for arguments and create a new [Indexer] if arguments are valid. 
- pub(crate) async fn build(self) -> Indexer { + async fn build(&self, file_id: FileId, index_file_path: String) -> Indexer { let mut indexer = Indexer { - file_id: self.file_id, - file_path: self.file_path.clone(), + file_id, + file_path: index_file_path, region_id: self.metadata.region_id, - ..Default::default() }; - indexer.inverted_indexer = self.build_inverted_indexer(); - indexer.fulltext_indexer = self.build_fulltext_indexer().await; - indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(); + indexer.inverted_indexer = self.build_inverted_indexer(file_id); + indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await; + indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id); if indexer.inverted_indexer.is_none() && indexer.fulltext_indexer.is_none() && indexer.bloom_filter_indexer.is_none() @@ -204,11 +207,13 @@ impl<'a> IndexerBuilder<'a> { return Indexer::default(); } - indexer.puffin_manager = Some(self.puffin_manager); + indexer.puffin_manager = Some(self.puffin_manager.clone()); indexer } +} - fn build_inverted_indexer(&self) -> Option { +impl IndexerBuilderImpl { + fn build_inverted_indexer(&self, file_id: FileId) -> Option { let create = match self.op_type { OperationType::Flush => self.inverted_index_config.create_on_flush.auto(), OperationType::Compact => self.inverted_index_config.create_on_compaction.auto(), @@ -217,7 +222,7 @@ impl<'a> IndexerBuilder<'a> { if !create { debug!( "Skip creating inverted index due to config, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); return None; } @@ -225,7 +230,7 @@ impl<'a> IndexerBuilder<'a> { if self.metadata.primary_key.is_empty() { debug!( "No tag columns, skip creating index, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); return None; } @@ -235,7 +240,7 @@ impl<'a> IndexerBuilder<'a> { else { warn!( "Segment row count is 0, skip creating index, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); return None; }; @@ -243,7 +248,7 @@ impl<'a> IndexerBuilder<'a> { let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else { warn!( "Row group size is 0, skip creating index, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); return None; }; @@ -254,8 +259,8 @@ impl<'a> IndexerBuilder<'a> { } let indexer = InvertedIndexer::new( - self.file_id, - self.metadata, + file_id, + &self.metadata, self.intermediate_manager.clone(), self.inverted_index_config.mem_threshold_on_create(), segment_row_count, @@ -267,7 +272,7 @@ impl<'a> IndexerBuilder<'a> { Some(indexer) } - async fn build_fulltext_indexer(&self) -> Option { + async fn build_fulltext_indexer(&self, file_id: FileId) -> Option { let create = match self.op_type { OperationType::Flush => self.fulltext_index_config.create_on_flush.auto(), OperationType::Compact => self.fulltext_index_config.create_on_compaction.auto(), @@ -276,7 +281,7 @@ impl<'a> IndexerBuilder<'a> { if !create { debug!( "Skip creating full-text index due to config, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); return None; } @@ -284,9 +289,9 @@ impl<'a> IndexerBuilder<'a> { let mem_limit = self.fulltext_index_config.mem_threshold_on_create(); let creator = FulltextIndexer::new( &self.metadata.region_id, - &self.file_id, + &file_id, &self.intermediate_manager, - self.metadata, 
+ &self.metadata, self.fulltext_index_config.compress, mem_limit, ) @@ -297,7 +302,7 @@ impl<'a> IndexerBuilder<'a> { if creator.is_none() { debug!( "Skip creating full-text index due to no columns require indexing, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); } return creator; @@ -308,19 +313,19 @@ impl<'a> IndexerBuilder<'a> { if cfg!(any(test, feature = "test")) { panic!( "Failed to create full-text indexer, region_id: {}, file_id: {}, err: {:?}", - self.metadata.region_id, self.file_id, err + self.metadata.region_id, file_id, err ); } else { warn!( err; "Failed to create full-text indexer, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); } None } - fn build_bloom_filter_indexer(&self) -> Option { + fn build_bloom_filter_indexer(&self, file_id: FileId) -> Option { let create = match self.op_type { OperationType::Flush => self.bloom_filter_index_config.create_on_flush.auto(), OperationType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(), @@ -329,15 +334,15 @@ impl<'a> IndexerBuilder<'a> { if !create { debug!( "Skip creating bloom filter due to config, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); return None; } let mem_limit = self.bloom_filter_index_config.mem_threshold_on_create(); let indexer = BloomFilterIndexer::new( - self.file_id, - self.metadata, + file_id, + &self.metadata, self.intermediate_manager.clone(), mem_limit, ); @@ -347,7 +352,7 @@ impl<'a> IndexerBuilder<'a> { if indexer.is_none() { debug!( "Skip creating bloom filter due to no columns require indexing, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); } return indexer; @@ -358,12 +363,12 @@ impl<'a> IndexerBuilder<'a> { if cfg!(any(test, feature = "test")) { panic!( "Failed to create bloom filter, region_id: {}, file_id: {}, err: {:?}", - self.metadata.region_id, self.file_id, err + self.metadata.region_id, file_id, err ); } else { warn!( err; "Failed to create bloom filter, region_id: {}, file_id: {}", - self.metadata.region_id, self.file_id, + self.metadata.region_id, file_id, ); } @@ -489,11 +494,9 @@ mod tests { with_fulltext: true, with_skipping_bloom: true, }); - let indexer = IndexerBuilder { + let indexer = IndexerBuilderImpl { op_type: OperationType::Flush, - file_id: FileId::random(), - file_path: "test".to_string(), - metadata: &metadata, + metadata, row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), intermediate_manager: intm_manager, @@ -502,7 +505,7 @@ mod tests { fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build() + .build(FileId::random(), "test".to_string()) .await; assert!(indexer.inverted_indexer.is_some()); @@ -521,11 +524,9 @@ mod tests { with_fulltext: true, with_skipping_bloom: true, }); - let indexer = IndexerBuilder { + let indexer = IndexerBuilderImpl { op_type: OperationType::Flush, - file_id: FileId::random(), - file_path: "test".to_string(), - metadata: &metadata, + metadata: metadata.clone(), row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), intermediate_manager: intm_manager.clone(), @@ -537,18 +538,16 @@ mod tests { fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build() + .build(FileId::random(), "test".to_string()) .await; 
assert!(indexer.inverted_indexer.is_none()); assert!(indexer.fulltext_indexer.is_some()); assert!(indexer.bloom_filter_indexer.is_some()); - let indexer = IndexerBuilder { + let indexer = IndexerBuilderImpl { op_type: OperationType::Compact, - file_id: FileId::random(), - file_path: "test".to_string(), - metadata: &metadata, + metadata: metadata.clone(), row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), intermediate_manager: intm_manager.clone(), @@ -560,18 +559,16 @@ mod tests { }, bloom_filter_index_config: BloomFilterConfig::default(), } - .build() + .build(FileId::random(), "test".to_string()) .await; assert!(indexer.inverted_indexer.is_some()); assert!(indexer.fulltext_indexer.is_none()); assert!(indexer.bloom_filter_indexer.is_some()); - let indexer = IndexerBuilder { + let indexer = IndexerBuilderImpl { op_type: OperationType::Compact, - file_id: FileId::random(), - file_path: "test".to_string(), - metadata: &metadata, + metadata, row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), intermediate_manager: intm_manager, @@ -583,7 +580,7 @@ mod tests { ..Default::default() }, } - .build() + .build(FileId::random(), "test".to_string()) .await; assert!(indexer.inverted_indexer.is_some()); @@ -602,11 +599,9 @@ mod tests { with_fulltext: true, with_skipping_bloom: true, }); - let indexer = IndexerBuilder { + let indexer = IndexerBuilderImpl { op_type: OperationType::Flush, - file_id: FileId::random(), - file_path: "test".to_string(), - metadata: &metadata, + metadata: metadata.clone(), row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), intermediate_manager: intm_manager.clone(), @@ -615,7 +610,7 @@ mod tests { fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build() + .build(FileId::random(), "test".to_string()) .await; assert!(indexer.inverted_indexer.is_none()); @@ -627,11 +622,9 @@ mod tests { with_fulltext: false, with_skipping_bloom: true, }); - let indexer = IndexerBuilder { + let indexer = IndexerBuilderImpl { op_type: OperationType::Flush, - file_id: FileId::random(), - file_path: "test".to_string(), - metadata: &metadata, + metadata: metadata.clone(), row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), intermediate_manager: intm_manager.clone(), @@ -640,7 +633,7 @@ mod tests { fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build() + .build(FileId::random(), "test".to_string()) .await; assert!(indexer.inverted_indexer.is_some()); @@ -652,11 +645,9 @@ mod tests { with_fulltext: true, with_skipping_bloom: false, }); - let indexer = IndexerBuilder { + let indexer = IndexerBuilderImpl { op_type: OperationType::Flush, - file_id: FileId::random(), - file_path: "test".to_string(), - metadata: &metadata, + metadata: metadata.clone(), row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), intermediate_manager: intm_manager, @@ -665,7 +656,7 @@ mod tests { fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build() + .build(FileId::random(), "test".to_string()) .await; assert!(indexer.inverted_indexer.is_some()); @@ -684,11 +675,9 @@ mod tests { with_fulltext: true, with_skipping_bloom: true, }); - let indexer = IndexerBuilder { + let indexer = IndexerBuilderImpl { op_type: OperationType::Flush, - file_id: FileId::random(), - file_path: "test".to_string(), - metadata: &metadata, + 
metadata, row_group_size: 0, puffin_manager: factory.build(mock_object_store()), intermediate_manager: intm_manager, @@ -697,7 +686,7 @@ mod tests { fulltext_index_config: FulltextIndexConfig::default(), bloom_filter_index_config: BloomFilterConfig::default(), } - .build() + .build(FileId::random(), "test".to_string()) .await; assert!(indexer.inverted_indexer.is_none()); diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 05dafb0edfc3..2df6ee70f863 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use common_base::readable_size::ReadableSize; use parquet::file::metadata::ParquetMetaData; -use crate::sst::file::FileTimeRange; +use crate::sst::file::{FileId, FileTimeRange}; use crate::sst::index::IndexOutput; use crate::sst::DEFAULT_WRITE_BUFFER_SIZE; @@ -49,6 +49,10 @@ pub struct WriteOptions { pub write_buffer_size: ReadableSize, /// Row group size. pub row_group_size: usize, + /// Max single output file size. + /// Note: This is not a hard limit as we can only observe the file size when + /// ArrowWrite writes to underlying writers. + pub max_file_size: Option, } impl Default for WriteOptions { @@ -56,12 +60,15 @@ impl Default for WriteOptions { WriteOptions { write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE, row_group_size: DEFAULT_ROW_GROUP_SIZE, + max_file_size: None, } } } /// Parquet SST info returned by the writer. pub struct SstInfo { + /// SST file id. + pub file_id: FileId, /// Time range of the SST. The timestamps have the same time unit as the /// data in the SST. pub time_range: FileTimeRange, @@ -95,26 +102,55 @@ mod tests { use tokio_util::compat::FuturesAsyncWriteCompatExt; use super::*; + use crate::access_layer::{FilePathProvider, RegionFilePathProvider}; use crate::cache::{CacheManager, CacheStrategy, PageKey}; - use crate::sst::index::Indexer; + use crate::read::BatchReader; + use crate::sst::index::{Indexer, IndexerBuilder}; use crate::sst::parquet::format::WriteFormat; use crate::sst::parquet::reader::ParquetReaderBuilder; use crate::sst::parquet::writer::ParquetWriter; - use crate::sst::DEFAULT_WRITE_CONCURRENCY; + use crate::sst::{location, DEFAULT_WRITE_CONCURRENCY}; use crate::test_util::sst_util::{ assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range, - new_batch_with_binary, new_source, sst_file_handle, sst_region_metadata, + new_batch_with_binary, new_source, sst_file_handle, sst_file_handle_with_file_id, + sst_region_metadata, }; use crate::test_util::{check_reader_result, TestEnv}; const FILE_DIR: &str = "/"; + #[derive(Clone)] + struct FixedPathProvider { + file_id: FileId, + } + + impl FilePathProvider for FixedPathProvider { + fn build_index_file_path(&self, _file_id: FileId) -> String { + location::index_file_path(FILE_DIR, self.file_id) + } + + fn build_sst_file_path(&self, _file_id: FileId) -> String { + location::sst_file_path(FILE_DIR, self.file_id) + } + } + + struct NoopIndexBuilder; + + #[async_trait::async_trait] + impl IndexerBuilder for NoopIndexBuilder { + async fn build(&self, _file_id: FileId, _path: String) -> Indexer { + Indexer::default() + } + } + #[tokio::test] async fn test_write_read() { let mut env = TestEnv::new(); let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); - let file_path = handle.file_path(FILE_DIR); + let file_path = FixedPathProvider { + file_id: handle.file_id(), + }; let metadata = Arc::new(sst_region_metadata()); let source = new_source(&[ 
new_batch_by_range(&["a", "d"], 0, 60), @@ -126,18 +162,20 @@ mod tests { row_group_size: 50, ..Default::default() }; + let mut writer = ParquetWriter::new_with_object_store( object_store.clone(), + metadata.clone(), + NoopIndexBuilder, file_path, - metadata, - Indexer::default(), - ); + ) + .await; let info = writer .write_all(source, None, &write_opts) .await .unwrap() - .unwrap(); + .remove(0); assert_eq!(200, info.num_rows); assert!(info.file_size > 0); assert_eq!( @@ -168,7 +206,6 @@ mod tests { let mut env = TestEnv::new(); let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); - let file_path = handle.file_path(FILE_DIR); let metadata = Arc::new(sst_region_metadata()); let source = new_source(&[ new_batch_by_range(&["a", "d"], 0, 60), @@ -183,16 +220,19 @@ mod tests { // Prepare data. let mut writer = ParquetWriter::new_with_object_store( object_store.clone(), - file_path, metadata.clone(), - Indexer::default(), - ); + NoopIndexBuilder, + FixedPathProvider { + file_id: handle.file_id(), + }, + ) + .await; writer .write_all(source, None, &write_opts) .await .unwrap() - .unwrap(); + .remove(0); // Enable page cache. let cache = CacheStrategy::EnableAll(Arc::new( @@ -236,7 +276,6 @@ mod tests { let mut env = crate::test_util::TestEnv::new(); let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); - let file_path = handle.file_path(FILE_DIR); let metadata = Arc::new(sst_region_metadata()); let source = new_source(&[ new_batch_by_range(&["a", "d"], 0, 60), @@ -252,16 +291,19 @@ mod tests { // sst info contains the parquet metadata, which is converted from FileMetaData let mut writer = ParquetWriter::new_with_object_store( object_store.clone(), - file_path, metadata.clone(), - Indexer::default(), - ); + NoopIndexBuilder, + FixedPathProvider { + file_id: handle.file_id(), + }, + ) + .await; let sst_info = writer .write_all(source, None, &write_opts) .await .unwrap() - .expect("write_all should return sst info"); + .remove(0); let writer_metadata = sst_info.file_metadata.unwrap(); // read the sst file metadata @@ -277,7 +319,6 @@ mod tests { let mut env = TestEnv::new(); let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); - let file_path = handle.file_path(FILE_DIR); let metadata = Arc::new(sst_region_metadata()); let source = new_source(&[ new_batch_by_range(&["a", "d"], 0, 60), @@ -292,15 +333,18 @@ mod tests { // Prepare data. let mut writer = ParquetWriter::new_with_object_store( object_store.clone(), - file_path, metadata.clone(), - Indexer::default(), - ); + NoopIndexBuilder, + FixedPathProvider { + file_id: handle.file_id(), + }, + ) + .await; writer .write_all(source, None, &write_opts) .await .unwrap() - .unwrap(); + .remove(0); // Predicate let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr { @@ -330,7 +374,6 @@ mod tests { let mut env = TestEnv::new(); let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); - let file_path = handle.file_path(FILE_DIR); let metadata = Arc::new(sst_region_metadata()); let source = new_source(&[ new_batch_by_range(&["a", "z"], 0, 0), @@ -345,15 +388,18 @@ mod tests { // Prepare data. 
let mut writer = ParquetWriter::new_with_object_store( object_store.clone(), - file_path, metadata.clone(), - Indexer::default(), - ); + NoopIndexBuilder, + FixedPathProvider { + file_id: handle.file_id(), + }, + ) + .await; writer .write_all(source, None, &write_opts) .await .unwrap() - .unwrap(); + .remove(0); let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store); let mut reader = builder.build().await.unwrap(); @@ -365,7 +411,6 @@ mod tests { let mut env = TestEnv::new(); let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); - let file_path = handle.file_path(FILE_DIR); let metadata = Arc::new(sst_region_metadata()); let source = new_source(&[ new_batch_by_range(&["a", "d"], 0, 60), @@ -380,16 +425,19 @@ mod tests { // Prepare data. let mut writer = ParquetWriter::new_with_object_store( object_store.clone(), - file_path, metadata.clone(), - Indexer::default(), - ); + NoopIndexBuilder, + FixedPathProvider { + file_id: handle.file_id(), + }, + ) + .await; writer .write_all(source, None, &write_opts) .await .unwrap() - .unwrap(); + .remove(0); // Predicate let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr { @@ -495,4 +543,58 @@ mod tests { ) .await; } + + #[tokio::test] + async fn test_write_multiple_files() { + common_telemetry::init_default_ut_logging(); + // create test env + let mut env = TestEnv::new(); + let object_store = env.init_object_store_manager(); + let metadata = Arc::new(sst_region_metadata()); + let batches = &[ + new_batch_by_range(&["a", "d"], 0, 1000), + new_batch_by_range(&["b", "f"], 0, 1000), + new_batch_by_range(&["b", "h"], 100, 200), + new_batch_by_range(&["b", "h"], 200, 300), + new_batch_by_range(&["b", "h"], 300, 1000), + ]; + let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum(); + + let source = new_source(batches); + let write_opts = WriteOptions { + row_group_size: 50, + max_file_size: Some(1024 * 16), + ..Default::default() + }; + + let path_provider = RegionFilePathProvider { + region_dir: "test".to_string(), + }; + let mut writer = ParquetWriter::new_with_object_store( + object_store.clone(), + metadata.clone(), + NoopIndexBuilder, + path_provider, + ) + .await; + + let files = writer.write_all(source, None, &write_opts).await.unwrap(); + assert_eq!(2, files.len()); + + let mut rows_read = 0; + for f in &files { + let file_handle = sst_file_handle_with_file_id( + f.file_id, + f.time_range.0.value(), + f.time_range.1.value(), + ); + let builder = + ParquetReaderBuilder::new("test".to_string(), file_handle, object_store.clone()); + let mut reader = builder.build().await.unwrap(); + while let Some(batch) = reader.next_batch().await.unwrap() { + rows_read += batch.num_rows(); + } + } + assert_eq!(total_rows, rows_read); + } } diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index 13f7cfb3ec91..52764e21c85e 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -15,11 +15,13 @@ //! Parquet writer. 
use std::future::Future; +use std::mem; use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::task::{Context, Poll}; +use common_telemetry::debug; use common_time::Timestamp; use datatypes::arrow::datatypes::SchemaRef; use object_store::{FuturesAsyncWriter, ObjectStore}; @@ -28,6 +30,7 @@ use parquet::basic::{Compression, Encoding, ZstdLevel}; use parquet::file::metadata::KeyValue; use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder}; use parquet::schema::types::ColumnPath; +use smallvec::smallvec; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; use store_api::storage::consts::SEQUENCE_COLUMN_NAME; @@ -35,40 +38,48 @@ use store_api::storage::SequenceNumber; use tokio::io::AsyncWrite; use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt}; +use crate::access_layer::{FilePathProvider, SstInfoArray}; use crate::error::{InvalidMetadataSnafu, OpenDalSnafu, Result, WriteParquetSnafu}; use crate::read::{Batch, Source}; -use crate::sst::index::Indexer; +use crate::sst::file::FileId; +use crate::sst::index::{Indexer, IndexerBuilder}; use crate::sst::parquet::format::WriteFormat; use crate::sst::parquet::helper::parse_parquet_metadata; use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY}; use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY}; /// Parquet SST writer. -pub struct ParquetWriter { +pub struct ParquetWriter { + /// Path provider that creates SST and index file paths according to file id. + path_provider: P, writer: Option>>, + /// Current active file id. + current_file: FileId, writer_factory: F, /// Region metadata of the source and the target SST. metadata: RegionMetadataRef, - indexer: Indexer, + /// Indexer build that can create indexer for multiple files. + indexer_builder: I, + /// Current active indexer. + current_indexer: Option, bytes_written: Arc, } pub trait WriterFactory { type Writer: AsyncWrite + Send + Unpin; - fn create(&mut self) -> impl Future>; + fn create(&mut self, file_path: &str) -> impl Future>; } pub struct ObjectStoreWriterFactory { - path: String, object_store: ObjectStore, } impl WriterFactory for ObjectStoreWriterFactory { type Writer = Compat; - async fn create(&mut self) -> Result { + async fn create(&mut self, file_path: &str) -> Result { self.object_store - .writer_with(&self.path) + .writer_with(file_path) .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize) .concurrent(DEFAULT_WRITE_CONCURRENCY) .await @@ -77,36 +88,102 @@ impl WriterFactory for ObjectStoreWriterFactory { } } -impl ParquetWriter { - pub fn new_with_object_store( +impl ParquetWriter +where + P: FilePathProvider, + I: IndexerBuilder, +{ + pub async fn new_with_object_store( object_store: ObjectStore, - path: String, metadata: RegionMetadataRef, - indexer: Indexer, - ) -> ParquetWriter { + indexer_builder: I, + path_provider: P, + ) -> ParquetWriter { ParquetWriter::new( - ObjectStoreWriterFactory { path, object_store }, + ObjectStoreWriterFactory { object_store }, metadata, - indexer, + indexer_builder, + path_provider, ) + .await } } -impl ParquetWriter +impl ParquetWriter where F: WriterFactory, + I: IndexerBuilder, + P: FilePathProvider, { /// Creates a new parquet SST writer. 
- pub fn new(factory: F, metadata: RegionMetadataRef, indexer: Indexer) -> ParquetWriter { + pub async fn new( + factory: F, + metadata: RegionMetadataRef, + indexer_builder: I, + path_provider: P, + ) -> ParquetWriter { + let init_file = FileId::random(); + ParquetWriter { + path_provider, writer: None, + current_file: init_file, writer_factory: factory, metadata, - indexer, + indexer_builder, + current_indexer: None, bytes_written: Arc::new(AtomicUsize::new(0)), } } + /// Finishes current SST file and index file. + async fn finish_current_file( + &mut self, + ssts: &mut SstInfoArray, + stats: &mut SourceStats, + ) -> Result<()> { + // maybe_init_writer will re-create a new file. + if let Some(mut current_writer) = mem::take(&mut self.writer) { + let stats = mem::take(stats); + // At least one row has been written. + assert!(stats.num_rows > 0); + + debug!( + "Finishing current file {}, file size: {}, num rows: {}", + self.current_file, + self.bytes_written.load(Ordering::Relaxed), + stats.num_rows + ); + + // Finish indexer and writer. + // safety: writer and index can only be both present or not. + let index_output = self.current_indexer.as_mut().unwrap().finish().await; + current_writer.flush().await.context(WriteParquetSnafu)?; + + let file_meta = current_writer.close().await.context(WriteParquetSnafu)?; + let file_size = self.bytes_written.load(Ordering::Relaxed) as u64; + + // Safety: num rows > 0 so we must have min/max. + let time_range = stats.time_range.unwrap(); + + // convert FileMetaData to ParquetMetaData + let parquet_metadata = parse_parquet_metadata(file_meta)?; + ssts.push(SstInfo { + file_id: self.current_file, + time_range, + file_size, + num_rows: stats.num_rows, + num_row_groups: parquet_metadata.num_row_groups() as u64, + file_metadata: Some(Arc::new(parquet_metadata)), + index_metadata: index_output, + }); + self.current_file = FileId::random(); + self.bytes_written.store(0, Ordering::Relaxed) + }; + + Ok(()) + } + /// Iterates source and writes all rows to Parquet file. /// /// Returns the [SstInfo] if the SST is written. @@ -115,7 +192,8 @@ where mut source: Source, override_sequence: Option, // override the `sequence` field from `Source` opts: &WriteOptions, - ) -> Result> { + ) -> Result { + let mut results = smallvec![]; let write_format = WriteFormat::new(self.metadata.clone()).with_override_sequence(override_sequence); let mut stats = SourceStats::default(); @@ -128,46 +206,27 @@ where match res { Ok(batch) => { stats.update(&batch); - self.indexer.update(&batch).await; + // safety: self.current_indexer must be set when first batch has been written. + self.current_indexer.as_mut().unwrap().update(&batch).await; + if let Some(max_file_size) = opts.max_file_size + && self.bytes_written.load(Ordering::Relaxed) > max_file_size + { + self.finish_current_file(&mut results, &mut stats).await?; + } } Err(e) => { - self.indexer.abort().await; + if let Some(indexer) = &mut self.current_indexer { + indexer.abort().await; + } return Err(e); } } } - let index_output = self.indexer.finish().await; - - if stats.num_rows == 0 { - return Ok(None); - } - - let Some(mut arrow_writer) = self.writer.take() else { - // No batch actually written. - return Ok(None); - }; - - arrow_writer.flush().await.context(WriteParquetSnafu)?; - - let file_meta = arrow_writer.close().await.context(WriteParquetSnafu)?; - let file_size = self.bytes_written.load(Ordering::Relaxed) as u64; - - // Safety: num rows > 0 so we must have min/max. 
- let time_range = stats.time_range.unwrap(); - - // convert FileMetaData to ParquetMetaData - let parquet_metadata = parse_parquet_metadata(file_meta)?; + self.finish_current_file(&mut results, &mut stats).await?; // object_store.write will make sure all bytes are written or an error is raised. - Ok(Some(SstInfo { - time_range, - file_size, - num_rows: stats.num_rows, - num_row_groups: parquet_metadata.num_row_groups() as u64, - file_metadata: Some(Arc::new(parquet_metadata)), - index_metadata: index_output, - })) + Ok(results) } /// Customizes per-column config according to schema and maybe column cardinality. @@ -229,14 +288,23 @@ where let props_builder = Self::customize_column_config(props_builder, &self.metadata); let writer_props = props_builder.build(); + let sst_file_path = self.path_provider.build_sst_file_path(self.current_file); let writer = SizeAwareWriter::new( - self.writer_factory.create().await?, + self.writer_factory.create(&sst_file_path).await?, self.bytes_written.clone(), ); let arrow_writer = AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props)) .context(WriteParquetSnafu)?; self.writer = Some(arrow_writer); + + let index_file_path = self.path_provider.build_index_file_path(self.current_file); + let indexer = self + .indexer_builder + .build(self.current_file, index_file_path) + .await; + self.current_indexer = Some(indexer); + // safety: self.writer is assigned above Ok(self.writer.as_mut().unwrap()) } diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs index 63c3fc09d621..63b1d2e7a0c0 100644 --- a/src/mito2/src/test_util/sst_util.rs +++ b/src/mito2/src/test_util/sst_util.rs @@ -14,7 +14,6 @@ //! Utilities for testing SSTs. -use std::num::NonZeroU64; use std::sync::Arc; use api::v1::{OpType, SemanticType}; @@ -100,13 +99,13 @@ pub fn new_source(batches: &[Batch]) -> Source { Source::Reader(Box::new(reader)) } -/// Creates a new [FileHandle] for a SST. -pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle { +/// Creates a SST file handle with provided file id +pub fn sst_file_handle_with_file_id(file_id: FileId, start_ms: i64, end_ms: i64) -> FileHandle { let file_purger = new_noop_file_purger(); FileHandle::new( FileMeta { region_id: REGION_ID, - file_id: FileId::random(), + file_id, time_range: ( Timestamp::new_millisecond(start_ms), Timestamp::new_millisecond(end_ms), @@ -123,6 +122,11 @@ pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle { ) } +/// Creates a new [FileHandle] for a SST. +pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle { + sst_file_handle_with_file_id(FileId::random(), start_ms, end_ms) +} + pub fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch { assert!(end >= start); let pk = new_primary_key(tags); diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs index 68534d34eeb8..9b98fbf026f4 100644 --- a/src/mito2/src/test_util/version_util.rs +++ b/src/mito2/src/test_util/version_util.rs @@ -15,7 +15,6 @@ //! Utilities to mock version. 
 use std::collections::HashMap;
-use std::num::NonZeroU64;
 use std::sync::Arc;
 
 use api::v1::value::ValueData;
diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager.rs b/src/puffin/src/puffin_manager/fs_puffin_manager.rs
index 52190f92fb28..c03a86aaf672 100644
--- a/src/puffin/src/puffin_manager/fs_puffin_manager.rs
+++ b/src/puffin/src/puffin_manager/fs_puffin_manager.rs
@@ -27,6 +27,7 @@ use crate::puffin_manager::stager::Stager;
 use crate::puffin_manager::PuffinManager;
 
 /// `FsPuffinManager` is a `PuffinManager` that provides readers and writers for puffin data in filesystem.
+#[derive(Clone)]
 pub struct FsPuffinManager<S, F> {
     /// The stager.
     stager: S,
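// Illustrative sketch (not part of the patch above): how the new pieces fit
// together from a caller's point of view inside the mito2 crate. A
// `FilePathProvider` decides where SST and index files live, an
// `IndexerBuilder` creates one indexer per output file, and
// `ParquetWriter::write_all` now returns an `SstInfoArray` that may contain
// several SSTs when `WriteOptions::max_file_size` is set. The function name
// `write_region_ssts`, its parameter list, and the 256 MiB cap are
// assumptions made for this example only.

use object_store::ObjectStore;
use store_api::metadata::RegionMetadataRef;

use crate::access_layer::{RegionFilePathProvider, SstInfoArray};
use crate::error::Result;
use crate::read::Source;
use crate::sst::index::IndexerBuilder;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::WriteOptions;

async fn write_region_ssts(
    object_store: ObjectStore,
    metadata: RegionMetadataRef,
    indexer_builder: impl IndexerBuilder,
    source: Source,
    region_dir: String,
) -> Result<SstInfoArray> {
    // SST and index paths are both derived from the region directory.
    let path_provider = RegionFilePathProvider { region_dir };

    let mut writer = ParquetWriter::new_with_object_store(
        object_store,
        metadata,
        indexer_builder,
        path_provider,
    )
    .await;

    // Roll over to a new SST file (and a new indexer) once roughly 256 MiB
    // has been written; this is a soft limit checked after each batch.
    let write_opts = WriteOptions {
        max_file_size: Some(256 * 1024 * 1024),
        ..Default::default()
    };

    // Zero, one, or many SSTs may come back; each carries its own file id.
    let ssts = writer.write_all(source, None, &write_opts).await?;
    for sst in &ssts {
        common_telemetry::debug!(
            "wrote SST {}: {} rows, {} bytes",
            sst.file_id,
            sst.num_rows,
            sst.file_size
        );
    }
    Ok(ssts)
}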