Skip to content

Commit

Permalink
[#23700] CDCSDK: Use leader epoch instead of leader term in table rem…
Browse files Browse the repository at this point in the history
…oval bg task

Summary:
Catalog manager background thread processes tables that needs to be removed from CDC streams. As part of the removal, we modify the stream metadata and persist in sys-catalog table. While persisting, we were using the leader ready term instead of the leader epoch fetched at the start of the background thread which has been fixed now.
Jira: DB-12608

Test Plan: Existing cdc ctests

Reviewers: xCluster, hsunder, skumar, sumukh.phalgaonkar

Reviewed By: skumar

Subscribers: ybase, ycdcxcluster

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D38155
  • Loading branch information
siddharth2411 committed Sep 19, 2024
1 parent 1e70024 commit 7a4b409
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 5 deletions.
2 changes: 1 addition & 1 deletion src/yb/master/catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -2937,7 +2937,7 @@ class CatalogManager : public tserver::TabletPeerLookupIf,
const std::unordered_set<TableId>& tables_in_stream_metadata);

Status RemoveTableFromCDCStreamMetadataAndMaps(
const CDCStreamInfoPtr stream, const TableId table_id);
const CDCStreamInfoPtr stream, const TableId table_id, const LeaderEpoch& epoch);

// Should be bumped up when tablet locations are changed.
std::atomic<uintptr_t> tablet_locations_version_{0};
Expand Down
7 changes: 3 additions & 4 deletions src/yb/master/xrepl_catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2380,7 +2380,7 @@ Status CatalogManager::ProcessTablesToBeRemovedFromCDCSDKStreams(
"ProcessTablesToBeRemovedFromCDCSDKStreams::StartRemovalFromQualifiedTableList");

if (!FLAGS_TEST_cdcsdk_skip_table_removal_from_qualified_list) {
Status status = RemoveTableFromCDCStreamMetadataAndMaps(stream, table_id);
Status status = RemoveTableFromCDCStreamMetadataAndMaps(stream, table_id, epoch);
if (!status.ok()) {
LOG(WARNING) << "Encountered error while trying to remove table " << table_id
<< " from qualified table list of stream " << stream_id << " and maps. - "
Expand Down Expand Up @@ -5085,7 +5085,7 @@ Result<std::vector<cdc::CDCStateTableEntry>> CatalogManager::SyncCDCStateTableEn
}

Status CatalogManager::RemoveTableFromCDCStreamMetadataAndMaps(
const CDCStreamInfoPtr stream, const TableId table_id) {
const CDCStreamInfoPtr stream, const TableId table_id, const LeaderEpoch& epoch) {
// Remove the table from the CDC stream metadata & cdcsdk_tables_to_stream_map_ and persist
// the updated metadata.
{
Expand All @@ -5102,8 +5102,7 @@ Status CatalogManager::RemoveTableFromCDCStreamMetadataAndMaps(
LOG_WITH_FUNC(INFO) << "Removing table " << table_id
<< " from qualified table list of CDC stream " << stream->id();
RETURN_ACTION_NOT_OK(
sys_catalog_->Upsert(leader_ready_term(), stream),
"Updating CDC streams in system catalog");
sys_catalog_->Upsert(epoch, stream), "Updating CDC streams in system catalog");
}

ltm.Commit();
Expand Down

0 comments on commit 7a4b409

Please sign in to comment.