Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(compaction): default new compaction group for new table #19080

Merged
merged 5 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1213,7 +1213,7 @@ impl GlobalBarrierWorkerContextImpl {
if is_first_time {
commit_info
.new_table_fragment_infos
.push(NewTableFragmentInfo::NewCompactionGroup {
.push(NewTableFragmentInfo {
table_ids: tables_to_commit,
});
};
Expand Down Expand Up @@ -1747,14 +1747,16 @@ fn collect_commit_epoch_info(
&& !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
{
let table_fragments = &info.table_fragments;
vec![NewTableFragmentInfo::Normal {
mv_table_id: table_fragments.mv_table_id().map(TableId::new),
internal_table_ids: table_fragments
.internal_table_ids()
.into_iter()
.map(TableId::new)
.collect(),
}]
let mut table_ids: HashSet<_> = table_fragments
.internal_table_ids()
.into_iter()
.map(TableId::new)
.collect();
if let Some(mv_table_id) = table_fragments.mv_table_id() {
table_ids.insert(TableId::new(mv_table_id));
}

vec![NewTableFragmentInfo { table_ids }]
} else {
vec![]
};
Expand Down
120 changes: 44 additions & 76 deletions src/meta/src/hummock/manager/commit_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use risingwave_common::catalog::TableId;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_hummock_sdk::change_log::ChangeLogDelta;
use risingwave_hummock_sdk::compaction_group::group_split::split_sst_with_table_ids;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::{
add_prost_table_stats_map, purge_prost_table_stats, to_prost_table_stats_map, PbTableStatsMap,
Expand Down Expand Up @@ -49,14 +48,8 @@ use crate::hummock::{
commit_multi_var_with_provided_txn, start_measure_real_process_timer, HummockManager,
};

pub enum NewTableFragmentInfo {
Normal {
mv_table_id: Option<TableId>,
internal_table_ids: Vec<TableId>,
},
NewCompactionGroup {
table_ids: HashSet<TableId>,
},
pub struct NewTableFragmentInfo {
pub table_ids: HashSet<TableId>,
}

#[derive(Default)]
Expand Down Expand Up @@ -124,73 +117,48 @@ impl HummockManager {
let mut compaction_group_config: Option<Arc<CompactionConfig>> = None;

// Add new table
for new_table_fragment_info in new_table_fragment_infos {
match new_table_fragment_info {
NewTableFragmentInfo::Normal {
mv_table_id,
internal_table_ids,
} => {
on_handle_add_new_table(
state_table_info,
&internal_table_ids,
StaticCompactionGroupId::StateDefault as u64,
&mut table_compaction_group_mapping,
&mut new_table_ids,
)?;

on_handle_add_new_table(
state_table_info,
&mv_table_id,
StaticCompactionGroupId::MaterializedView as u64,
&mut table_compaction_group_mapping,
&mut new_table_ids,
)?;
}
NewTableFragmentInfo::NewCompactionGroup { table_ids } => {
let (compaction_group_manager, compaction_group_config) =
if let Some(compaction_group_manager) = &mut compaction_group_manager_txn {
(
compaction_group_manager,
(*compaction_group_config
.as_ref()
.expect("must be set with compaction_group_manager_txn"))
.clone(),
)
} else {
let compaction_group_manager_guard =
self.compaction_group_manager.write().await;
let new_compaction_group_config =
compaction_group_manager_guard.default_compaction_config();
compaction_group_config = Some(new_compaction_group_config.clone());
(
compaction_group_manager_txn.insert(
CompactionGroupManager::start_owned_compaction_groups_txn(
compaction_group_manager_guard,
),
),
new_compaction_group_config,
)
};
let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
new_compaction_groups
.insert(new_compaction_group_id, compaction_group_config.clone());
compaction_group_manager.insert(
new_compaction_group_id,
CompactionGroup {
group_id: new_compaction_group_id,
compaction_config: compaction_group_config,
},
);

on_handle_add_new_table(
state_table_info,
&table_ids,
new_compaction_group_id,
&mut table_compaction_group_mapping,
&mut new_table_ids,
)?;
}
}
for NewTableFragmentInfo { table_ids } in new_table_fragment_infos {
let (compaction_group_manager, compaction_group_config) =
if let Some(compaction_group_manager) = &mut compaction_group_manager_txn {
(
compaction_group_manager,
(*compaction_group_config
.as_ref()
.expect("must be set with compaction_group_manager_txn"))
.clone(),
)
} else {
let compaction_group_manager_guard =
self.compaction_group_manager.write().await;
let new_compaction_group_config =
compaction_group_manager_guard.default_compaction_config();
compaction_group_config = Some(new_compaction_group_config.clone());
(
compaction_group_manager_txn.insert(
CompactionGroupManager::start_owned_compaction_groups_txn(
compaction_group_manager_guard,
),
),
new_compaction_group_config,
)
};
let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
new_compaction_groups.insert(new_compaction_group_id, compaction_group_config.clone());
compaction_group_manager.insert(
new_compaction_group_id,
CompactionGroup {
group_id: new_compaction_group_id,
compaction_config: compaction_group_config,
},
);

on_handle_add_new_table(
state_table_info,
&table_ids,
new_compaction_group_id,
&mut table_compaction_group_mapping,
&mut new_table_ids,
)?;
}

let commit_sstables = self
Expand Down
7 changes: 3 additions & 4 deletions src/meta/src/hummock/mock_hummock_meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,12 @@ impl HummockMetaClient for MockHummockMetaClient {
{
vec![]
} else {
vec![NewTableFragmentInfo::Normal {
mv_table_id: None,
internal_table_ids: commit_table_ids
vec![NewTableFragmentInfo {
table_ids: commit_table_ids
.iter()
.cloned()
.map(TableId::from)
.collect_vec(),
.collect(),
}]
};

Expand Down
51 changes: 18 additions & 33 deletions src/storage/hummock_test/src/hummock_storage_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_common::range::RangeBoundsExt;
use risingwave_common::util::epoch::{test_epoch, EpochExt, INVALID_EPOCH};
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::key::{
gen_key_from_bytes, prefixed_range_with_vnode, FullKey, TableKey, UserKey, TABLE_PREFIX_LEN,
};
Expand All @@ -36,6 +35,7 @@ use risingwave_hummock_sdk::table_watermark::{
TableWatermarksIndex, VnodeWatermark, WatermarkDirection,
};
use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo};
use risingwave_meta::hummock::test_utils::get_compaction_group_id_by_table_id;
use risingwave_meta::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use risingwave_rpc_client::HummockMetaClient;
use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion;
Expand Down Expand Up @@ -2635,20 +2635,20 @@ async fn test_commit_multi_epoch() {
commit_epoch(
epoch1,
sst1_epoch1.clone(),
vec![NewTableFragmentInfo::Normal {
mv_table_id: None,
internal_table_ids: vec![existing_table_id],
vec![NewTableFragmentInfo {
table_ids: HashSet::from_iter([existing_table_id]),
}],
&[existing_table_id],
)
.await;

let old_cg_id_set: HashSet<_> = {
let cg_id =
get_compaction_group_id_by_table_id(test_env.manager.clone(), existing_table_id.table_id())
.await;

{
let version = test_env.manager.get_current_version().await;
let cg = version
.levels
.get(&(StaticCompactionGroupId::StateDefault as _))
.unwrap();
let cg = version.levels.get(&(cg_id)).unwrap();
let sub_levels = &cg.l0.sub_levels;
assert_eq!(sub_levels.len(), 1);
let sub_level = &sub_levels[0];
Expand All @@ -2661,13 +2661,8 @@ async fn test_commit_multi_epoch() {
.get(&existing_table_id)
.unwrap();
assert_eq!(epoch1, info.committed_epoch);
assert_eq!(
StaticCompactionGroupId::StateDefault as u64,
info.compaction_group_id
);

version.levels.keys().cloned().collect()
};
assert_eq!(cg_id, info.compaction_group_id);
}

let sst1_epoch2 = SstableInfo {
sst_id: 22,
Expand All @@ -2684,10 +2679,7 @@ async fn test_commit_multi_epoch() {

{
let version = test_env.manager.get_current_version().await;
let cg = version
.levels
.get(&(StaticCompactionGroupId::StateDefault as _))
.unwrap();
let cg = version.levels.get(&(cg_id)).unwrap();
let sub_levels = &cg.l0.sub_levels;
assert_eq!(sub_levels.len(), 2);
let sub_level = &sub_levels[0];
Expand All @@ -2703,10 +2695,7 @@ async fn test_commit_multi_epoch() {
.get(&existing_table_id)
.unwrap();
assert_eq!(epoch2, info.committed_epoch);
assert_eq!(
StaticCompactionGroupId::StateDefault as u64,
info.compaction_group_id
);
assert_eq!(cg_id, info.compaction_group_id);
};

let new_table_id = TableId::new(2);
Expand All @@ -2723,7 +2712,7 @@ async fn test_commit_multi_epoch() {
commit_epoch(
epoch1,
sst2_epoch1.clone(),
vec![NewTableFragmentInfo::NewCompactionGroup {
vec![NewTableFragmentInfo {
table_ids: HashSet::from_iter([new_table_id]),
}],
&[new_table_id],
Expand All @@ -2732,10 +2721,9 @@ async fn test_commit_multi_epoch() {

let new_cg_id = {
let version = test_env.manager.get_current_version().await;
let new_cg_id_set: HashSet<_> = version.levels.keys().cloned().collect();
let added_cg_id_set = &new_cg_id_set - &old_cg_id_set;
assert_eq!(added_cg_id_set.len(), 1);
let new_cg_id = added_cg_id_set.into_iter().next().unwrap();
let new_cg_id =
get_compaction_group_id_by_table_id(test_env.manager.clone(), new_table_id.table_id())
.await;

let new_cg = version.levels.get(&new_cg_id).unwrap();
let sub_levels = &new_cg.l0.sub_levels;
Expand Down Expand Up @@ -2801,10 +2789,7 @@ async fn test_commit_multi_epoch() {

{
let version = test_env.manager.get_current_version().await;
let old_cg = version
.levels
.get(&(StaticCompactionGroupId::StateDefault as _))
.unwrap();
let old_cg = version.levels.get(&cg_id).unwrap();
let sub_levels = &old_cg.l0.sub_levels;
assert_eq!(sub_levels.len(), 3);
let sub_level1 = &sub_levels[0];
Expand Down
25 changes: 16 additions & 9 deletions src/tests/simulation/tests/integration_tests/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,25 @@ async fn test_vnode_watermark_reclaim_impl(
.parse::<u64>()
.unwrap();

// Move the table to a dedicated group to prevent its vnode watermark from being reclaimed during the compaction of other tables.
cluster.split_compaction_group(2, table_id).await.unwrap();
tokio::time::sleep(Duration::from_secs(5)).await;
let compaction_group_id = session
.run(format!(
"SELECT id FROM rw_hummock_compaction_group_configs where member_tables @> '[{}]'::jsonb;",
table_id
))
async fn compaction_group_id_by_table_id(session: &mut Session, table_id: u64) -> u64 {
session
.run(format!(
"SELECT id FROM rw_hummock_compaction_group_configs where member_tables @> '[{}]'::jsonb;",
table_id
))
.await
.unwrap()
.parse::<u64>()
.unwrap()
}
let original_compaction_group_id = compaction_group_id_by_table_id(session, table_id).await;
// Move the table to a dedicated group to prevent its vnode watermark from being reclaimed during the compaction of other tables.
cluster
.split_compaction_group(original_compaction_group_id, table_id)
.await
.unwrap()
.parse::<u64>()
.unwrap();
let compaction_group_id = compaction_group_id_by_table_id(session, table_id).await;

session
.run("INSERT INTO t2 VALUES (now(), 1);")
Expand Down
Loading