Skip to content

Commit

Permalink
enhance: refactor delete mvcc function
Browse files Browse the repository at this point in the history
Signed-off-by: luzhang <[email protected]>
  • Loading branch information
luzhang committed Nov 29, 2024
1 parent 5e807c3 commit 9632660
Show file tree
Hide file tree
Showing 17 changed files with 688 additions and 402 deletions.
9 changes: 9 additions & 0 deletions internal/core/src/common/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,15 @@ Join(const std::vector<T>& items, const std::string& delimiter) {
return ss.str();
}

inline std::string
PrintBitsetTypeView(const BitsetTypeView& view) {
std::stringstream ss;
for (auto i = 0; i < view.size(); ++i) {
ss << int(view[i]);
}
return ss.str();
}

inline std::string
GetCommonPrefix(const std::string& str1, const std::string& str2) {
size_t len = std::min(str1.length(), str2.length());
Expand Down
69 changes: 6 additions & 63 deletions internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -669,38 +669,8 @@ ChunkedSegmentSealedImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) {
ParsePksFromIDs(pks, field_meta.get_data_type(), *info.primary_keys);
auto timestamps = reinterpret_cast<const Timestamp*>(info.timestamps);

std::vector<std::tuple<Timestamp, PkType>> ordering(size);
for (int i = 0; i < size; i++) {
ordering[i] = std::make_tuple(timestamps[i], pks[i]);
}

if (!insert_record_.empty_pks()) {
auto end = std::remove_if(
ordering.begin(),
ordering.end(),
[&](const std::tuple<Timestamp, PkType>& record) {
return !insert_record_.contain(std::get<1>(record));
});
size = end - ordering.begin();
ordering.resize(size);
}

// all record filtered
if (size == 0) {
return;
}

std::sort(ordering.begin(), ordering.end());
std::vector<PkType> sort_pks(size);
std::vector<Timestamp> sort_timestamps(size);

for (int i = 0; i < size; i++) {
auto [t, pk] = ordering[i];
sort_timestamps[i] = t;
sort_pks[i] = pk;
}

deleted_record_.push(sort_pks, sort_timestamps.data());
// step 2: push delete info to delete_record
deleted_record_.LoadPush(pks, timestamps);
}

void
Expand Down Expand Up @@ -858,35 +828,7 @@ void
ChunkedSegmentSealedImpl::mask_with_delete(BitsetTypeView& bitset,
int64_t ins_barrier,
Timestamp timestamp) const {
auto del_barrier = get_barrier(get_deleted_record(), timestamp);
if (del_barrier == 0) {
return;
}

auto bitmap_holder = std::shared_ptr<DeletedRecord::TmpBitmap>();

auto search_fn = [this](const PkType& pk, int64_t barrier) {
return this->search_pk(pk, barrier);
};
bitmap_holder = get_deleted_bitmap(del_barrier,
ins_barrier,
deleted_record_,
insert_record_,
timestamp,
is_sorted_by_pk_,
search_fn);

if (!bitmap_holder || !bitmap_holder->bitmap_ptr) {
return;
}
auto& delete_bitset = *bitmap_holder->bitmap_ptr;
AssertInfo(
delete_bitset.size() == bitset.size(),
fmt::format(
"Deleted bitmap size:{} not equal to filtered bitmap size:{}",
delete_bitset.size(),
bitset.size()));
bitset |= delete_bitset;
deleted_record_.Query(bitset, ins_barrier, timestamp);
}

void
Expand Down Expand Up @@ -1337,7 +1279,8 @@ ChunkedSegmentSealedImpl::ChunkedSegmentSealedImpl(
id_(segment_id),
col_index_meta_(index_meta),
TEST_skip_index_for_retrieve_(TEST_skip_index_for_retrieve),
is_sorted_by_pk_(is_sorted_by_pk) {
is_sorted_by_pk_(is_sorted_by_pk),
deleted_record_(&insert_record_, this) {
mmap_descriptor_ = std::shared_ptr<storage::MmapChunkDescriptor>(
new storage::MmapChunkDescriptor({segment_id, SegmentType::Sealed}));
auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager();
Expand Down Expand Up @@ -1974,7 +1917,7 @@ ChunkedSegmentSealedImpl::Delete(int64_t reserved_offset, // deprecated
sort_pks[i] = pk;
}

deleted_record_.push(sort_pks, sort_timestamps.data());
deleted_record_.StreamPush(sort_pks, sort_timestamps.data());
return SegcoreError::success();
}

Expand Down
13 changes: 7 additions & 6 deletions internal/core/src/segcore/ChunkedSegmentSealedImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
return stats_.mem_size.load() + deleted_record_.mem_size();
}

// Returns a mutable reference to this segment's insert record
// (`insert_record_`). Overrides the accessor declared on the base class.
// The reference is to the member itself, not a copy.
InsertRecord<true>&
get_insert_record() override {
return insert_record_;
}

int64_t
get_row_count() const override;

Expand Down Expand Up @@ -293,6 +298,7 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
// } else {
num_rows_ = row_count;
// }
deleted_record_.set_sealed_row_count(row_count);
}

void
Expand All @@ -317,11 +323,6 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
return system_ready_count_ == 2;
}

// Read-only accessor for this segment's delete record (`deleted_record_`).
// Returns a const reference to the member itself, not a copy.
const DeletedRecord&
get_deleted_record() const {
return deleted_record_;
}

std::pair<std::unique_ptr<IdArray>, std::vector<SegOffset>>
search_ids(const IdArray& id_array, Timestamp timestamp) const override;

Expand Down Expand Up @@ -362,7 +363,7 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
InsertRecord<true> insert_record_;

// deleted pks
mutable DeletedRecord deleted_record_;
mutable DeletedRecord<true> deleted_record_;

LoadFieldDataInfo field_data_info_;

Expand Down
Loading

0 comments on commit 9632660

Please sign in to comment.