Skip to content

Commit ca44cfe

Browse files
authored
refactor: Refactor DataSourceProxy (#7615)
1 parent 8763897 commit ca44cfe

13 files changed

+501
-176
lines changed

changes/en-us/2.x.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ Add changes here for all PR submitted to the 2.x branch.
8484

8585
### refactor:
8686

87+
- [[#7615](https://github.com/seata/seata/pull/7615)] Refactor DataSourceProxy
8788
- [[#7617](https://github.com/seata/seata/pull/7617)] Refactor Alibaba Dubbo and HSF
8889

8990

changes/zh-cn/2.x.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383

8484
### refactor:
8585

86+
- [[#7615](https://github.com/seata/seata/pull/7615)] 重构 DataSourceProxy
8687
- [[#7617](https://github.com/seata/seata/pull/7617)] 重构 Alibaba Dubbo 和 HSF 模块
8788

8889

rm-datasource/src/main/java/org/apache/seata/rm/datasource/DataSourceProxy.java

Lines changed: 16 additions & 176 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@
1818

1919
import org.apache.commons.lang.StringUtils;
2020
import org.apache.seata.common.ConfigurationKeys;
21-
import org.apache.seata.common.Constants;
2221
import org.apache.seata.common.loader.EnhancedServiceNotFoundException;
2322
import org.apache.seata.config.ConfigurationFactory;
2423
import org.apache.seata.core.context.RootContext;
2524
import org.apache.seata.core.model.BranchType;
2625
import org.apache.seata.core.model.Resource;
2726
import org.apache.seata.rm.DefaultResourceManager;
27+
import org.apache.seata.rm.datasource.initializer.ResourceIdInitializer;
28+
import org.apache.seata.rm.datasource.initializer.ResourceIdInitializerRegistry;
2829
import org.apache.seata.rm.datasource.sql.struct.TableMetaCacheFactory;
2930
import org.apache.seata.rm.datasource.undo.UndoLogManager;
3031
import org.apache.seata.rm.datasource.undo.UndoLogManagerFactory;
@@ -45,7 +46,6 @@
4546

4647
/**
4748
* The type Data source proxy.
48-
*
4949
*/
5050
public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {
5151

@@ -131,7 +131,6 @@ private void init(DataSource dataSource, String resourceGroupId) {
131131

132132
/**
133133
* Define derivative product version for MySQL Kernel
134-
*
135134
*/
136135
private void checkDerivativeProduct() {
137136
if (!JdbcConstants.MYSQL.equals(dbType)) {
@@ -234,179 +233,8 @@ public String getResourceId() {
234233
}
235234

236235
private void initResourceId() {
237-
if (JdbcConstants.POSTGRESQL.equals(dbType)) {
238-
initPGResourceId();
239-
} else if (JdbcConstants.ORACLE.equals(dbType) && userName != null) {
240-
initOracleResourceId();
241-
} else if (JdbcConstants.MYSQL.equals(dbType) || JdbcConstants.POLARDBX.equals(dbType)) {
242-
initMysqlResourceId();
243-
} else if (JdbcConstants.SQLSERVER.equals(dbType)) {
244-
initSqlServerResourceId();
245-
} else if (JdbcConstants.DM.equals(dbType)) {
246-
initDMResourceId();
247-
} else if (JdbcConstants.OSCAR.equals(dbType)) {
248-
initOscarResourceId();
249-
} else {
250-
initDefaultResourceId();
251-
}
252-
}
253-
254-
/**
255-
* init the default resource id
256-
*/
257-
private void initDefaultResourceId() {
258-
if (jdbcUrl.contains("?")) {
259-
resourceId = jdbcUrl.substring(0, jdbcUrl.indexOf('?'));
260-
} else {
261-
resourceId = jdbcUrl;
262-
}
263-
}
264-
265-
/**
266-
* init the oracle resource id
267-
*/
268-
private void initOracleResourceId() {
269-
if (jdbcUrl.contains("?")) {
270-
resourceId = jdbcUrl.substring(0, jdbcUrl.indexOf('?')) + "/" + userName;
271-
} else {
272-
resourceId = jdbcUrl + "/" + userName;
273-
}
274-
}
275-
276-
/**
277-
* prevent mysql url like
278-
* jdbc:mysql:loadbalance://192.168.100.2:3306,192.168.100.1:3306/seata
279-
* it will cause the problem like
280-
* 1.rm client is not connected
281-
*/
282-
private void initMysqlResourceId() {
283-
String startsWith = "jdbc:mysql:loadbalance://";
284-
if (jdbcUrl.startsWith(startsWith)) {
285-
String url;
286-
if (jdbcUrl.contains("?")) {
287-
url = jdbcUrl.substring(0, jdbcUrl.indexOf('?'));
288-
} else {
289-
url = jdbcUrl;
290-
}
291-
resourceId = url.replace(",", "|");
292-
} else {
293-
initDefaultResourceId();
294-
}
295-
}
296-
297-
private void initDMResourceId() {
298-
LOGGER.warn("support for the dameng database is currently an experimental feature ");
299-
if (jdbcUrl.contains("?")) {
300-
StringBuilder jdbcUrlBuilder = new StringBuilder();
301-
jdbcUrlBuilder.append(jdbcUrl, 0, jdbcUrl.indexOf('?'));
302-
303-
StringBuilder paramsBuilder = new StringBuilder();
304-
String paramUrl = jdbcUrl.substring(jdbcUrl.indexOf('?') + 1);
305-
String[] urlParams = paramUrl.split("&");
306-
for (String urlParam : urlParams) {
307-
if (urlParam.contains("schema")) {
308-
// remove the '"'
309-
if (urlParam.contains("\"")) {
310-
urlParam = urlParam.replaceAll("\"", "");
311-
}
312-
paramsBuilder.append(urlParam);
313-
break;
314-
}
315-
}
316-
317-
if (paramsBuilder.length() > 0) {
318-
jdbcUrlBuilder.append("?");
319-
jdbcUrlBuilder.append(paramsBuilder);
320-
}
321-
resourceId = jdbcUrlBuilder.toString();
322-
} else {
323-
resourceId = jdbcUrl;
324-
}
325-
}
326-
327-
/**
328-
* init the oscar resource id
329-
* jdbc:oscar://192.168.x.xx:2003/OSRDB
330-
*/
331-
private void initOscarResourceId() {
332-
if (jdbcUrl.contains("?")) {
333-
resourceId = jdbcUrl.substring(0, jdbcUrl.indexOf('?')) + "/" + userName;
334-
} else {
335-
resourceId = jdbcUrl + "/" + userName;
336-
}
337-
}
338-
339-
/**
340-
* prevent pg sql url like
341-
* jdbc:postgresql://127.0.0.1:5432/seata?currentSchema=public
342-
* jdbc:postgresql://127.0.0.1:5432/seata?currentSchema=seata
343-
* cause the duplicated resourceId
344-
* it will cause the problem like
345-
* 1.get file lock fail
346-
* 2.error table meta cache
347-
*/
348-
private void initPGResourceId() {
349-
if (jdbcUrl.contains("?")) {
350-
StringBuilder jdbcUrlBuilder = new StringBuilder();
351-
jdbcUrlBuilder.append(jdbcUrl, 0, jdbcUrl.indexOf('?'));
352-
353-
StringBuilder paramsBuilder = new StringBuilder();
354-
String paramUrl = jdbcUrl.substring(jdbcUrl.indexOf('?') + 1);
355-
String[] urlParams = paramUrl.split("&");
356-
for (String urlParam : urlParams) {
357-
if (urlParam.contains("currentSchema")) {
358-
if (urlParam.contains(Constants.DBKEYS_SPLIT_CHAR)) {
359-
urlParam = urlParam.replace(Constants.DBKEYS_SPLIT_CHAR, "!");
360-
}
361-
paramsBuilder.append(urlParam);
362-
break;
363-
}
364-
}
365-
366-
if (paramsBuilder.length() > 0) {
367-
jdbcUrlBuilder.append("?");
368-
jdbcUrlBuilder.append(paramsBuilder);
369-
}
370-
resourceId = jdbcUrlBuilder.toString();
371-
} else {
372-
resourceId = jdbcUrl;
373-
}
374-
if (resourceId.contains(",")) {
375-
resourceId = resourceId.replace(",", "|");
376-
}
377-
}
378-
379-
/**
380-
* The general form of the connection URL for SqlServer is
381-
* jdbc:sqlserver://[serverName[\instanceName][:portNumber]][;property=value[;property=value]]
382-
* required connection properties: [INSTANCENAME], [databaseName,database]
383-
*
384-
*/
385-
private void initSqlServerResourceId() {
386-
if (jdbcUrl.contains(";")) {
387-
StringBuilder jdbcUrlBuilder = new StringBuilder();
388-
jdbcUrlBuilder.append(jdbcUrl, 0, jdbcUrl.indexOf(';'));
389-
StringBuilder paramsBuilder = new StringBuilder();
390-
String paramUrl = jdbcUrl.substring(jdbcUrl.indexOf(';') + 1);
391-
String[] urlParams = paramUrl.split(";");
392-
for (String urlParam : urlParams) {
393-
String[] paramSplit = urlParam.split("=");
394-
String propertyName = paramSplit[0];
395-
if ("INSTANCENAME".equalsIgnoreCase(propertyName)
396-
|| "databaseName".equalsIgnoreCase(propertyName)
397-
|| "database".equalsIgnoreCase(propertyName)) {
398-
paramsBuilder.append(urlParam);
399-
}
400-
}
401-
402-
if (paramsBuilder.length() > 0) {
403-
jdbcUrlBuilder.append(";");
404-
jdbcUrlBuilder.append(paramsBuilder);
405-
}
406-
resourceId = jdbcUrlBuilder.toString();
407-
} else {
408-
resourceId = jdbcUrl;
409-
}
236+
ResourceIdInitializer initializer = ResourceIdInitializerRegistry.getInitializer(dbType, this);
237+
initializer.initResourceId(this);
410238
}
411239

412240
@Override
@@ -453,6 +281,18 @@ private void validMySQLVersion(Connection connection) {
453281
}
454282
}
455283

284+
public String getJdbcUrl() {
285+
return jdbcUrl;
286+
}
287+
288+
public void setResourceId(String resourceId) {
289+
this.resourceId = resourceId;
290+
}
291+
292+
public String getUserName() {
293+
return userName;
294+
}
295+
456296
public void close() throws Exception {
457297
// TODO: Need to unregister resource from DefaultResourceManager
458298
TableMetaCacheFactory.shutdown(resourceId);
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seata.rm.datasource.initializer;
19+
20+
import org.apache.seata.rm.datasource.DataSourceProxy;
21+
22+
public abstract class AbstractResourceIdInitializer implements ResourceIdInitializer {
23+
@Override
24+
public void initResourceId(DataSourceProxy proxy) {
25+
doInitResourceId(proxy);
26+
}
27+
28+
protected abstract void doInitResourceId(DataSourceProxy proxy);
29+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seata.rm.datasource.initializer;
19+
20+
import org.apache.seata.rm.datasource.DataSourceProxy;
21+
22+
public interface ResourceIdInitializer {
23+
String JDBC_URL_SPLIT_CHAR = "?";
24+
25+
boolean supports(String dbType, DataSourceProxy proxy);
26+
27+
void initResourceId(DataSourceProxy proxy);
28+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seata.rm.datasource.initializer;
19+
20+
import org.apache.seata.rm.datasource.DataSourceProxy;
21+
import org.apache.seata.rm.datasource.initializer.db.DMResourceIdInitializer;
22+
import org.apache.seata.rm.datasource.initializer.db.DefaultResourceIdInitializer;
23+
import org.apache.seata.rm.datasource.initializer.db.MysqlResourceIdInitializer;
24+
import org.apache.seata.rm.datasource.initializer.db.OracleResourceIdInitializer;
25+
import org.apache.seata.rm.datasource.initializer.db.OscarResourceIdInitializer;
26+
import org.apache.seata.rm.datasource.initializer.db.PostgresqlResourceIdInitializer;
27+
import org.apache.seata.rm.datasource.initializer.db.SqlServerResourceIdInitializer;
28+
29+
import java.util.ArrayList;
30+
import java.util.List;
31+
32+
public class ResourceIdInitializerRegistry {
33+
private static final List<ResourceIdInitializer> INITIALIZERS = new ArrayList<>();
34+
35+
static {
36+
INITIALIZERS.add(new PostgresqlResourceIdInitializer());
37+
INITIALIZERS.add(new OracleResourceIdInitializer());
38+
INITIALIZERS.add(new MysqlResourceIdInitializer());
39+
INITIALIZERS.add(new SqlServerResourceIdInitializer());
40+
INITIALIZERS.add(new DMResourceIdInitializer());
41+
INITIALIZERS.add(new OscarResourceIdInitializer());
42+
}
43+
44+
public static ResourceIdInitializer getInitializer(String dbType, DataSourceProxy proxy) {
45+
for (ResourceIdInitializer initializer : INITIALIZERS) {
46+
if (initializer.supports(dbType, proxy)) {
47+
return initializer;
48+
}
49+
}
50+
return new DefaultResourceIdInitializer();
51+
}
52+
}

0 commit comments

Comments
 (0)