Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: modify XA mode pre commit transaction from commit phase to before close phase #7102

Open
wants to merge 17 commits into
base: 2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ private void xaEnd(XAXid xaXid, int flags) throws XAException {
if (!xaEnded) {
xaResource.end(xaXid, flags);
xaEnded = true;
} else {
if (flags == XAResource.TMSUCCESS) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为什么只有TMSUCCESS才需要end?join进来的不再end了吗?
假设事务1 commit,事务2继续用这个connection commit(等于先join)走到这个xaend,就不执行xaend了?还有rollback的时候会走到这个xaend方法,此时rollback不直接end,直接进行rollback吗?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

xaEnded为false的时候会xaResource.end,这里flags == XAResource.TMSUCCESS是为了事务2继续用这个connection commit时能够xaResource.end,因为第一次commit把xaEnded置为true了

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xiaoxiangyeyu0 我看到只在close的时候至为false

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

xaEnded默认是false,第一次调用xaEnd方法会把xaEnded置为true

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

connection没有close时直接复用,xaended就是true了,如何再次end? @xiaoxiangyeyu0

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

传入的flags值为XAResource.TMSUCCESS,走else的分支会执行end

xaResource.end(xaXid, flags);
}
}
}

Expand Down Expand Up @@ -172,30 +176,40 @@ public void setAutoCommit(boolean autoCommit) throws SQLException {
// According to JDBC spec:
// If this method is called during a transaction and the
// auto-commit mode is changed, the transaction is committed.
if (xaActive) {
if (xaActive && !xaEnded) {
commit();
}
} else {
if (xaActive) {
throw new SQLException("should NEVER happen: setAutoCommit from true to false while xa branch is active");
}
// Start a XA branch
long branchId;
try {
// 1. register branch to TC then get the branch message
branchRegisterTime = System.currentTimeMillis();
branchId = DefaultResourceManager.get().branchRegister(BranchType.XA, resource.getResourceId(), null, xid, null,
null);
} catch (TransactionException te) {
cleanXABranchContext();
throw new SQLException("failed to register xa branch " + xid + " since " + te.getCode() + ":" + te.getMessage(), te);
int flags;
if (this.xaBranchXid != null && currentAutoCommitStatus) {
flags = XAResource.TMJOIN;
} else {
if (xaActive) {
throw new SQLException("should NEVER happen: setAutoCommit from true to false while xa branch is active");
}
if (JdbcConstants.ORACLE.equals(resource.getDbType())) {
flags = SeataXAResource.ORATRANSLOOSE;
} else {
flags = XAResource.TMNOFLAGS;
}
// Start a XA branch
long branchId;
try {
// 1. register branch to TC then get the branch message
branchRegisterTime = System.currentTimeMillis();
branchId = DefaultResourceManager.get().branchRegister(BranchType.XA, resource.getResourceId(), null, xid, null,
null);
} catch (TransactionException te) {
cleanXABranchContext();
throw new SQLException("failed to register xa branch " + xid + " since " + te.getCode() + ":" + te.getMessage(), te);
}
// 2. build XA-Xid with xid and branchId
this.xaBranchXid = XAXidBuilder.build(xid, branchId);
}
// 2. build XA-Xid with xid and branchId
this.xaBranchXid = XAXidBuilder.build(xid, branchId);
// Keep the Connection if necessary
keepIfNecessary();
try {
start();
start(flags);
} catch (XAException e) {
cleanXABranchContext();
throw new SQLException("failed to start xa branch " + xid + " since " + e.getMessage(), e);
Expand Down Expand Up @@ -223,34 +237,10 @@ public synchronized void commit() throws SQLException {
throw new SQLException("should NOT commit on an inactive session", SQLSTATE_XA_NOT_END);
}
try {
// XA End: Success
try {
end(XAResource.TMSUCCESS);
} catch (SQLException sqle) {
// Rollback immediately before the XA Branch Context is deleted.
String xaBranchXid = this.xaBranchXid.toString();
rollback();
throw new SQLException("Branch " + xaBranchXid + " was rollbacked on committing since " + sqle.getMessage(), SQLSTATE_XA_NOT_END, sqle);
}
long now = System.currentTimeMillis();
checkTimeout(now);
setPrepareTime(now);
int prepare = xaResource.prepare(xaBranchXid);
// Based on the four databases: MySQL (8), Oracle (12c), Postgres (16), and MSSQL Server (2022),
// only Oracle has read-only optimization; the others do not provide read-only feedback.
// Therefore, the database type check can be eliminated here.
if (prepare == XAResource.XA_RDONLY) {
// Branch Report to TC: RDONLY
reportStatusToTC(BranchStatus.PhaseOne_RDONLY);
}
} catch (XAException xe) {
// Branch Report to TC: Failed
reportStatusToTC(BranchStatus.PhaseOne_Failed);
throw new SQLException(
"Failed to end(TMSUCCESS)/prepare xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe
.getMessage(), xe);
} finally {
cleanXABranchContext();
end(XAResource.TMSUCCESS);
} catch (XAException e) {
throw new SQLException("Failed to end(TMSUCCESS) xa branch on " + xid + "-" + xaBranchXid.getBranchId()
+ " since " + e.getMessage(), e);
}
}

Expand Down Expand Up @@ -280,13 +270,9 @@ public void rollback() throws SQLException {
}
}

private synchronized void start() throws XAException, SQLException {
private synchronized void start(int flags) throws XAException, SQLException {
// 3. XA Start
if (JdbcConstants.ORACLE.equals(resource.getDbType())) {
xaResource.start(this.xaBranchXid, SeataXAResource.ORATRANSLOOSE);
} else {
xaResource.start(this.xaBranchXid, XAResource.TMNOFLAGS);
}
xaResource.start(this.xaBranchXid, flags);

try {
termination();
Expand Down Expand Up @@ -324,13 +310,36 @@ private void checkTimeout(Long now) throws XAException {

@Override
public synchronized void close() throws SQLException {
rollBacked = false;
if (isHeld() && shouldBeHeld()) {
// if kept by a keeper, just hold the connection.
return;
try {
if (xaEnded) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

end后的xa事务应该是可以join的,只要没有进入prepare阶段,如果我手动commit或rollback,再接着发sql,这里被标识成了xaended,如何进行prepare?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

接着发sql最终还是会调用commit方法,xaEnded会被置为true,close时依然会进行prepare

long now = System.currentTimeMillis();
checkTimeout(now);
setPrepareTime(now);
int prepare = xaResource.prepare(xaBranchXid);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果走了rollback的事务不需要prepare,直接上报一阶段失败即可
If the transaction that has gone rollback does not need to be prepared, it can directly report the failure of the first stage

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rollback方法的finally会调用cleanXABranchContext,这里会把xaEnded置为false

// Based on the four databases: MySQL (8), Oracle (12c), Postgres (16), and MSSQL Server (2022),
// only Oracle has read-only optimization; the others do not provide read-only feedback.
// Therefore, the database type check can be eliminated here.
if (prepare == XAResource.XA_RDONLY) {
// Branch Report to TC: RDONLY
reportStatusToTC(BranchStatus.PhaseOne_RDONLY);
}
xaEnded = false;
}
} catch (XAException xe) {
// Branch Report to TC: Failed
reportStatusToTC(BranchStatus.PhaseOne_Failed);
throw new SQLException(
"Failed to end(TMSUCCESS)/prepare xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe
.getMessage(), xe);
} finally {
rollBacked = false;
if (isHeld() && shouldBeHeld()) {
// if kept by a keeper, just hold the connection.
} else {
cleanXABranchContext();
originalConnection.close();
}
}
cleanXABranchContext();
originalConnection.close();
}

protected synchronized void closeForce() throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void testXABranchCommit() throws Throwable {
connectionProxyXA.commit();

Mockito.verify(xaResource).end(any(Xid.class), any(Integer.class));
Mockito.verify(xaResource).prepare(any(Xid.class));
Mockito.verify(xaResource, times(0)).prepare(any(Xid.class));
}

@Test
Expand Down
Loading