Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions be/src/storage/tablet_updates.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5354,6 +5354,14 @@ Status TabletUpdates::get_column_values(const std::vector<uint32_t>& column_ids,
std::shared_ptr<FileSystem> fs;
for (const auto& [rssid, rowids] : rowids_by_rssid) {
auto iter = rssid_to_rowsets.upper_bound(rssid);
if (iter == rssid_to_rowsets.begin()) {
std::string msg = strings::Substitute(
"rssid $0 is smaller than the minimum rowset seg id $1 in tablet $2, "
"which may be caused by stale primary index entries after compaction",
rssid, rssid_to_rowsets.begin()->first, _tablet.tablet_id());
LOG(ERROR) << msg;
return Status::InternalError(msg);
}
--iter;
const auto& rowset = iter->second.get();
if (!(rowset->rowset_meta()->get_rowset_seg_id() <= rssid &&
Expand Down
89 changes: 89 additions & 0 deletions be/test/storage/tablet_updates_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3007,6 +3007,95 @@ TEST_F(TabletUpdatesTest, get_column_values_with_persistent_index) {
test_get_column_values(true);
}

void TabletUpdatesTest::test_get_column_values_with_invalid_rssid(bool enable_persistent_index) {
auto orig = config::vertical_compaction_max_columns_per_group;
config::vertical_compaction_max_columns_per_group = 5;
DeferOp unset_config([&] { config::vertical_compaction_max_columns_per_group = orig; });

srand(GetCurrentTimeMicros());
auto tablet = create_tablet(rand(), rand());
DeferOp del_tablet([&]() {
auto tablet_mgr = StorageEngine::instance()->tablet_manager();
(void)tablet_mgr->drop_tablet(tablet->tablet_id());
(void)fs::remove_all(tablet->schema_hash_path());
});
tablet->set_enable_persistent_index(enable_persistent_index);
const int N = 100;
std::vector<int64_t> keys;
for (int i = 0; i < N; i++) {
keys.emplace_back(i);
}

// Commit multiple rowsets, record the first rowset seg id before compaction.
ASSERT_TRUE(tablet->rowset_commit(2, create_rowset(tablet, keys)).ok());
std::this_thread::sleep_for(std::chrono::milliseconds(200));
ASSERT_TRUE(tablet->rowset_commit(3, create_rowset(tablet, keys)).ok());
std::this_thread::sleep_for(std::chrono::milliseconds(200));
ASSERT_TRUE(tablet->rowset_commit(4, create_rowset(tablet, keys)).ok());
std::this_thread::sleep_for(std::chrono::milliseconds(200));

// Record a valid rssid before compaction (the first rowset seg id).
uint32_t old_rssid = 0;
{
std::vector<RowsetSharedPtr> rowsets;
EditVersion version;
ASSERT_TRUE(tablet->updates()->get_applied_rowsets(4, &rowsets, &version).ok());
ASSERT_FALSE(rowsets.empty());
old_rssid = rowsets[0]->rowset_meta()->get_rowset_seg_id();
}

// Trigger compaction: old rowsets are replaced by a new compacted rowset with a higher seg id.
ASSERT_TRUE(tablet->updates()->compaction(_compaction_mem_tracker.get()).ok());
std::this_thread::sleep_for(std::chrono::seconds(1));
ASSERT_EQ(tablet->updates()->num_rowsets(), 1);

// GC old rowsets from _rowsets map. Without this, _rowsets still contains old entries
// even though they are no longer part of any active version.
tablet->updates()->remove_expired_versions(INT64_MAX);

// Verify the compacted rowset has a seg id greater than old_rssid.
uint32_t new_min_rssid = 0;
{
std::vector<RowsetSharedPtr> rowsets;
EditVersion version;
ASSERT_TRUE(tablet->updates()->get_applied_rowsets(4, &rowsets, &version).ok());
ASSERT_EQ(rowsets.size(), 1);
new_min_rssid = rowsets[0]->rowset_meta()->get_rowset_seg_id();
}
ASSERT_GT(new_min_rssid, old_rssid);

// Prepare columns for reading.
std::vector<uint32_t> read_column_ids = {1, 2};
MutableColumns read_columns(read_column_ids.size());
const auto& tablet_schema = tablet->unsafe_tablet_schema_ref();
for (auto i = 0; i < read_column_ids.size(); i++) {
const auto read_column_id = read_column_ids[i];
auto tablet_column = tablet_schema.column(read_column_id);
auto column = ChunkHelper::column_from_field_type(tablet_column.type(), tablet_column.is_nullable());
read_columns[i] = column->clone_empty();
}

// Test: use old_rssid (now stale, smaller than all keys in rssid_to_rowsets) to simulate
// stale primary index entries pointing to compacted-away rowsets.
// This triggers the begin() iterator guard: upper_bound(old_rssid) == begin().
{
std::map<uint32_t, std::vector<uint32_t>> rowids_by_rssid;
rowids_by_rssid[old_rssid] = {0, 1, 2};
auto st = tablet->updates()->get_column_values(read_column_ids, 0, false, rowids_by_rssid, &read_columns,
nullptr, tablet->tablet_schema());
ASSERT_FALSE(st.ok()) << "Should fail for stale rssid after compaction";
ASSERT_TRUE(st.is_internal_error());
}
}

TEST_F(TabletUpdatesTest, get_column_values_with_invalid_rssid) {
test_get_column_values_with_invalid_rssid(false);
}

TEST_F(TabletUpdatesTest, get_column_values_with_invalid_rssid_persistent_index) {
test_get_column_values_with_invalid_rssid(true);
}

void TabletUpdatesTest::test_get_missing_version_ranges(const std::vector<int64_t>& versions,
const std::vector<int64_t>& expected_missing_ranges) {
auto tablet = create_tablet(rand(), rand());
Expand Down
1 change: 1 addition & 0 deletions be/test/storage/tablet_updates_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,7 @@ class TabletUpdatesTest : public testing::Test {
void test_pk_dump(size_t rowset_cnt);
void update_and_recover(bool enable_persistent_index);
void test_recover_rowset_sorter();
void test_get_column_values_with_invalid_rssid(bool enable_persistent_index);

protected:
TabletSharedPtr _tablet;
Expand Down
Loading