Skip to content

Commit

Permalink
[#23882] YSQL: Improve cache re-invalidation for alter table commands
Browse files Browse the repository at this point in the history
Summary:
Background:

When DDL atomicity is enabled, DDL transaction verification may lead to additional schema version bumps
on an altered table.

This can create a scenario where if an ALTER TABLE operation increments the table’s schema version,
and subsequently performs a scan on it, any following DMLs on the same table within the same session
may encounter a schema version mismatch error.
This happens because after the YB alter invalidates the table cache entry, the table scan reloads it.
When DDL transaction verification bumps the schema version of the table again,
the previously reloaded table cache entry becomes invalid, and would need to be reloaded again.

Commit 53477ae introduced a re-invalidation mechanism
to solve this problem.

This diff makes some changes to the re-invalidation mechanism:

  - Instead of using the YB alter table handles to keep track of the affected tables, simply use the table oids. Although the usage of statement handles doesn't seem to cause any issues on YB master, it causes issues on YB PG15 as upstream PG has changed the flow of some alter table commands. Specifically, the YB PG memory context (`YBCPgMemctx`) where the statement handles are allocated may be freed in an earlier catch block than the one that executes `YbATInvalidateTableCacheAfterAlter`.

  - In `YbATInvalidateTableCacheAfterAlter`, we now check if the relation still exists as we have to retrieve its database oid and relfilenode oid. This is necessary because legacy rewrite operations that alter the relation's oid might have dropped the old relation. If the relation has been dropped, there's no need to invalidate cache entries, as any queries referencing the dropped relation will fail anyway.

  - Commit 53477ae added an optimization to skip schema version increments for alter type without rewrite. However, this approach may be flawed because YB currently drops and recreates dependent indexes when altering a column type, even when no rewrite occurs. This issue is tracked under #24007. For now, revert the changes to skip the schema version increment on the base table, so that we correctly track the relation as altered and execute the re-invalidation mechanism. Also remove the now unused variables 'rewriteState' and 'rewrite' from `YBCPrepareAlterTable` and `YBCPrepareAlterTableCmd`.

    - Add a function in ybc_pggate to invalidate the table cache entry for a given database oid and relfilenode oid.
Jira: DB-12786

Test Plan: ./yb_build.sh --cxx-test pgwrapper_pg_ddl_atomicity-test --gtest_filter PgDdlAtomicityTest.TestTableCacheAfterTxnVerification

Reviewers: myang

Reviewed By: myang

Subscribers: yql

Differential Revision: https://phorge.dev.yugabyte.com/D38012
  • Loading branch information
fizaaluthra committed Sep 19, 2024
1 parent 903d793 commit 5dc71ea
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 38 deletions.
56 changes: 36 additions & 20 deletions src/postgres/src/backend/commands/tablecmds.c
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ static void YbATSetPKRewriteChildPartitions(List **yb_wqueue,
bool skip_copy_split_options);
static void YbATCopyIndexSplitOptions(Oid oldId, IndexStmt *stmt,
AlteredTableInfo *tab);
static void YbATInvalidateTableCacheAfterAlter(List *handles);
static void YbATInvalidateTableCacheAfterAlter(List *ybAlteredTableIds);
/* ----------------------------------------------------------------
* DefineRelation
* Creates a new relation.
Expand Down Expand Up @@ -3973,19 +3973,20 @@ ATController(AlterTableStmt *parsetree,

/* Phase 2: update system catalogs */
List *rollbackHandles = NIL;
List *volatile handles = NIL;
List *volatile ybAlteredTableIds = NIL;
PG_TRY();
{
/*
* ATRewriteCatalogs also executes changes to DocDB.
* If Phase 3 fails, rollbackHandle will specify how to rollback the
* changes done to DocDB.
*/
ATRewriteCatalogs(&wqueue, lockmode, &rollbackHandles, &handles);
ATRewriteCatalogs(&wqueue, lockmode, &rollbackHandles,
&ybAlteredTableIds);
}
PG_CATCH();
{
YbATInvalidateTableCacheAfterAlter(handles);
YbATInvalidateTableCacheAfterAlter(ybAlteredTableIds);
PG_RE_THROW();
}
PG_END_TRY();
Expand All @@ -3994,7 +3995,7 @@ ATController(AlterTableStmt *parsetree,
PG_TRY();
{
ATRewriteTables(parsetree, &wqueue, lockmode);
YbATInvalidateTableCacheAfterAlter(handles);
YbATInvalidateTableCacheAfterAlter(ybAlteredTableIds);
}
PG_CATCH();
{
Expand All @@ -4012,7 +4013,7 @@ ATController(AlterTableStmt *parsetree,
YBCExecAlterTable(handle, RelationGetRelid(rel));
}
}
YbATInvalidateTableCacheAfterAlter(handles);
YbATInvalidateTableCacheAfterAlter(ybAlteredTableIds);
PG_RE_THROW();
}
PG_END_TRY();
Expand Down Expand Up @@ -4346,7 +4347,7 @@ static void
ATRewriteCatalogs(List **wqueue,
LOCKMODE lockmode,
List **rollbackHandles,
List *volatile *handles)
List *volatile *ybAlteredTableIds)
{
int pass;
ListCell *ltab;
Expand All @@ -4363,12 +4364,13 @@ ATRewriteCatalogs(List **wqueue,
AlteredTableInfo* info = (AlteredTableInfo *) linitial(*wqueue);
Oid main_relid = info->relid;
YBCPgStatement rollbackHandle = NULL;
*handles = YBCPrepareAlterTable(info->subcmds,
List *handles = YBCPrepareAlterTable(info->subcmds,
AT_NUM_PASSES,
main_relid,
&rollbackHandle,
false /* isPartitionOfAlteredTable */,
info->rewrite);
false /* isPartitionOfAlteredTable */);
if (handles)
*ybAlteredTableIds = lappend_oid(*ybAlteredTableIds, main_relid);
if (rollbackHandle)
*rollbackHandles = lappend(*rollbackHandles, rollbackHandle);

Expand All @@ -4392,13 +4394,14 @@ ATRewriteCatalogs(List **wqueue,
AT_NUM_PASSES,
childrelid,
&childRollbackHandle,
true /*isPartitionOfAlteredTable */,
info->rewrite);
true /*isPartitionOfAlteredTable */);
if (child_handles)
*ybAlteredTableIds = lappend_oid(*ybAlteredTableIds, childrelid);
ListCell *listcell = NULL;
foreach(listcell, child_handles)
{
YBCPgStatement child = (YBCPgStatement) lfirst(listcell);
*handles = lappend(*handles, child);
handles = lappend(handles, child);
}
if (childRollbackHandle)
*rollbackHandles = lappend(*rollbackHandles, childRollbackHandle);
Expand Down Expand Up @@ -4427,7 +4430,7 @@ ATRewriteCatalogs(List **wqueue,
*/
if (pass == AT_PASS_ADD_INDEX)
{
foreach(lc, *handles)
foreach(lc, handles)
{
YBCPgStatement handle = (YBCPgStatement) lfirst(lc);
YBCExecAlterTable(handle, main_relid);
Expand Down Expand Up @@ -4498,7 +4501,7 @@ ATRewriteCatalogs(List **wqueue,
*/
if (yb_table_cloned)
{
foreach (lc, *handles)
foreach (lc, handles)
{
YBCPgStatement handle = (YBCPgStatement) lfirst(lc);
YBCPgAlterTableSetTableId(
Expand Down Expand Up @@ -19303,20 +19306,33 @@ static void YbATCopyIndexSplitOptions(Oid oldId, IndexStmt *stmt,
* Used in YB to re-invalidate table cache entries at the end of an ALTER TABLE
* operation.
*/
static void YbATInvalidateTableCacheAfterAlter(List *handles)
static void YbATInvalidateTableCacheAfterAlter(List *ybAlteredTableIds)
{
if (YbDdlRollbackEnabled() && handles)
if (YbDdlRollbackEnabled() && ybAlteredTableIds)
{
/*
* As part of DDL transaction verification, we may have incremented
* the schema version for the affected tables. So, re-invalidate
* the table cache entries of the affected tables.
*/
ListCell *lc = NULL;
foreach(lc, handles)
foreach(lc, ybAlteredTableIds)
{
YBCPgStatement handle = (YBCPgStatement) lfirst(lc);
HandleYBStatus(YBCPgAlterTableInvalidateTableCacheEntry(handle));
Oid relid = lfirst_oid(lc);
Relation rel = RelationIdGetRelation(relid);
/*
* The relation may no longer exist if it was dropped as part of
* a legacy rewrite operation. We can skip invalidation in that
* case.
*/
if (!rel)
{
Assert(!yb_enable_alter_table_rewrite);
continue;
}
YBCPgAlterTableInvalidateTableByOid(YBCGetDatabaseOidByRelid(relid),
YbGetRelfileNodeIdFromRelId(relid));
RelationClose(rel);
}
}
}
15 changes: 3 additions & 12 deletions src/postgres/src/backend/commands/ybccmds.c
Original file line number Diff line number Diff line change
Expand Up @@ -1214,8 +1214,7 @@ static List*
YBCPrepareAlterTableCmd(AlterTableCmd* cmd, Relation rel, List *handles,
int* col, bool* needsYBAlter,
YBCPgStatement* rollbackHandle,
bool isPartitionOfAlteredTable,
int rewrite)
bool isPartitionOfAlteredTable)
{
Oid relationId = RelationGetRelid(rel);
Oid relfileNodeId = YbGetRelfileNodeId(rel);
Expand Down Expand Up @@ -1421,13 +1420,6 @@ YBCPrepareAlterTableCmd(AlterTableCmd* cmd, Relation rel, List *handles,
cmd->name, RelationGetRelationName(rel))));
}
ReleaseSysCache(typeTuple);

/*
* If this ALTER TYPE operation doesn't require a rewrite
* we do not need to increment the schema version.
*/
if (!(rewrite & AT_REWRITE_COLUMN_REWRITE))
break;
}
/*
* For these cases a YugaByte metadata does not need to be updated
Expand Down Expand Up @@ -1638,8 +1630,7 @@ YBCPrepareAlterTable(List** subcmds,
int subcmds_size,
Oid relationId,
YBCPgStatement *rollbackHandle,
bool isPartitionOfAlteredTable,
int rewriteState)
bool isPartitionOfAlteredTable)
{
/* Appropriate lock was already taken */
Relation rel = relation_open(relationId, NoLock);
Expand Down Expand Up @@ -1667,7 +1658,7 @@ YBCPrepareAlterTable(List** subcmds,
handles = YBCPrepareAlterTableCmd(
(AlterTableCmd *) lfirst(lcmd), rel, handles,
&col, &needsYBAlter, rollbackHandle,
isPartitionOfAlteredTable, rewriteState);
isPartitionOfAlteredTable);
}
}
relation_close(rel, NoLock);
Expand Down
3 changes: 1 addition & 2 deletions src/postgres/src/include/commands/ybccmds.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,7 @@ extern List* YBCPrepareAlterTable(List** subcmds,
int subcmds_size,
Oid relationId,
YBCPgStatement *rollbackHandle,
bool isPartitionOfAlteredTable,
int rewriteState);
bool isPartitionOfAlteredTable);

extern void YBCExecAlterTable(YBCPgStatement handle, Oid relationId);

Expand Down
5 changes: 5 additions & 0 deletions src/yb/yql/pggate/ybc_pggate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,11 @@ YBCStatus YBCPgInvalidateTableCacheByTableId(const char *table_id) {
return YBCStatusOK();
}

void YBCPgAlterTableInvalidateTableByOid(
const YBCPgOid database_oid, const YBCPgOid table_relfilenode_oid) {
pgapi->InvalidateTableCache(PgObjectId(database_oid, table_relfilenode_oid));
}

// Tablegroup Operations ---------------------------------------------------------------------------

YBCStatus YBCPgNewCreateTablegroup(const char *database_name,
Expand Down
3 changes: 3 additions & 0 deletions src/yb/yql/pggate/ybc_pggate.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,9 @@ YBCStatus YBCPgExecAlterTable(YBCPgStatement handle);

YBCStatus YBCPgAlterTableInvalidateTableCacheEntry(YBCPgStatement handle);

void YBCPgAlterTableInvalidateTableByOid(
const YBCPgOid database_oid, const YBCPgOid table_relfilenode_oid);

YBCStatus YBCPgNewDropTable(YBCPgOid database_oid,
YBCPgOid table_relfilenode_oid,
bool if_exist,
Expand Down
26 changes: 22 additions & 4 deletions src/yb/yql/pgwrapper/pg_ddl_atomicity-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1669,29 +1669,47 @@ TEST_F(PgDdlAtomicityMiniClusterTest, TestWaitForRollbackWithMasterRestart) {

// Test that the table cache is correctly invalidated after transaction verification
// completes for an ALTER TABLE operation that performs a table scan.
TEST_F(PgDdlAtomicityMiniClusterTest, TestTableCacheAfterTxnVerification) {
TEST_F(PgDdlAtomicityTest, TestTableCacheAfterTxnVerification) {
// Set report_ysql_ddl_txn_status_to_master to false, so that we can test the schema verification
// codepaths on master.
ASSERT_OK(cluster_->SetFlagOnTServers(
"report_ysql_ddl_txn_status_to_master", "false"));
auto conn = ASSERT_RESULT(Connect());
ASSERT_OK(conn.ExecuteFormat(
"CREATE TABLE test (key INT PRIMARY KEY, value TEXT, num real, serialcol SERIAL) "
"PARTITION BY LIST(key)"));
ASSERT_OK(conn.ExecuteFormat(
"CREATE TABLE test1 PARTITION OF test FOR VALUES IN (1)"));
ASSERT_OK(conn.ExecuteFormat(
"CREATE TABLE test2 PARTITION OF test FOR VALUES IN (2, 3, 4)"));
"CREATE TABLE test2 PARTITION OF test FOR VALUES IN (2, 3, 4, 5)"));
ASSERT_OK(conn.ExecuteFormat("INSERT INTO test VALUES (1, 'value', 1.0), (2, 'value', 2.0)"));
ASSERT_OK(conn.TestFailDdl(
"ALTER TABLE test DROP COLUMN value, ADD CONSTRAINT check_num CHECK (num > 0)"));
// Ensure there is no schema version mismatch after a failed ALTER operation that performs
// a table scan.
ASSERT_OK(conn.Execute("INSERT INTO test2 VALUES (3, 'value', 3.0)"));
ASSERT_OK(conn.Execute("BEGIN; INSERT INTO test2 VALUES (3, 'value', 3.0); COMMIT;"));
ASSERT_OK(conn.ExecuteFormat(
"ALTER TABLE test DROP COLUMN value, ADD CONSTRAINT check_num CHECK (num > 0)"));
// Ensure there is no schema version mismatch after a successful ALTER operation that performs
// a table scan.
ASSERT_OK(conn.Execute("INSERT INTO test2 VALUES (4, 4.0)"));
ASSERT_OK(conn.Execute("BEGIN; INSERT INTO test2 VALUES (4, 4.0); COMMIT;"));
auto rows =
ASSERT_RESULT((conn.FetchRows<int32_t, float, int32_t>("SELECT * FROM test2 ORDER BY key")));
ASSERT_EQ(rows, (decltype(rows){{2, 2, 2}, {3, 3, 3}, {4, 4, 4}}));
// Ensure there is no schema version mismatch for various ALTERs.
// Alter type (with no rewrite).
ASSERT_OK(conn.Execute("ALTER TABLE test ALTER COLUMN num TYPE double precision"));
ASSERT_OK(conn.Execute("BEGIN; INSERT INTO test VALUES (5, 5.0); COMMIT;"));
// Legacy table rewrite.
ASSERT_OK(conn.Execute("CREATE TABLE test3 (key INT, value TEXT)"));
ASSERT_OK(conn.Execute("INSERT INTO test3 VALUES (1, 'value')"));
ASSERT_OK(conn.Execute("SET yb_enable_alter_table_rewrite = OFF;"));
ASSERT_OK(conn.Execute(
"ALTER TABLE test3 ADD PRIMARY KEY (key), ALTER COLUMN value SET NOT NULL"));
ASSERT_OK(conn.Execute("BEGIN; INSERT INTO test3 VALUES (2, 'value2'); COMMIT;"));
ASSERT_OK(conn.Execute("ALTER TABLE test3 ALTER COLUMN value TYPE int USING length(value),"
"ADD CONSTRAINT check_value CHECK (value > 0)"));
ASSERT_OK(conn.Execute("BEGIN; INSERT INTO test3 VALUES (3, 6); COMMIT;"));
}

// Test that DDL-related metadata in TableInfo objects is cleared on drop.
Expand Down

0 comments on commit 5dc71ea

Please sign in to comment.