Changes from all commits (43 commits)
4afe1a5
Disable 3.0 not supported cases
w41ter Sep 11, 2024
7545a29
Remove random_add_cluster_keys_for_mow from branch-3.0
w41ter Dec 16, 2024
4810ac1
Fix error test in br 3.0 (#365)
wyxxxcat Jan 3, 2025
12d6ec9
Disable some suites in 2.1
w41ter Jan 9, 2025
e8c56cf
Fix version div (#387)
w41ter Jan 9, 2025
b3566c2
fix: reuse restore job should keep the table alias
w41ter Feb 12, 2025
b8a7d6b
[branch-3.0] Fix change table to duplicate for row store related suti…
wyxxxcat Feb 13, 2025
79e652d
Add F/L commitSeq and commitTs for get_lag (#395)
wyxxxcat Jan 22, 2025
4de6762
Add force_replace flag for restore (#396)
wyxxxcat Feb 8, 2025
d189c77
feat: support lock binlogs (#399)
w41ter Feb 8, 2025
dca5f10
feat: support getting binlogs in batch (#400)
w41ter Feb 8, 2025
0e86f3a
fix: lock binlog after handle binlogs (#406)
w41ter Feb 10, 2025
f22ad39
feat: compatible with unknown lock_binlog method (#407)
w41ter Feb 10, 2025
6681261
fix: avoid leaking rpc conns (#435)
w41ter Feb 18, 2025
70688f2
chore: add desync.sh (#452)
w41ter Feb 19, 2025
ae67927
test: add basic failover test (#453)
wyxxxcat Feb 19, 2025
f74f7e6
chore: arrange failover test (#454)
w41ter Feb 19, 2025
99e3aad
fix: db not enable binlog and ignore e for checkSelectTimesOf (#460)
wyxxxcat Feb 21, 2025
31d6003
fix: avoid skipping binlogs when partial sync with partitions (#456)
w41ter Feb 20, 2025
6451981
fix: db not enable binlog and ignore e for checkSelectTimesOf (#457)
wyxxxcat Feb 20, 2025
166fc97
fix: avoid escape escaped \" in comment (#463)
lsy3993 Feb 24, 2025
e7c9702
fix: initial TableMapping before using (#470)
w41ter Feb 27, 2025
7e2659e
fix ds common test (#467)
wyxxxcat Feb 27, 2025
a7be6b6
Merge remote-tracking branch 'origin/dev' into branch-3.0
w41ter Apr 15, 2025
87b4bc7
chore: add seperated handles flag & enable txn insert (#592)
w41ter Apr 15, 2025
bb07dd4
fixup
w41ter Apr 15, 2025
891ba89
fix ci wrong test (#593)
wyxxxcat Apr 15, 2025
0032892
test: fix branch_3_0 wrong ci test #608
wyxxxcat Apr 29, 2025
3ff6995
Merge remote-tracking branch 'origin/dev' into branch-3.0
w41ter May 12, 2025
8be1df5
chore: disable some cases not supported in branch-2.1/3.0
w41ter May 13, 2025
935aa57
fix: apply dropped binlogs early with barrier
w41ter May 13, 2025
997beda
fix: use new name instead of old name, to compatible with 2.1
w41ter May 13, 2025
2d6ffb1
chore: disable some cases not supported in branch-2.1/3.0
w41ter May 13, 2025
925fc5f
disable test_ds_partition_default_list_insert in 3.0
w41ter May 14, 2025
77da4c1
Update CHANGELOG.md
w41ter May 14, 2025
7527ec3
fix: skip create elasticsearch record (#619)
w41ter May 26, 2025
0bcfeac
chore: log shadow index id (#620)
w41ter Jun 5, 2025
eb0ed1f
fix: save shadow indexes even if the binlog is committed (#621)
w41ter Jun 5, 2025
c050ce3
fix: Remove redundant table property #622
wyxxxcat Jun 25, 2025
a836ab1
fix: clear shadow indexes after log committed (#623)
w41ter Jun 25, 2025
3facf09
feat: Add cluster-level CCR sync and dynamic monitor interval configu…
carolinchen-docker Aug 11, 2025
c502de9
feat(ccr): add cluster-level task creation support
carolinchen-docker Aug 11, 2025
4b60d8e
docs: Remove unnecessary documentation files
carolinchen-docker Sep 1, 2025
7 changes: 5 additions & 2 deletions CHANGELOG.md
@@ -1,6 +1,6 @@
# Changelog

# 3.0.5/2.1.10
# 3.0.6/2.1.10

### Fix

@@ -11,17 +11,20 @@
- Fix DROP INDEX causing UPSERT to fail to find the index (selectdb/ccr-syncer#490)
- Fix VIEW creation failure caused by an unknown column (selectdb/ccr-syncer#510)
- Fix a memory leak caused by gls not being released (selectdb/ccr-syncer#576)
- Use `force_replace` to fix VIEW sync failures caused by inconsistent schemas (selectdb/ccr-syncer#579)
- Check whether the table has already been dropped before building the table mapping (selectdb/ccr-syncer#612)

### Feature

- Support txn insert (selectdb/ccr-syncer#290)
- Support txn insert (selectdb/ccr-syncer#290, selectdb/ccr-syncer#592)
- Support lock binlog, releasing binlogs that are no longer needed early to avoid holding upstream resources (selectdb/ccr-syncer#399, selectdb/ccr-syncer#406, selectdb/ccr-syncer#407)
- Support fetching multiple binlogs in a single batch (selectdb/ccr-syncer#400)
- Add a `metrics` endpoint for retrieving CCR metrics (selectdb/ccr-syncer#402, selectdb/ccr-syncer#461)
- Support modifying view comments (selectdb/ccr-syncer#408)
- Support idempotency (selectdb/ccr-syncer#409, selectdb/ccr-syncer#416, selectdb/ccr-syncer#415, selectdb/ccr-syncer#424, ...)
- Add the `desync.sh` script (selectdb/ccr-syncer#452)
- Support pipeline txn (parallel ingest, serial commit) (selectdb/ccr-syncer#585)
- Add a `/view` endpoint for getting job status (supports terminal, table, html) (selectdb/ccr-syncer#588)

### Improve

140 changes: 140 additions & 0 deletions CLUSTER_CCR_CHANGES.md
@@ -0,0 +1,140 @@
# Cluster CCR Feature Changes

## Overview
This change adds cluster-level synchronization to the CCR Syncer: users can sync every database of a source cluster in one step, and the database-monitoring interval can now be reconfigured at runtime.

## Main Changes

### 1. New ClusterSync parameter

#### File: `pkg/service/http_service.go`

**New field:**
```go
type CreateCcrRequest struct {
	// ... other fields

	// Whether this is a cluster-level sync; if true, all databases are fetched from
	// the source cluster and a sync task is created for each database.
	ClusterSync bool `json:"cluster_sync"`
}
```

**New functions:**
- `createClusterCcr()` - creates the cluster-level CCR sync tasks
- `startDatabaseMonitor()` - starts the database-monitoring daemon
- `monitorDatabaseChanges()` - detects database changes and handles them
- `handleNewDatabases()` - handles newly added databases
- `handleDeletedDatabases()` - handles dropped databases
- `getDatabaseList()` - fetches the list of databases

**Features:**
- Automatically fetches all databases from the source cluster
- Creates an independent sync task for each database (see the sketch below)
- Dynamically monitors for newly added or dropped databases
- Automatically creates or removes the corresponding sync tasks
- Retries on failure
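
The body of `createClusterCcr()` is not shown in this document, so the following is only a minimal sketch of how the fan-out could work, assuming hypothetical request/spec shapes and helper signatures (`listDatabases`, `createCcr`); the actual implementation in `pkg/service/http_service.go` may differ.

```go
package clusterccr

import (
	"fmt"
	"log"
)

// Hypothetical request/spec shapes, for illustration only; the real types live
// in pkg/service/http_service.go and pkg/ccr/base/spec.go.
type Spec struct {
	Host, User, Password, Database string
	Port                           int
}

type CreateCcrRequest struct {
	Name        string
	Src, Dest   Spec
	ClusterSync bool
}

// createClusterCcr sketches how a cluster-level request could fan out into one
// sync task per database, using the "{original job name}_{database name}" rule.
func createClusterCcr(
	req CreateCcrRequest,
	listDatabases func(Spec) ([]string, error),
	createCcr func(CreateCcrRequest) error,
) error {
	databases, err := listDatabases(req.Src) // system databases already filtered out
	if err != nil {
		return err
	}
	for _, db := range databases {
		sub := req
		sub.Name = fmt.Sprintf("%s_%s", req.Name, db)
		sub.Src.Database, sub.Dest.Database = db, db
		sub.ClusterSync = false // each sub-task is an ordinary database-level sync
		if err := createCcr(sub); err != nil {
			// one failing database should not block the others; the caller may retry
			log.Printf("create sync task for database %s failed: %v", db, err)
		}
	}
	return nil
}
```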

### 2. Dynamic monitor interval configuration

#### New global variables:
```go
var (
	databaseMonitorInterval time.Duration = 2 * time.Minute
	intervalMutex           sync.RWMutex
	intervalUpdateChan      = make(chan time.Duration, 1)
)
```

#### New HTTP handlers:

**`/update_monitor_interval` - update the monitor interval**
- Method: POST
- Request body:
```json
{
"interval_seconds": 300
}
```
- Purpose: dynamically updates the check interval of the database monitor (a handler sketch follows)
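
A minimal sketch of what the update handler might look like, assuming a hypothetical handler name and success-response shape, the standard `net/http`, `encoding/json`, and `time` imports, and the globals from section 2; it combines the mutex-guarded write with the non-blocking channel notification described in section 3 below.

```go
// Sketch only: the handler name and the success-response shape are assumptions;
// it relies on databaseMonitorInterval, intervalMutex, and intervalUpdateChan
// defined in section 2.
func updateMonitorIntervalHandler(w http.ResponseWriter, r *http.Request) {
	var req struct {
		IntervalSeconds int64 `json:"interval_seconds"`
	}
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil || req.IntervalSeconds <= 0 {
		http.Error(w, "interval_seconds must be a positive integer", http.StatusBadRequest)
		return
	}
	newInterval := time.Duration(req.IntervalSeconds) * time.Second

	// Guard the shared interval with the RWMutex.
	intervalMutex.Lock()
	databaseMonitorInterval = newInterval
	intervalMutex.Unlock()

	// Non-blocking notification so the monitor loop can reset its ticker promptly.
	select {
	case intervalUpdateChan <- newInterval:
	default:
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]any{"success": true})
}
```

Because the channel has capacity 1 and the send is non-blocking, the handler never waits on the monitor loop; the lock is held only for the assignment.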

**`/get_monitor_interval` - get the current monitor interval**
- Method: GET
- Response body:
```json
{
"success": true,
"interval_seconds": 120
}
```
- Purpose: returns the currently configured monitor interval

### 3. Performance optimizations

**Lock optimization:**
- Uses channel communication instead of frequent lock operations
- Reduces lock contention in `startDatabaseMonitor`
- Improves monitoring performance

**Implementation details:**
- A `select` statement listens on both the ticker and interval updates
- Channel communication is non-blocking
- The ticker is reset only when the interval actually changes (see the loop sketch below)
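
For illustration, a minimal sketch of such a loop, assuming the globals from section 2, a hypothetical function name (`runDatabaseMonitor`; the real loop lives in `startDatabaseMonitor`), and a `monitorDatabaseChanges` callback:

```go
// Sketch only: assumes databaseMonitorInterval, intervalMutex, and
// intervalUpdateChan from section 2, plus a monitorDatabaseChanges() callback.
func runDatabaseMonitor(stop <-chan struct{}, monitorDatabaseChanges func()) {
	intervalMutex.RLock()
	current := databaseMonitorInterval
	intervalMutex.RUnlock()

	ticker := time.NewTicker(current)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// Create/remove sync tasks for databases added or dropped upstream.
			monitorDatabaseChanges()
		case newInterval := <-intervalUpdateChan:
			// Reset the ticker only when the interval actually changed.
			if newInterval > 0 && newInterval != current {
				current = newInterval
				ticker.Reset(current)
			}
		case <-stop:
			return
		}
	}
}
```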

## API usage examples

### Create a cluster-level sync job
```bash
curl -X POST http://localhost:9190/create_ccr \
  -H "Content-Type: application/json" \
  -d '{
    "name": "cluster_sync_job",
    "src": {
      "host": "source-cluster",
      "port": 9030,
      "user": "root",
      "password": "password"
    },
    "dest": {
      "host": "dest-cluster",
      "port": 9030,
      "user": "root",
      "password": "password"
    },
    "cluster_sync": true
  }'
```

### Update the monitor interval
```bash
curl -X POST http://localhost:9190/update_monitor_interval \
-H "Content-Type: application/json" \
-d '{"interval_seconds": 300}'
```

### Get the monitor interval
```bash
curl -X GET http://localhost:9190/get_monitor_interval
```

## Technical highlights

1. **Thread safety**: global state is protected by a mutex
2. **Performance**: lock usage is optimized to reduce contention
3. **Dynamic configuration**: the monitor interval can be changed at runtime
4. **Fault tolerance**: failed operations are retried and errors are handled
5. **Observability**: detailed logging
6. **Backward compatibility**: existing single-database sync is unaffected

## Testing suggestions

1. Test cluster-level sync
2. Test dynamic monitor interval updates
3. Test automatic handling of newly added and dropped databases
4. Test performance under concurrency
5. Test error recovery

## Notes

1. Cluster-level sync creates an independent sync task for each database
2. Task naming format: `{original job name}_{database name}` (example below)
3. The monitor interval must be greater than 0
4. Set the monitor interval carefully in production
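
For example (hypothetical database names): a cluster-level job named `cluster_sync_job` syncing databases `sales` and `orders` would produce the per-database tasks `cluster_sync_job_sales` and `cluster_sync_job_orders`.
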
50 changes: 50 additions & 0 deletions pkg/ccr/base/spec.go
@@ -83,6 +83,22 @@ func ParseBackupState(state string) BackupState {
}
}

// isSystemDatabase reports whether dbName is a system database that should be skipped.
func isSystemDatabase(dbName string) bool {
	systemDatabases := []string{
		"information_schema",
		"mysql",
		"__internal_schema",
	}

	for _, sysDb := range systemDatabases {
		if dbName == sysDb {
			return true
		}
	}
	return false
}

type RestoreState int

const (
@@ -464,6 +480,40 @@ func (s *Spec) GetAllTables() ([]string, error) {
return tables, nil
}

func (s *Spec) GetAllDatabases() ([]string, error) {
	log.Tracef("get all databases from cluster")

	db, err := s.Connect()
	if err != nil {
		return nil, err
	}

	sql := "SHOW DATABASES"
	rows, err := db.Query(sql)
	if err != nil {
		return nil, xerror.Wrapf(err, xerror.Normal, "query %s failed", sql)
	}
	defer rows.Close()

	var databases []string
	for rows.Next() {
		var database string
		if err := rows.Scan(&database); err != nil {
			return nil, xerror.Wrapf(err, xerror.Normal, "scan database failed")
		}
		// skip system databases
		if !isSystemDatabase(database) {
			databases = append(databases, database)
		}
	}

	if err := rows.Err(); err != nil {
		return nil, xerror.Wrapf(err, xerror.Normal, "rows error")
	}

	return databases, nil
}

func (s *Spec) queryResult(querySQL string, queryColumn string, errMsg string) ([]string, error) {
db, err := s.Connect()
if err != nil {
50 changes: 6 additions & 44 deletions pkg/ccr/handle/alter_job_v2.go
@@ -41,54 +41,16 @@ func (*AlterJobV2Handle) Handle(j *ccr.Job, commitSeq int64, alterJob *record.Al
}

func handleAlterRollup(j *ccr.Job, alterJob *record.AlterJobV2) error {
job_progress := j.GetJobProgress()
j.SaveAlterRollupShadowIndex(alterJob)
if !alterJob.IsFinished() {
switch alterJob.JobState {
case record.ALTER_JOB_STATE_PENDING:
// Once the rollup job step to WAITING_TXN, the upsert to the rollup index is allowed,
// but the dest index of the downstream cluster hasn't been created.
//
// To filter the upsert to the rollup index, save the shadow index ids here.
if job_progress.ShadowIndexes == nil {
job_progress.ShadowIndexes = make(map[int64]int64)
}
job_progress.ShadowIndexes[alterJob.RollupIndexId] = alterJob.BaseIndexId
case record.ALTER_JOB_STATE_CANCELLED:
// clear the shadow indexes
delete(job_progress.ShadowIndexes, alterJob.RollupIndexId)
}
return nil
}

// Once partial snapshot finished, the rollup indexes will be convert to normal index.
delete(job_progress.ShadowIndexes, alterJob.RollupIndexId)

return j.NewPartialSnapshot(alterJob.TableId, alterJob.TableName, nil, true, false)
}

func handleSchemaChange(j *ccr.Job, alterJob *record.AlterJobV2) error {
job_progress := j.GetJobProgress()
if !alterJob.IsFinished() {
switch alterJob.JobState {
case record.ALTER_JOB_STATE_PENDING:
// Once the schema change step to WAITING_TXN, the upsert to the shadow indexes is allowed,
// but the dest indexes of the downstream cluster hasn't been created.
//
// To filter the upsert to the shadow indexes, save the shadow index ids here.
if job_progress.ShadowIndexes == nil {
job_progress.ShadowIndexes = make(map[int64]int64)
}
for shadowIndexId, originIndexId := range alterJob.ShadowIndexes {
job_progress.ShadowIndexes[shadowIndexId] = originIndexId
}
case record.ALTER_JOB_STATE_CANCELLED:
// clear the shadow indexes
for shadowIndexId := range alterJob.ShadowIndexes {
delete(job_progress.ShadowIndexes, shadowIndexId)
}
}
return nil
}

// drop table dropTableSql
var destTableName string
@@ -98,12 +60,12 @@ func handleSchemaChange(j *ccr.Job, alterJob *record.AlterJobV2) error {
destTableName = alterJob.TableName
}

if ccr.FeatureSchemaChangePartialSync && alterJob.Type == record.ALTER_JOB_SCHEMA_CHANGE {
// Once partial snapshot finished, the shadow indexes will be convert to normal indexes.
for shadowIndexId := range alterJob.ShadowIndexes {
delete(job_progress.ShadowIndexes, shadowIndexId)
}
j.SaveSchemaChangeShadowIndexes(alterJob)
if !alterJob.IsFinished() {
return nil
}

if ccr.FeatureSchemaChangePartialSync && alterJob.Type == record.ALTER_JOB_SCHEMA_CHANGE {
return j.NewPartialSnapshot(alterJob.TableId, alterJob.TableName, nil, true, false)
}

5 changes: 5 additions & 0 deletions pkg/ccr/handle/create_table.go
@@ -23,6 +23,11 @@ func (h *CreateTableHandle) Handle(j *ccr.Job, commitSeq int64, createTable *rec
return xerror.Errorf(xerror.Normal, "invalid sync type: %v", j.SyncType)
}

if createTable.IsCreateElasticSearch() {
log.Warnf("create table with elasticsearch is not supported yet, skip this binlog")
return nil
}

if createTable.IsCreateMaterializedView() {
log.Warnf("create async materialized view is not supported yet, skip this binlog")
return nil
8 changes: 4 additions & 4 deletions pkg/ccr/handle/rename_partition.go
@@ -30,16 +30,16 @@ func (h *RenamePartitionHandle) IsBinlogCommitted(j *ccr.Job, record *record.Ren
}

for _, partition := range partitions {
if partition.Name == record.OldPartitionName {
log.Infof("partition %s is not renamed to %s in dest table %d, this binlog is not committed",
if partition.Name == record.NewPartitionName {
log.Infof("partition %s is not renamed to %s in dest table %d, this binlog is committed",
record.OldPartitionName, record.NewPartitionName, destTableId)
return false, nil
return true, nil
}
}

log.Infof("partition %s is renamed to %s in dest table %d, this binlog is not committed",
record.OldPartitionName, record.NewPartitionName, destTableId)
return true, nil
return false, nil
}

func (h *RenamePartitionHandle) IsIdempotent() bool {
25 changes: 23 additions & 2 deletions pkg/ccr/ingest_binlog_job.go
@@ -543,8 +543,8 @@ func (j *IngestBinlogJob) preparePartition(srcTableId, destTableId int64,
srcIndexName := getSrcIndexName(job, srcIndexMeta)
if _, ok := destIndexNameMap[srcIndexName]; !ok {
j.setError(xerror.Errorf(xerror.Meta,
"index name %v not found in dest meta, is base index: %t, src index id: %d",
srcIndexName, srcIndexMeta.IsBaseIndex, indexId))
"index name %v not found in dest meta, is base index: %t, src index id: %d, shadow indexes %v",
srcIndexName, srcIndexMeta.IsBaseIndex, indexId, j.ccrJob.progress.ShadowIndexes))
return
}
}
@@ -806,6 +806,27 @@ func (j *IngestBinlogJob) applyDroppedBinlogs() {
return
}

if binlog.GetType() == festruct.TBinlogType_BARRIER {
barrierLog, err := record.NewBarrierLogFromJson(binlog.GetData())
if err != nil {
j.setError(err)
return
}

if barrierLog.Binlog != "" {
// keep compatible with old version
binlogType := festruct.TBinlogType(barrierLog.BinlogType)
newBinlog := festruct.NewTBinlog()
newBinlog.SetCommitSeq(utils.ThriftValueWrapper(binlog.GetCommitSeq()))
newBinlog.SetTimestamp(utils.ThriftValueWrapper(binlog.GetTimestamp()))
newBinlog.SetType(&binlogType)
newBinlog.SetDbId(utils.ThriftValueWrapper(binlog.GetDbId()))
newBinlog.SetData(&barrierLog.Binlog)
newBinlog.SetTableIds(binlog.GetTableIds())
binlog = newBinlog
}
}

if binlog.GetType() == festruct.TBinlogType_ALTER_JOB {
log.Infof("txn %d ingest binlog: trigger new partial snapshot by alter job binlog, commitSeq: %d", j.txnId, commitSeq)
alterJobRecord, err := record.NewAlterJobV2FromJson(binlog.GetData())