[Refactor] (inverted index) Refactor Inverted index file writer (#41625) #43528

Open · wants to merge 1 commit into base: branch-3.0
15 changes: 8 additions & 7 deletions be/src/cloud/cloud_rowset_writer.cpp
@@ -115,13 +115,14 @@ Status CloudRowsetWriter::build(RowsetSharedPtr& rowset) {
} else {
_rowset_meta->add_segments_file_size(seg_file_size.value());
}

if (auto idx_files_info = _idx_files_info.get_inverted_files_info(_segment_start_id);
!idx_files_info.has_value()) [[unlikely]] {
LOG(ERROR) << "expected inverted index files info, but none presents: "
<< idx_files_info.error();
} else {
_rowset_meta->add_inverted_index_files_info(idx_files_info.value());
if (rowset_schema->has_inverted_index()) {
if (auto idx_files_info = _idx_files.inverted_index_file_info(_segment_start_id);
!idx_files_info.has_value()) [[unlikely]] {
LOG(ERROR) << "expected inverted index files info, but none presents: "
<< idx_files_info.error();
} else {
_rowset_meta->add_inverted_index_files_info(idx_files_info.value());
}
}

RETURN_NOT_OK_STATUS_WITH_WARN(RowsetFactory::create_rowset(rowset_schema, _context.tablet_path,
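The hunk above guards the metadata collection behind rowset_schema->has_inverted_index() and reads the per-segment info through the renamed _idx_files helper. Below is a minimal, self-contained sketch of that guard-then-collect pattern; IndexFileCollection, the simple Result type, and the file name are illustrative stand-ins, not the actual Doris classes.

// Illustrative stand-ins only; the real Doris types differ.
#include <iostream>
#include <map>
#include <string>
#include <vector>

template <typename T>
struct Result {                      // stand-in for an expected-style result type
    bool ok = false;
    T val {};
    std::string err;
    bool has_value() const { return ok; }
    const T& value() const { return val; }
    const std::string& error() const { return err; }
};

struct IndexFileCollection {         // stand-in for the new _idx_files member
    std::map<int, std::vector<std::string>> per_segment;
    Result<std::vector<std::string>> inverted_index_file_info(int segment_start_id) const {
        auto it = per_segment.find(segment_start_id);
        if (it == per_segment.end()) {
            return {false, {}, "no index files recorded for segment start id " +
                                       std::to_string(segment_start_id)};
        }
        return {true, it->second, ""};
    }
};

int main() {
    bool schema_has_inverted_index = true;   // mirrors rowset_schema->has_inverted_index()
    IndexFileCollection idx_files;
    idx_files.per_segment[0] = {"example_rowset_0.idx"};   // hypothetical file name

    if (schema_has_inverted_index) {
        if (auto info = idx_files.inverted_index_file_info(0); !info.has_value()) {
            std::cerr << "expected inverted index files info, but none present: "
                      << info.error() << "\n";
        } else {
            std::cout << "collected " << info.value().size() << " index file(s)\n";
        }
    }
    return 0;
}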
2 changes: 1 addition & 1 deletion be/src/common/config.cpp
@@ -1070,7 +1070,7 @@ DEFINE_Int32(inverted_index_read_buffer_size, "4096");
// tree depth for bkd index
DEFINE_Int32(max_depth_in_bkd_tree, "32");
// index compaction
DEFINE_mBool(inverted_index_compaction_enable, "false");
DEFINE_mBool(inverted_index_compaction_enable, "true");
// Only for debug, do not use in production
DEFINE_mBool(debug_inverted_index_compaction, "false");
// index by RAM directory
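With this change inverted_index_compaction_enable defaults to true, and since it is declared with DEFINE_mBool it appears to remain a mutable config item. If the previous behavior is wanted, it can presumably still be switched off through the BE config file; a sketch, assuming the standard be.conf override mechanism:

# be.conf (assumed location of BE config overrides)
inverted_index_compaction_enable = false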
158 changes: 39 additions & 119 deletions be/src/olap/compaction.cpp
@@ -186,6 +186,7 @@ Status Compaction::merge_input_rowsets() {
Status res;
{
SCOPED_TIMER(_merge_rowsets_latency_timer);
// 1. Merge segment files and write bkd inverted index
if (_is_vertical) {
res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), *_cur_tablet_schema,
input_rs_readers, _output_rs_writer.get(),
@@ -194,17 +195,19 @@
res = Merger::vmerge_rowsets(_tablet, compaction_type(), *_cur_tablet_schema,
input_rs_readers, _output_rs_writer.get(), &_stats);
}
}

_tablet->last_compaction_status = res;

if (!res.ok()) {
return res;
_tablet->last_compaction_status = res;
if (!res.ok()) {
return res;
}
// 2. Merge the remaining inverted index files of the string type
RETURN_IF_ERROR(do_inverted_index_compaction());
}

COUNTER_UPDATE(_merged_rows_counter, _stats.merged_rows);
COUNTER_UPDATE(_filtered_rows_counter, _stats.filtered_rows);

// 3. In the `build`, `_close_file_writers` is called to close the inverted index file writer and write the final compound index file.
RETURN_NOT_OK_STATUS_WITH_WARN(_output_rs_writer->build(_output_rowset),
fmt::format("rowset writer build failed. output_version: {}",
_output_version.to_string()));
@@ -450,8 +453,6 @@ Status CompactionMixin::execute_compact_impl(int64_t permits) {

RETURN_IF_ERROR(merge_input_rowsets());

RETURN_IF_ERROR(do_inverted_index_compaction());

RETURN_IF_ERROR(modify_rowsets());

auto* cumu_policy = tablet()->cumulative_compaction_policy();
@@ -607,58 +608,9 @@ Status Compaction::do_inverted_index_compaction() {

// dest index files
// format: rowsetId_segmentId
std::vector<std::unique_ptr<InvertedIndexFileWriter>> inverted_index_file_writers(
dest_segment_num);

// Some columns have already been indexed
// key: seg_id, value: inverted index file size
std::unordered_map<int, int64_t> compacted_idx_file_size;
for (int seg_id = 0; seg_id < dest_segment_num; ++seg_id) {
std::string index_path_prefix {
InvertedIndexDescriptor::get_index_file_path_prefix(ctx.segment_path(seg_id))};
auto inverted_index_file_reader = std::make_unique<InvertedIndexFileReader>(
ctx.fs(), index_path_prefix,
_cur_tablet_schema->get_inverted_index_storage_format());
bool open_idx_file_cache = false;
auto st = inverted_index_file_reader->init(config::inverted_index_read_buffer_size,
open_idx_file_cache);
if (st.ok()) {
auto index_not_need_to_compact =
DORIS_TRY(inverted_index_file_reader->get_all_directories());
// V1: each index is a separate file
// V2: all indexes are in a single file
if (_cur_tablet_schema->get_inverted_index_storage_format() !=
doris::InvertedIndexStorageFormatPB::V1) {
int64_t fsize = 0;
st = ctx.fs()->file_size(
InvertedIndexDescriptor::get_index_file_path_v2(index_path_prefix), &fsize);
if (!st.ok()) {
LOG(ERROR) << "file size error in index compaction, error:" << st.msg();
return st;
}
compacted_idx_file_size[seg_id] = fsize;
}
auto inverted_index_file_writer = std::make_unique<InvertedIndexFileWriter>(
ctx.fs(), index_path_prefix, ctx.rowset_id.to_string(), seg_id,
_cur_tablet_schema->get_inverted_index_storage_format());
RETURN_IF_ERROR(inverted_index_file_writer->initialize(index_not_need_to_compact));
inverted_index_file_writers[seg_id] = std::move(inverted_index_file_writer);
} else if (st.is<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>()) {
auto inverted_index_file_writer = std::make_unique<InvertedIndexFileWriter>(
ctx.fs(), index_path_prefix, ctx.rowset_id.to_string(), seg_id,
_cur_tablet_schema->get_inverted_index_storage_format());
inverted_index_file_writers[seg_id] = std::move(inverted_index_file_writer);
// no index file
compacted_idx_file_size[seg_id] = 0;
} else {
LOG(ERROR) << "inverted_index_file_reader init failed in index compaction, error:"
<< st;
return st;
}
}
for (const auto& writer : inverted_index_file_writers) {
writer->set_file_writer_opts(ctx.get_file_writer_options());
}
auto& inverted_index_file_writers = dynamic_cast<BaseBetaRowsetWriter*>(_output_rs_writer.get())
->inverted_index_file_writers();
DCHECK_EQ(inverted_index_file_writers.size(), dest_segment_num);

// use tmp file dir to store index files
auto tmp_file_dir = ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir();
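Instead of constructing and initializing its own InvertedIndexFileWriter instances (the construction and bookkeeping code removed above), do_inverted_index_compaction now borrows the writers already owned by the output rowset writer and only checks that their count matches the destination segment count. A stripped-down sketch of that ownership pattern follows; IndexFileWriter, RowsetWriter, and BetaRowsetWriter are stand-in classes, not the real Doris types.

#include <cassert>
#include <memory>
#include <vector>

struct IndexFileWriter {};                      // stand-in for InvertedIndexFileWriter

struct RowsetWriter {                           // stand-in for the abstract rowset writer
    virtual ~RowsetWriter() = default;
};

struct BetaRowsetWriter : RowsetWriter {        // stand-in for BaseBetaRowsetWriter
    std::vector<std::unique_ptr<IndexFileWriter>> _writers;
    std::vector<std::unique_ptr<IndexFileWriter>>& inverted_index_file_writers() {
        return _writers;
    }
};

int main() {
    std::unique_ptr<RowsetWriter> output_rs_writer = std::make_unique<BetaRowsetWriter>();
    static_cast<BetaRowsetWriter*>(output_rs_writer.get())
            ->_writers.push_back(std::make_unique<IndexFileWriter>());

    // Compaction reaches the shared per-segment writers through a downcast,
    // then verifies the expected segment count (mirrors the DCHECK_EQ above).
    auto& writers = dynamic_cast<BetaRowsetWriter*>(output_rs_writer.get())
                            ->inverted_index_file_writers();
    const std::size_t dest_segment_num = 1;
    assert(writers.size() == dest_segment_num);
    return 0;
}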
@@ -684,29 +636,6 @@
auto col = _cur_tablet_schema->column_by_uid(column_uniq_id);
const auto* index_meta = _cur_tablet_schema->get_inverted_index(col);

// if index properties are different, index compaction maybe needs to be skipped.
bool is_continue = false;
std::optional<std::map<std::string, std::string>> first_properties;
for (const auto& rowset : _input_rowsets) {
const auto* tablet_index = rowset->tablet_schema()->get_inverted_index(col);
const auto& properties = tablet_index->properties();
if (!first_properties.has_value()) {
first_properties = properties;
} else {
if (properties != first_properties.value()) {
error_handler(index_meta->index_id(), column_uniq_id);
status = Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(
"if index properties are different, index compaction needs to be "
"skipped.");
is_continue = true;
break;
}
}
}
if (is_continue) {
continue;
}

std::vector<lucene::store::Directory*> dest_index_dirs(dest_segment_num);
try {
std::vector<std::unique_ptr<DorisCompoundReader>> src_idx_dirs(src_segment_num);
@@ -731,40 +660,12 @@
}
}

std::vector<InvertedIndexFileInfo> all_inverted_index_file_info(dest_segment_num);
uint64_t inverted_index_file_size = 0;
for (int seg_id = 0; seg_id < dest_segment_num; ++seg_id) {
auto inverted_index_file_writer = inverted_index_file_writers[seg_id].get();
if (Status st = inverted_index_file_writer->close(); !st.ok()) {
status = Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(st.msg());
} else {
inverted_index_file_size += inverted_index_file_writer->get_index_file_total_size();
inverted_index_file_size -= compacted_idx_file_size[seg_id];
}
all_inverted_index_file_info[seg_id] = inverted_index_file_writer->get_index_file_info();
}
// check index compaction status. If status is not ok, we should return error and end this compaction round.
if (!status.ok()) {
return status;
}

// index compaction should update total disk size and index disk size
_output_rowset->rowset_meta()->set_data_disk_size(_output_rowset->data_disk_size() +
inverted_index_file_size);
_output_rowset->rowset_meta()->set_total_disk_size(_output_rowset->data_disk_size() +
inverted_index_file_size);
_output_rowset->rowset_meta()->set_index_disk_size(_output_rowset->index_disk_size() +
inverted_index_file_size);

_output_rowset->rowset_meta()->update_inverted_index_files_info(all_inverted_index_file_info);
COUNTER_UPDATE(_output_rowset_data_size_counter, _output_rowset->data_disk_size());

LOG(INFO) << "succeed to do index compaction"
<< ". tablet=" << _tablet->tablet_id() << ", input row number=" << _input_row_num
<< ", output row number=" << _output_rowset->num_rows()
<< ", input_rowset_size=" << _input_rowsets_size
<< ", output_rowset_size=" << _output_rowset->data_disk_size()
<< ", inverted index file size=" << inverted_index_file_size
<< ". tablet=" << _tablet->tablet_id()
<< ". elapsed time=" << inverted_watch.get_elapse_second() << "s.";

return Status::OK();
Expand All @@ -789,6 +690,31 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) {
if (!field_is_slice_type(_cur_tablet_schema->column_by_uid(col_unique_id).type())) {
continue;
}

// if index properties are different, index compaction maybe needs to be skipped.
bool is_continue = false;
std::optional<std::map<std::string, std::string>> first_properties;
for (const auto& rowset : _input_rowsets) {
const auto* tablet_index =
rowset->tablet_schema()->get_inverted_index(col_unique_id, "");
// no inverted index or index id is different from current index id
if (tablet_index == nullptr || tablet_index->index_id() != index.index_id()) {
is_continue = true;
break;
}
const auto& properties = tablet_index->properties();
if (!first_properties.has_value()) {
first_properties = properties;
} else {
if (properties != first_properties.value()) {
is_continue = true;
break;
}
}
}
if (is_continue) {
continue;
}
auto has_inverted_index = [&](const RowsetSharedPtr& src_rs) {
auto* rowset = static_cast<BetaRowset*>(src_rs.get());
if (rowset->is_skip_index_compaction(col_unique_id)) {
@@ -881,9 +807,7 @@ Status CompactionMixin::construct_output_rowset_writer(RowsetWriterContext& ctx)
if (config::inverted_index_compaction_enable &&
(((_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) ||
_tablet->keys_type() == KeysType::DUP_KEYS)) &&
_cur_tablet_schema->get_inverted_index_storage_format() ==
InvertedIndexStorageFormatPB::V1) {
_tablet->keys_type() == KeysType::DUP_KEYS))) {
construct_index_compaction_columns(ctx);
}
ctx.version = _output_version;
Expand Down Expand Up @@ -1149,8 +1073,6 @@ Status CloudCompactionMixin::execute_compact_impl(int64_t permits) {

RETURN_IF_ERROR(merge_input_rowsets());

RETURN_IF_ERROR(do_inverted_index_compaction());

RETURN_IF_ERROR(_engine.meta_mgr().commit_rowset(*_output_rowset->rowset_meta().get()));

// 4. modify rowsets in memory
Expand All @@ -1177,9 +1099,7 @@ Status CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext&
if (config::inverted_index_compaction_enable &&
(((_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) ||
_tablet->keys_type() == KeysType::DUP_KEYS)) &&
_cur_tablet_schema->get_inverted_index_storage_format() ==
InvertedIndexStorageFormatPB::V1) {
_tablet->keys_type() == KeysType::DUP_KEYS))) {
construct_index_compaction_columns(ctx);
}

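Both CompactionMixin and CloudCompactionMixin drop the requirement that the inverted index storage format be V1, so with inverted_index_compaction_enable now defaulting to true, index compaction is gated only by the keys type (and merge-on-write for unique keys). A self-contained sketch of the new predicate; the enum and the function name are simplified stand-ins for the real KeysType and condition.

#include <iostream>

enum class KeysType { DUP_KEYS, UNIQUE_KEYS, AGG_KEYS };  // simplified stand-in

// Mirrors the condition in construct_output_rowset_writer after this patch;
// note there is no longer a check on the inverted index storage format.
bool should_construct_index_compaction_columns(bool compaction_enabled, KeysType keys_type,
                                               bool unique_key_merge_on_write) {
    return compaction_enabled &&
           ((keys_type == KeysType::UNIQUE_KEYS && unique_key_merge_on_write) ||
            keys_type == KeysType::DUP_KEYS);
}

int main() {
    std::cout << std::boolalpha
              << should_construct_index_compaction_columns(true, KeysType::DUP_KEYS, false)
              << "\n";  // prints: true
    return 0;
}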
1 change: 1 addition & 0 deletions be/src/olap/compaction.h
@@ -67,6 +67,7 @@ class Compaction {
protected:
Status merge_input_rowsets();

// merge inverted index files
Status do_inverted_index_compaction();

void construct_index_compaction_columns(RowsetWriterContext& ctx);