Skip to content

Commit f772c72

Browse files
committed
feature: implement early rollback of global transactions when TM disconnects
- TMDisconnectHandler interface for handling TM disconnect events - DefaultTMDisconnectHandler implementation with VGroup-based matching - Configuration option: server.enableRollbackWhenDisconnect (default: false) - Integration with AbstractNettyRemotingServer for TM disconnect detection - Comprehensive test coverage with unit and integration tests
1 parent d1ae495 commit f772c72

File tree

13 files changed

+908
-0
lines changed

13 files changed

+908
-0
lines changed

changes/en-us/2.x.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ Add changes here for all PR submitted to the 2.x branch.
2020

2121
### feature:
2222

23+
- [[#7674](https://github.com/apache/incubator-seata/pull/7674)] implement early rollback of global transactions when TM disconnects (#4422)
2324
- [[#7485](https://github.com/apache/incubator-seata/pull/7485)] Add http request filter for seata-server
2425
- [[#7509](https://github.com/apache/incubator-seata/pull/7509)] Reuse connection to merge branch transactions
2526
- [[#7492](https://github.com/apache/incubator-seata/pull/7492)] upgrade HTTP client in common module to support HTTP/2

changes/zh-cn/2.x.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
### feature:
2222

23+
- [[#7674](https://github.com/apache/incubator-seata/pull/7674)] 实现TM断开连接时全局事务的提前回滚功能 (#4422)
2324
- [[#7485](https://github.com/apache/incubator-seata/pull/7485)] 给seata-server端的http请求添加过滤器
2425
- [[#7509](https://github.com/apache/incubator-seata/pull/7509)] 复用连接合并分支事务
2526
- [[#7492](https://github.com/apache/incubator-seata/pull/7492)] 升级 common 模块中的 HTTP 客户端以支持 HTTP/2

common/src/main/java/org/apache/seata/common/ConfigurationKeys.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1005,6 +1005,11 @@ public interface ConfigurationKeys {
10051005
*/
10061006
String ENABLE_BRANCH_ASYNC_REMOVE = SERVER_PREFIX + SESSION_PREFIX + "enableBranchAsyncRemove";
10071007

1008+
/**
1009+
* The constant ENABLE_ROLLBACK_WHEN_DISCONNECT
1010+
*/
1011+
String ENABLE_ROLLBACK_WHEN_DISCONNECT = SERVER_PREFIX + "enableRollbackWhenDisconnect";
1012+
10081013
/**
10091014
* The constant SERVER_RAFT.
10101015
*/

common/src/main/java/org/apache/seata/common/DefaultValues.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,11 @@ public interface DefaultValues {
543543
*/
544544
boolean DEFAULT_ENABLE_BRANCH_ASYNC_REMOVE = false;
545545

546+
/**
547+
* The constant DEFAULT_ENABLE_ROLLBACK_WHEN_DISCONNECT
548+
*/
549+
boolean DEFAULT_ENABLE_ROLLBACK_WHEN_DISCONNECT = false;
550+
546551
/**
547552
* The constant DEFAULT_DB_MAX_CONN.
548553
*/
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.seata.core.rpc;
18+
19+
/**
20+
* Callback for TM (transaction manager) client disconnection events,
21+
* used to enable early rollback of the global transactions associated with that TM
22+
*/
23+
public interface TMDisconnectHandler {
24+
25+
/**
26+
* Handle the disconnection of a TM client and perform early rollback if the implementation has it enabled
27+
*
28+
* @param rpcContext the rpc context of the disconnected TM; AbstractNettyRemotingServer passes it before releasing it
29+
*/
30+
void handleTMDisconnect(RpcContext rpcContext);
31+
}

core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.seata.core.protocol.Version;
3535
import org.apache.seata.core.rpc.RemotingServer;
3636
import org.apache.seata.core.rpc.RpcContext;
37+
import org.apache.seata.core.rpc.TMDisconnectHandler;
3738
import org.apache.seata.core.rpc.processor.Pair;
3839
import org.apache.seata.core.rpc.processor.RemotingProcessor;
3940
import org.slf4j.Logger;
@@ -55,6 +56,8 @@ public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting
5556

5657
private final NettyServerBootstrap serverBootstrap;
5758

59+
private TMDisconnectHandler tmDisconnectHandler;
60+
5861
@Override
5962
public void init() {
6063
super.init();
@@ -67,6 +70,15 @@ public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServ
6770
serverBootstrap.setChannelHandlers(new ServerHandler());
6871
}
6972

73+
/**
74+
* Set the TM disconnect handler that handleDisconnect invokes when a channel with the TMROLE client role goes away
75+
*
76+
* @param tmDisconnectHandler the TM disconnect handler; when null, handleDisconnect skips the TM callback
77+
*/
78+
public void setTmDisconnectHandler(TMDisconnectHandler tmDisconnectHandler) {
79+
this.tmDisconnectHandler = tmDisconnectHandler;
80+
}
81+
7082
@Override
7183
public Object sendSyncRequest(String resourceId, String clientId, Object msg, boolean tryOtherApp)
7284
throws TimeoutException, IOException {
@@ -211,6 +223,15 @@ private void handleDisconnect(ChannelHandlerContext ctx) {
211223
LOGGER.info(ipAndPort + " to server channel inactive.");
212224
}
213225
if (rpcContext != null && rpcContext.getClientRole() != null) {
226+
// Handle TM disconnection for early rollback if it's a TM role
227+
if (rpcContext.getClientRole() == NettyPoolKey.TransactionRole.TMROLE && tmDisconnectHandler != null) {
228+
try {
229+
tmDisconnectHandler.handleTMDisconnect(rpcContext);
230+
} catch (Exception e) {
231+
LOGGER.error("Error handling TM disconnect for channel: " + ctx.channel(), e);
232+
}
233+
}
234+
214235
rpcContext.release();
215236
if (LOGGER.isInfoEnabled()) {
216237
LOGGER.info("remove channel:" + ctx.channel() + "context:" + rpcContext);
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.seata.core.rpc.netty;
18+
19+
import org.apache.seata.core.rpc.TMDisconnectHandler;
20+
import org.junit.jupiter.api.BeforeEach;
21+
import org.junit.jupiter.api.Test;
22+
import org.mockito.MockitoAnnotations;
23+
24+
import java.util.concurrent.LinkedBlockingDeque;
25+
import java.util.concurrent.ThreadPoolExecutor;
26+
import java.util.concurrent.TimeUnit;
27+
28+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
29+
import static org.mockito.Mockito.mock;
30+
31+
/**
32+
* Test for TM disconnect handling in AbstractNettyRemotingServer
33+
*/
34+
public class AbstractNettyRemotingServerTMDisconnectTest {
35+
36+
private NettyRemotingServer server;
37+
38+
@BeforeEach
39+
void setUp() {
40+
MockitoAnnotations.openMocks(this);
41+
42+
// Use real NettyRemotingServer like other tests
43+
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
44+
server = new NettyRemotingServer(executor);
45+
}
46+
47+
@Test
48+
void testTMDisconnectHandlerSetup() {
49+
// Test that the TM disconnect handler can be set and retrieved
50+
TMDisconnectHandler testHandler = mock(TMDisconnectHandler.class);
51+
52+
// Verify no exceptions are thrown when setting the handler
53+
assertDoesNotThrow(() -> server.setTmDisconnectHandler(testHandler));
54+
assertDoesNotThrow(() -> server.setTmDisconnectHandler(null));
55+
assertDoesNotThrow(() -> server.setTmDisconnectHandler(testHandler));
56+
}
57+
58+
@Test
59+
void testTMDisconnectHandlerIntegration() {
60+
// Test that handler is properly integrated in the AbstractNettyRemotingServer
61+
// This verifies the new method exists and works
62+
assertDoesNotThrow(() -> {
63+
TMDisconnectHandler handler = mock(TMDisconnectHandler.class);
64+
server.setTmDisconnectHandler(handler);
65+
});
66+
}
67+
}

script/config-center/config.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,9 @@ server.ratelimit.bucketTokenNumPerSecond = 999999
205205
server.ratelimit.bucketTokenMaxNum = 999999
206206
server.ratelimit.bucketTokenInitialNum = 999999
207207

208+
#Early rollback configuration
209+
server.enableRollbackWhenDisconnect=false
210+
208211
server.http.filter.xss.keywords=["<script>", "</script>", "javascript:", "vbscript:"]
209212

210213
#Metrics configuration, only for the server

server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.apache.seata.core.rpc.RemotingServer;
5353
import org.apache.seata.core.rpc.RpcContext;
5454
import org.apache.seata.core.rpc.TransactionMessageHandler;
55+
import org.apache.seata.core.rpc.netty.AbstractNettyRemotingServer;
5556
import org.apache.seata.core.rpc.netty.ChannelManager;
5657
import org.apache.seata.core.rpc.netty.NettyRemotingServer;
5758
import org.apache.seata.server.AbstractTCInboundHandler;
@@ -756,6 +757,13 @@ private void endSchedule(long delay) {
756757
* Init.
757758
*/
758759
public void init() {
760+
// Setup TM disconnect handler for early rollback
761+
if (remotingServer instanceof AbstractNettyRemotingServer) {
762+
DefaultTMDisconnectHandler tmDisconnectHandler = new DefaultTMDisconnectHandler(core);
763+
((AbstractNettyRemotingServer) remotingServer).setTmDisconnectHandler(tmDisconnectHandler);
764+
LOGGER.info("TM disconnect handler initialized for early rollback feature");
765+
}
766+
759767
retryRollbacking.scheduleAtFixedRate(
760768
() -> SessionHolder.distributedLockAndExecute(RETRY_ROLLBACKING, this::handleRetryRollbacking),
761769
0,
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.seata.server.coordinator;
18+
19+
import org.apache.seata.common.ConfigurationKeys;
20+
import org.apache.seata.common.DefaultValues;
21+
import org.apache.seata.config.ConfigurationFactory;
22+
import org.apache.seata.core.exception.TransactionException;
23+
import org.apache.seata.core.model.GlobalStatus;
24+
import org.apache.seata.core.rpc.RpcContext;
25+
import org.apache.seata.core.rpc.TMDisconnectHandler;
26+
import org.apache.seata.server.session.GlobalSession;
27+
import org.apache.seata.server.session.SessionHolder;
28+
import org.apache.seata.server.session.SessionManager;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
import java.util.Collection;
33+
import java.util.Objects;
34+
35+
/**
36+
* Default implementation of TMDisconnectHandler.
37+
* Handles TM disconnection and performs early rollback of global transactions
38+
* when enabled via configuration.
39+
*/
40+
public class DefaultTMDisconnectHandler implements TMDisconnectHandler {
41+
42+
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultTMDisconnectHandler.class);
43+
44+
private final Core core;
45+
46+
/**
47+
* Constructor with core dependency
48+
*
49+
* @param core the transaction core
50+
*/
51+
public DefaultTMDisconnectHandler(Core core) {
52+
this.core = core;
53+
}
54+
55+
@Override
56+
public void handleTMDisconnect(RpcContext rpcContext) {
57+
if (rpcContext == null || rpcContext.getTransactionServiceGroup() == null) {
58+
return;
59+
}
60+
61+
// Check if early rollback is enabled
62+
boolean enableRollbackWhenDisconnect = ConfigurationFactory.getInstance()
63+
.getBoolean(
64+
ConfigurationKeys.ENABLE_ROLLBACK_WHEN_DISCONNECT,
65+
DefaultValues.DEFAULT_ENABLE_ROLLBACK_WHEN_DISCONNECT);
66+
67+
if (!enableRollbackWhenDisconnect) {
68+
LOGGER.debug("Early rollback on TM disconnect is disabled");
69+
return;
70+
}
71+
72+
String transactionServiceGroup = rpcContext.getTransactionServiceGroup();
73+
String applicationId = rpcContext.getApplicationId();
74+
75+
LOGGER.info(
76+
"TM disconnected: transactionServiceGroup={}, applicationId={}, performing early rollback check",
77+
transactionServiceGroup,
78+
applicationId);
79+
80+
try {
81+
// Find all global sessions in BEGIN status
82+
Collection<GlobalSession> globalSessions =
83+
SessionHolder.getRootSessionManager().allSessions();
84+
85+
int rollbackCount = 0;
86+
for (GlobalSession globalSession : globalSessions) {
87+
if (shouldRollbackSession(globalSession, rpcContext)) {
88+
try {
89+
LOGGER.info(
90+
"Early rollback for TM disconnect: xid={}, transactionServiceGroup={}, applicationId={}",
91+
globalSession.getXid(),
92+
globalSession.getTransactionServiceGroup(),
93+
globalSession.getApplicationId());
94+
95+
// Change status to TimeoutRollbacking
96+
globalSession.changeGlobalStatus(GlobalStatus.TimeoutRollbacking);
97+
98+
// Perform rollback
99+
core.doGlobalRollback(globalSession, false);
100+
rollbackCount++;
101+
102+
} catch (TransactionException e) {
103+
LOGGER.error(
104+
"Failed to rollback transaction [{}] {} {}",
105+
globalSession.getXid(),
106+
e.getCode(),
107+
e.getMessage());
108+
}
109+
}
110+
}
111+
112+
if (rollbackCount > 0) {
113+
LOGGER.info(
114+
"Early rollback completed for TM disconnect: transactionServiceGroup={}, rollbackCount={}",
115+
transactionServiceGroup,
116+
rollbackCount);
117+
}
118+
119+
} catch (Exception e) {
120+
LOGGER.error(
121+
"Error during TM disconnect handling for transactionServiceGroup={}", transactionServiceGroup, e);
122+
}
123+
}
124+
125+
/**
126+
* Get the session manager instance. Made public for testing purposes.
127+
*
128+
* @return the session manager
129+
*/
130+
public SessionManager getSessionManager() {
131+
return SessionHolder.getRootSessionManager();
132+
}
133+
134+
/**
135+
* Determine if a global session should be rolled back based on the disconnected TM context.
136+
* Uses community-approved matching algorithm:
137+
* 1. Primary: TransactionServiceGroup matching (vgroup approach)
138+
* 2. Secondary: ApplicationId matching for additional safety
139+
* 3. Only rollback sessions in BEGIN status
140+
*
141+
* @param globalSession the global session to check
142+
* @param tmContext the disconnected TM context
143+
* @return true if the session should be rolled back
144+
*/
145+
private boolean shouldRollbackSession(GlobalSession globalSession, RpcContext tmContext) {
146+
// Only rollback sessions in BEGIN status
147+
if (globalSession.getStatus() != GlobalStatus.Begin) {
148+
return false;
149+
}
150+
151+
// Primary identifier: TransactionServiceGroup (vgroup) matching
152+
// This is the community consensus approach from Issue #4422
153+
if (!Objects.equals(globalSession.getTransactionServiceGroup(), tmContext.getTransactionServiceGroup())) {
154+
return false;
155+
}
156+
157+
// Secondary safety check: ApplicationId matching when both are available
158+
// This provides additional confidence to prevent false positives
159+
if (globalSession.getApplicationId() != null && tmContext.getApplicationId() != null) {
160+
return Objects.equals(globalSession.getApplicationId(), tmContext.getApplicationId());
161+
}
162+
163+
// If applicationId is not available on both sides, trust the vgroup match
164+
// This follows the community approach that vgroup is the primary identifier
165+
return true;
166+
}
167+
}

0 commit comments

Comments
 (0)