Skip to content

Commit 5ae9378

Browse files
[fix](schema-change) Fix job replay failure when partitions added to table after job finish and before log edition
1 parent b959c66 commit 5ae9378

File tree

2 files changed

+47
-12
lines changed

2 files changed

+47
-12
lines changed

fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java

+12-12
Original file line numberDiff line numberDiff line change
@@ -663,21 +663,21 @@ protected void runRunningJob() throws AlterCancelException {
663663
commitShadowIndex();
664664
// all partitions are good
665665
onFinished(tbl);
666-
} finally {
667-
tbl.writeUnlock();
668-
}
669-
670-
pruneMeta();
671-
672-
LOG.info("schema change job finished: {}", jobId);
666+
pruneMeta();
673667

674-
changeTableState(dbId, tableId, OlapTableState.NORMAL);
675-
LOG.info("set table's state to NORMAL, table id: {}, job id: {}", tableId, jobId);
668+
LOG.info("schema change job finished: {}", jobId);
676669

677-
this.jobState = JobState.FINISHED;
678-
this.finishedTimeMs = System.currentTimeMillis();
679-
Env.getCurrentEnv().getEditLog().logAlterJob(this);
670+
changeTableState(dbId, tableId, OlapTableState.NORMAL);
671+
LOG.info("set table's state to NORMAL, table id: {}, job id: {}", tableId, jobId);
680672

673+
this.jobState = JobState.FINISHED;
674+
this.finishedTimeMs = System.currentTimeMillis();
675+
// Write edit log with table's write lock held, to avoid adding partitions before writing edit log,
676+
// else it will try to transform index in newly added partition while replaying and result in failure.
677+
Env.getCurrentEnv().getEditLog().logAlterJob(this);
678+
} finally {
679+
tbl.writeUnlock();
680+
}
681681
postProcessOriginIndex();
682682
// Drop table column stats after schema change finished.
683683
Env.getCurrentEnv().getAnalysisManager().dropStats(tbl, null);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import org.apache.doris.regression.suite.ClusterOptions
2+
import org.apache.doris.regression.util.NodeType
3+
4+
suite("test_sc_replay_after_add_partition", "docker") {
5+
def options = new ClusterOptions()
6+
options.enableDebugPoints()
7+
options.setFeNum(1)
8+
options.connectToFollower = true
9+
10+
docker(options) {
11+
def tbl = "test_forward_query"
12+
sql """ DROP TABLE IF EXISTS ${tbl} """
13+
sql """
14+
CREATE TABLE ${tbl}
15+
(
16+
k1 int,
17+
v1 int
18+
)
19+
DISTRIBUTED BY HASH(`k1`) BUCKETS 1
20+
PROPERTIES (
21+
"replication_num" = "1"
22+
);
23+
"""
24+
sql """
25+
INSERT INTO ${tbl} VALUES(1, 1);
26+
"""
27+
sql """
28+
INSERT INTO ${tbl} VALUES(2, 2);
29+
"""
30+
cluster.injectDebugPoints(NodeType.FE, ['StmtExecutor.forward_all_queries': [forwardAllQueries:true, execute:1]])
31+
sql """
32+
33+
"""
34+
}
35+
}

0 commit comments

Comments
 (0)