Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] support base manual compaction for cloud table (backport #52461) #52637

Open
wants to merge 1 commit into
base: branch-3.3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 25 additions & 20 deletions be/src/storage/lake/compaction_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ namespace starrocks::lake {
class BaseAndCumulativeCompactionPolicy : public CompactionPolicy {
public:
explicit BaseAndCumulativeCompactionPolicy(TabletManager* tablet_mgr,
std::shared_ptr<const TabletMetadataPB> tablet_metadata)
: CompactionPolicy(tablet_mgr, std::move(tablet_metadata)) {}
std::shared_ptr<const TabletMetadataPB> tablet_metadata,
bool force_base_compaction)
: CompactionPolicy(tablet_mgr, std::move(tablet_metadata), force_base_compaction) {}

~BaseAndCumulativeCompactionPolicy() override = default;

Expand All @@ -55,16 +56,16 @@ struct SizeTieredLevel {
class SizeTieredCompactionPolicy : public CompactionPolicy {
public:
explicit SizeTieredCompactionPolicy(TabletManager* tablet_mgr,
std::shared_ptr<const TabletMetadataPB> tablet_metadata)
: CompactionPolicy(tablet_mgr, std::move(tablet_metadata)),
_max_level_size(config::size_tiered_min_level_size *
pow(config::size_tiered_level_multiple, config::size_tiered_level_num)) {}
std::shared_ptr<const TabletMetadataPB> tablet_metadata,
bool force_base_compaction)
: CompactionPolicy(tablet_mgr, std::move(tablet_metadata), force_base_compaction) {}

~SizeTieredCompactionPolicy() override = default;

StatusOr<std::vector<RowsetPtr>> pick_rowsets() override;

static StatusOr<std::unique_ptr<SizeTieredLevel>> pick_max_level(const TabletMetadataPB& metadata);
static StatusOr<std::unique_ptr<SizeTieredLevel>> pick_max_level(const TabletMetadataPB& metadata,
bool force_base_compaction);

private:
static double cal_compaction_score(int64_t segment_num, int64_t level_size, int64_t total_size,
Expand All @@ -75,13 +76,11 @@ class SizeTieredCompactionPolicy : public CompactionPolicy {
return left->score > right->score || (left->score == right->score && left->rowsets[0] > right->rowsets[0]);
}
};

int64_t _max_level_size;
};

StatusOr<uint32_t> primary_compaction_score_by_policy(TabletManager* tablet_mgr,
const std::shared_ptr<const TabletMetadataPB>& metadata) {
PrimaryCompactionPolicy policy(tablet_mgr, metadata);
PrimaryCompactionPolicy policy(tablet_mgr, metadata, false /* force_base_compaction */);
std::vector<bool> has_dels;
ASSIGN_OR_RETURN(auto pick_rowset_indexes, policy.pick_rowset_indexes(metadata, true, &has_dels));
uint32_t segment_num_score = 0;
Expand Down Expand Up @@ -211,7 +210,7 @@ StatusOr<std::vector<RowsetPtr>> BaseAndCumulativeCompactionPolicy::pick_rowsets
DCHECK(_tablet_metadata != nullptr) << "_tablet_metadata is null";
double cumulative_score = cumulative_compaction_score(_tablet_metadata);
double base_score = base_compaction_score(_tablet_metadata);
if (base_score > cumulative_score) {
if (base_score > cumulative_score || _force_base_compaction) {
return pick_base_rowsets();
} else {
return pick_cumulative_rowsets();
Expand Down Expand Up @@ -254,8 +253,8 @@ double SizeTieredCompactionPolicy::cal_compaction_score(int64_t segment_num, int
return score;
}

StatusOr<std::unique_ptr<SizeTieredLevel>> SizeTieredCompactionPolicy::pick_max_level(
const TabletMetadataPB& metadata) {
StatusOr<std::unique_ptr<SizeTieredLevel>> SizeTieredCompactionPolicy::pick_max_level(const TabletMetadataPB& metadata,
bool force_base_compaction) {
int64_t max_level_size =
config::size_tiered_min_level_size * pow(config::size_tiered_level_multiple, config::size_tiered_level_num);
const auto& rowsets = metadata.rowsets();
Expand All @@ -271,7 +270,7 @@ StatusOr<std::unique_ptr<SizeTieredLevel>> SizeTieredCompactionPolicy::pick_max_
++num_delete_rowsets;
}
}
bool force_base_compaction = (num_delete_rowsets >= config::tablet_max_versions / 10);
force_base_compaction = force_base_compaction || (num_delete_rowsets >= config::tablet_max_versions / 10);

// check reach max version
bool reached_max_version = (rowsets.size() > config::tablet_max_versions / 10 * 9);
Expand Down Expand Up @@ -383,14 +382,17 @@ StatusOr<std::unique_ptr<SizeTieredLevel>> SizeTieredCompactionPolicy::pick_max_
}

StatusOr<std::vector<RowsetPtr>> SizeTieredCompactionPolicy::pick_rowsets() {
ASSIGN_OR_RETURN(auto selected_level, pick_max_level(*_tablet_metadata));
ASSIGN_OR_RETURN(auto selected_level, pick_max_level(*_tablet_metadata, _force_base_compaction));
std::vector<RowsetPtr> input_rowsets;
if (selected_level == nullptr) {
return input_rowsets;
}
int64_t level_multiple = config::size_tiered_level_multiple;
auto min_compaction_segment_num =
std::max<int64_t>(2, std::min(config::min_cumulative_compaction_num_singleton_deltas, level_multiple));
if (_force_base_compaction) { // make sure there is only one rowset
min_compaction_segment_num = 2;
}

// We need a minimum number of segments that trigger compaction to
// avoid triggering compaction too frequently compared to the old version
Expand Down Expand Up @@ -448,7 +450,7 @@ StatusOr<std::vector<RowsetPtr>> SizeTieredCompactionPolicy::pick_rowsets() {
}

double size_tiered_compaction_score(const std::shared_ptr<const TabletMetadataPB>& metadata) {
auto selected_level_or = SizeTieredCompactionPolicy::pick_max_level(*metadata);
auto selected_level_or = SizeTieredCompactionPolicy::pick_max_level(*metadata, false /* force_base_compaction */);
if (!selected_level_or.ok()) {
return 0;
}
Expand Down Expand Up @@ -490,13 +492,16 @@ StatusOr<CompactionAlgorithm> CompactionPolicy::choose_compaction_algorithm(cons
}

StatusOr<CompactionPolicyPtr> CompactionPolicy::create(TabletManager* tablet_mgr,
std::shared_ptr<const TabletMetadataPB> tablet_metadata) {
std::shared_ptr<const TabletMetadataPB> tablet_metadata,
bool force_base_compaction) {
if (tablet_metadata->schema().keys_type() == PRIMARY_KEYS) {
return std::make_shared<PrimaryCompactionPolicy>(tablet_mgr, std::move(tablet_metadata));
return std::make_shared<PrimaryCompactionPolicy>(tablet_mgr, std::move(tablet_metadata), force_base_compaction);
} else if (config::enable_size_tiered_compaction_strategy) {
return std::make_shared<SizeTieredCompactionPolicy>(tablet_mgr, std::move(tablet_metadata));
return std::make_shared<SizeTieredCompactionPolicy>(tablet_mgr, std::move(tablet_metadata),
force_base_compaction);
} else {
return std::make_shared<BaseAndCumulativeCompactionPolicy>(tablet_mgr, std::move(tablet_metadata));
return std::make_shared<BaseAndCumulativeCompactionPolicy>(tablet_mgr, std::move(tablet_metadata),
force_base_compaction);
}
}

Expand Down
11 changes: 8 additions & 3 deletions be/src/storage/lake/compaction_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,22 @@ class CompactionPolicy {
virtual StatusOr<CompactionAlgorithm> choose_compaction_algorithm(const std::vector<RowsetPtr>& rowsets);

static StatusOr<CompactionPolicyPtr> create(TabletManager* tablet_mgr,
std::shared_ptr<const TabletMetadataPB> tablet_metadata);
std::shared_ptr<const TabletMetadataPB> tablet_metadata,
bool force_base_compaction);

protected:
explicit CompactionPolicy(TabletManager* tablet_mgr, std::shared_ptr<const TabletMetadataPB> tablet_metadata)
: _tablet_mgr(tablet_mgr), _tablet_metadata(std::move(tablet_metadata)) {
explicit CompactionPolicy(TabletManager* tablet_mgr, std::shared_ptr<const TabletMetadataPB> tablet_metadata,
bool force_base_compaction)
: _tablet_mgr(tablet_mgr),
_tablet_metadata(std::move(tablet_metadata)),
_force_base_compaction(force_base_compaction) {
CHECK(_tablet_mgr != nullptr) << "tablet_mgr is null";
CHECK(_tablet_metadata != nullptr) << "tablet metadata is null";
}

TabletManager* _tablet_mgr;
std::shared_ptr<const TabletMetadataPB> _tablet_metadata;
bool _force_base_compaction;
};

double compaction_score(TabletManager* tablet_mgr, const std::shared_ptr<const TabletMetadataPB>& metadata);
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/lake/compaction_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ void CompactionScheduler::compact(::google::protobuf::RpcController* controller,
std::vector<std::unique_ptr<CompactionTaskContext>> contexts_vec;
for (auto tablet_id : request->tablet_ids()) {
auto context = std::make_unique<CompactionTaskContext>(request->txn_id(), tablet_id, request->version(),
is_checker, cb);
request->force_base_compaction(), is_checker, cb);
{
std::lock_guard l(_contexts_lock);
_contexts.Append(context.get());
Expand Down
6 changes: 4 additions & 2 deletions be/src/storage/lake/compaction_task_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,12 @@ struct CompactionTaskStats {

// Context of a single tablet compaction task.
struct CompactionTaskContext : public butil::LinkNode<CompactionTaskContext> {
explicit CompactionTaskContext(int64_t txn_id_, int64_t tablet_id_, int64_t version_, bool is_checker_,
std::shared_ptr<CompactionTaskCallback> cb_)
explicit CompactionTaskContext(int64_t txn_id_, int64_t tablet_id_, int64_t version_, bool force_base_compaction_,
bool is_checker_, std::shared_ptr<CompactionTaskCallback> cb_)
: txn_id(txn_id_),
tablet_id(tablet_id_),
version(version_),
force_base_compaction(force_base_compaction_),
is_checker(is_checker_),
callback(std::move(cb_)) {}

Expand All @@ -76,6 +77,7 @@ struct CompactionTaskContext : public butil::LinkNode<CompactionTaskContext> {
const int64_t txn_id;
const int64_t tablet_id;
const int64_t version;
const bool force_base_compaction;
std::atomic<int64_t> start_time{0};
std::atomic<int64_t> finish_time{0};
std::atomic<bool> skipped{false};
Expand Down
5 changes: 3 additions & 2 deletions be/src/storage/lake/primary_key_compaction_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,9 @@ struct PKSizeTieredLevel {

class PrimaryCompactionPolicy : public CompactionPolicy {
public:
explicit PrimaryCompactionPolicy(TabletManager* tablet_mgr, std::shared_ptr<const TabletMetadataPB> tablet_metadata)
: CompactionPolicy(tablet_mgr, std::move(tablet_metadata)) {}
explicit PrimaryCompactionPolicy(TabletManager* tablet_mgr, std::shared_ptr<const TabletMetadataPB> tablet_metadata,
bool force_base_compaction)
: CompactionPolicy(tablet_mgr, std::move(tablet_metadata), force_base_compaction) {}

~PrimaryCompactionPolicy() override = default;

Expand Down
3 changes: 2 additions & 1 deletion be/src/storage/lake/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,8 @@ StatusOr<TabletSchemaPtr> TabletManager::get_output_rowset_schema(std::vector<ui
StatusOr<CompactionTaskPtr> TabletManager::compact(CompactionTaskContext* context) {
ASSIGN_OR_RETURN(auto tablet, get_tablet(context->tablet_id, context->version));
auto tablet_metadata = tablet.metadata();
ASSIGN_OR_RETURN(auto compaction_policy, CompactionPolicy::create(this, tablet_metadata));
ASSIGN_OR_RETURN(auto compaction_policy,
CompactionPolicy::create(this, tablet_metadata, context->force_base_compaction));
ASSIGN_OR_RETURN(auto input_rowsets, compaction_policy->pick_rowsets());
ASSIGN_OR_RETURN(auto algorithm, compaction_policy->choose_compaction_algorithm(input_rowsets));
std::vector<uint32_t> input_rowsets_id;
Expand Down
Loading
Loading