diff --git a/CHANGELOG.md b/CHANGELOG.md index 47849a55..c143a3e5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # 更新日志 -# 3.0.5/2.1.10 +# 3.0.6/2.1.10 ### Fix @@ -11,10 +11,12 @@ - 修复 DROP INDEX 导致 UPSERT 找不到 index 的问题 (selectdb/ccr-syncer#490) - 修复 unknown column 导致的 VIEW 无法创建的问题 (selectdb/ccr-syncer#510) - 修复 gls 未释放导致的内存泄漏问题 (selectdb/ccr-syncer#576) +- 使用 `force_replace` 修复 VIEW schema 不一致无法同步的问题 (selectdb/ccr-syncer#579) +- 构建 table mapping 前检查 table 是否已经被删除 (selectdb/ccr-syncer#612) ### Feature -- 支持 txn insert (selectdb/ccr-syncer#290) +- 支持 txn insert (selectdb/ccr-syncer#290,selectdb/ccr-syncer#592) - 支持 lock binlog,提前释放不需要的 binlog,避免占用上游资源 (selectdb/ccr-syncer#399, selectdb/ccr-syncer#406, selectdb/ccr-syncer#407) - 支持一批获取多个 binlog (selectdb/ccr-syncer#400) - 增加 `metrics` 接口用于获取 ccr metrics (selectdb/ccr-syncer#402, selectdb/ccr-syncer#461) @@ -22,6 +24,7 @@ - 支持幂等性 (selectdb/ccr-syncer#409, selectdb/ccr-syncer#416, selectdb/ccr-syncer#415, selectdb/ccr-syncer#424, ...) - 增加 `desync.sh` 脚本 (selectdb/ccr-syncer#452) - 支持 pipline txn(并行 ingest,串行提交)(selectdb/ccr-syncer#585) +- 增加 `/view` 接口,用于获取 JOB 状态(支持 terminal, table, html)(selectdb/ccr-syncer#588) ### Improve diff --git a/CLUSTER_CCR_CHANGES.md b/CLUSTER_CCR_CHANGES.md new file mode 100644 index 00000000..5cc1b947 --- /dev/null +++ b/CLUSTER_CCR_CHANGES.md @@ -0,0 +1,140 @@ +# Cluster CCR Feature Changes + +## 概述 +本次更改为CCR Syncer添加了集群级别同步功能,允许用户一次性同步整个集群的所有数据库,并提供了动态配置监控间隔的功能。 + +## 主要更改 + +### 1. 新增ClusterSync参数 + +#### 文件:`pkg/service/http_service.go` + +**新增字段:** +```go +type CreateCcrRequest struct { + // ... 
其他字段 + // Whether it's cluster-level sync, if true, will get all databases from source cluster and create sync task for each database + ClusterSync bool `json:"cluster_sync"` +} +``` + +**新增函数:** +- `createClusterCcr()` - 创建集群级别的CCR同步任务 +- `startDatabaseMonitor()` - 启动数据库监控守护进程 +- `monitorDatabaseChanges()` - 检测数据库变化并处理 +- `handleNewDatabases()` - 处理新增数据库 +- `handleDeletedDatabases()` - 处理删除的数据库 +- `getDatabaseList()` - 获取数据库列表 + +**功能特性:** +- 自动获取源集群所有数据库 +- 为每个数据库创建独立的同步任务 +- 支持动态监控新增/删除的数据库 +- 自动创建/删除对应的同步任务 +- 支持失败重试机制 + +### 2. 动态监控间隔配置 + +#### 新增全局变量: +```go +var ( + databaseMonitorInterval time.Duration = 2 * time.Minute + intervalMutex sync.RWMutex + intervalUpdateChan = make(chan time.Duration, 1) +) +``` + +#### 新增HTTP Handler: + +**`/update_monitor_interval` - 更新监控间隔** +- 请求方法:POST +- 请求格式: +```json +{ + "interval_seconds": 300 +} +``` +- 功能:动态更新数据库监控的检查间隔 + +**`/get_monitor_interval` - 获取当前监控间隔** +- 请求方法:GET +- 响应格式: +```json +{ + "success": true, + "interval_seconds": 120 +} +``` +- 功能:获取当前的监控间隔设置 + +### 3. 性能优化 + +**锁优化:** +- 使用channel通信替代频繁的锁操作 +- 减少了`startDatabaseMonitor`函数中的锁竞争 +- 提高了监控性能 + +**实现细节:** +- 使用`select`语句监听ticker和interval更新 +- 非阻塞的channel通信 +- 只在间隔真正改变时才重置ticker + +## API使用示例 + +### 创建集群级同步任务 +```bash +curl -X POST http://localhost:9190/create_ccr \ + -H "Content-Type: application/json" \ + -d '{ + "name": "cluster_sync_job", + "src": { + "host": "source-cluster", + "port": 9030, + "user": "root", + "password": "password" + }, + "dest": { + "host": "dest-cluster", + "port": 9030, + "user": "root", + "password": "password" + }, + "cluster_sync": true + }' +``` + +### 更新监控间隔 +```bash +curl -X POST http://localhost:9190/update_monitor_interval \ + -H "Content-Type: application/json" \ + -d '{"interval_seconds": 300}' +``` + +### 获取监控间隔 +```bash +curl -X GET http://localhost:9190/get_monitor_interval +``` + +## 技术特点 + +1. **线程安全**:使用mutex保护全局变量 +2. **高性能**:优化锁使用,减少竞争 +3. **动态配置**:支持运行时修改监控间隔 +4. **容错性**:支持失败重试和错误处理 +5. **可观测性**:详细的日志记录 +6. 
**向后兼容**:不影响现有的单数据库同步功能 + +## 测试建议 + +1. 测试集群级同步功能 +2. 测试动态监控间隔更新 +3. 测试新增/删除数据库的自动处理 +4. 测试并发场景下的性能 +5. 测试错误恢复机制 + +## 注意事项 + +1. 集群级同步会为每个数据库创建独立的同步任务 +2. 任务命名格式:`{原任务名}_{数据库名}` +3. 监控间隔最小值应大于0 +4. 建议在生产环境中谨慎设置监控间隔 \ No newline at end of file diff --git a/pkg/ccr/base/spec.go b/pkg/ccr/base/spec.go index 0eb5f895..29bb1997 100644 --- a/pkg/ccr/base/spec.go +++ b/pkg/ccr/base/spec.go @@ -83,6 +83,22 @@ func ParseBackupState(state string) BackupState { } } +// isSystemDatabase 判断是否为系统数据库,需要跳过 +func isSystemDatabase(dbName string) bool { + systemDatabases := []string{ + "information_schema", + "mysql", + "__internal_schema", + } + + for _, sysDb := range systemDatabases { + if dbName == sysDb { + return true + } + } + return false +} + type RestoreState int const ( @@ -464,6 +480,40 @@ func (s *Spec) GetAllTables() ([]string, error) { return tables, nil } +func (s *Spec) GetAllDatabases() ([]string, error) { + log.Tracef("get all databases from cluster") + + db, err := s.Connect() + if err != nil { + return nil, err + } + + sql := "SHOW DATABASES" + rows, err := db.Query(sql) + if err != nil { + return nil, xerror.Wrapf(err, xerror.Normal, "query %s failed", sql) + } + defer rows.Close() + + var databases []string + for rows.Next() { + var database string + if err := rows.Scan(&database); err != nil { + return nil, xerror.Wrapf(err, xerror.Normal, "scan database failed") + } + // 过滤系统数据库 + if !isSystemDatabase(database) { + databases = append(databases, database) + } + } + + if err := rows.Err(); err != nil { + return nil, xerror.Wrapf(err, xerror.Normal, "rows error") + } + + return databases, nil +} + func (s *Spec) queryResult(querySQL string, queryColumn string, errMsg string) ([]string, error) { db, err := s.Connect() if err != nil { diff --git a/pkg/ccr/handle/alter_job_v2.go b/pkg/ccr/handle/alter_job_v2.go index df16580d..08519a6c 100644 --- a/pkg/ccr/handle/alter_job_v2.go +++ b/pkg/ccr/handle/alter_job_v2.go @@ -41,54 +41,16 @@ func (*AlterJobV2Handle) 
Handle(j *ccr.Job, commitSeq int64, alterJob *record.Al } func handleAlterRollup(j *ccr.Job, alterJob *record.AlterJobV2) error { - job_progress := j.GetJobProgress() + j.SaveAlterRollupShadowIndex(alterJob) if !alterJob.IsFinished() { - switch alterJob.JobState { - case record.ALTER_JOB_STATE_PENDING: - // Once the rollup job step to WAITING_TXN, the upsert to the rollup index is allowed, - // but the dest index of the downstream cluster hasn't been created. - // - // To filter the upsert to the rollup index, save the shadow index ids here. - if job_progress.ShadowIndexes == nil { - job_progress.ShadowIndexes = make(map[int64]int64) - } - job_progress.ShadowIndexes[alterJob.RollupIndexId] = alterJob.BaseIndexId - case record.ALTER_JOB_STATE_CANCELLED: - // clear the shadow indexes - delete(job_progress.ShadowIndexes, alterJob.RollupIndexId) - } return nil } - // Once partial snapshot finished, the rollup indexes will be convert to normal index. - delete(job_progress.ShadowIndexes, alterJob.RollupIndexId) - return j.NewPartialSnapshot(alterJob.TableId, alterJob.TableName, nil, true, false) } func handleSchemaChange(j *ccr.Job, alterJob *record.AlterJobV2) error { job_progress := j.GetJobProgress() - if !alterJob.IsFinished() { - switch alterJob.JobState { - case record.ALTER_JOB_STATE_PENDING: - // Once the schema change step to WAITING_TXN, the upsert to the shadow indexes is allowed, - // but the dest indexes of the downstream cluster hasn't been created. - // - // To filter the upsert to the shadow indexes, save the shadow index ids here. 
- if job_progress.ShadowIndexes == nil { - job_progress.ShadowIndexes = make(map[int64]int64) - } - for shadowIndexId, originIndexId := range alterJob.ShadowIndexes { - job_progress.ShadowIndexes[shadowIndexId] = originIndexId - } - case record.ALTER_JOB_STATE_CANCELLED: - // clear the shadow indexes - for shadowIndexId := range alterJob.ShadowIndexes { - delete(job_progress.ShadowIndexes, shadowIndexId) - } - } - return nil - } // drop table dropTableSql var destTableName string @@ -98,12 +60,12 @@ func handleSchemaChange(j *ccr.Job, alterJob *record.AlterJobV2) error { destTableName = alterJob.TableName } - if ccr.FeatureSchemaChangePartialSync && alterJob.Type == record.ALTER_JOB_SCHEMA_CHANGE { - // Once partial snapshot finished, the shadow indexes will be convert to normal indexes. - for shadowIndexId := range alterJob.ShadowIndexes { - delete(job_progress.ShadowIndexes, shadowIndexId) - } + j.SaveSchemaChangeShadowIndexes(alterJob) + if !alterJob.IsFinished() { + return nil + } + if ccr.FeatureSchemaChangePartialSync && alterJob.Type == record.ALTER_JOB_SCHEMA_CHANGE { return j.NewPartialSnapshot(alterJob.TableId, alterJob.TableName, nil, true, false) } diff --git a/pkg/ccr/handle/create_table.go b/pkg/ccr/handle/create_table.go index 9b22b513..260c652e 100644 --- a/pkg/ccr/handle/create_table.go +++ b/pkg/ccr/handle/create_table.go @@ -23,6 +23,11 @@ func (h *CreateTableHandle) Handle(j *ccr.Job, commitSeq int64, createTable *rec return xerror.Errorf(xerror.Normal, "invalid sync type: %v", j.SyncType) } + if createTable.IsCreateElasticSearch() { + log.Warnf("create table with elasticsearch is not supported yet, skip this binlog") + return nil + } + if createTable.IsCreateMaterializedView() { log.Warnf("create async materialized view is not supported yet, skip this binlog") return nil diff --git a/pkg/ccr/handle/rename_partition.go b/pkg/ccr/handle/rename_partition.go index 8fb34218..b098068a 100644 --- a/pkg/ccr/handle/rename_partition.go +++ 
b/pkg/ccr/handle/rename_partition.go @@ -30,16 +30,16 @@ func (h *RenamePartitionHandle) IsBinlogCommitted(j *ccr.Job, record *record.Ren } for _, partition := range partitions { - if partition.Name == record.OldPartitionName { - log.Infof("partition %s is not renamed to %s in dest table %d, this binlog is not committed", + if partition.Name == record.NewPartitionName { + log.Infof("partition %s is not renamed to %s in dest table %d, this binlog is committed", record.OldPartitionName, record.NewPartitionName, destTableId) - return false, nil + return true, nil } } log.Infof("partition %s is renamed to %s in dest table %d, this binlog is not committed", record.OldPartitionName, record.NewPartitionName, destTableId) - return true, nil + return false, nil } func (h *RenamePartitionHandle) IsIdempotent() bool { diff --git a/pkg/ccr/ingest_binlog_job.go b/pkg/ccr/ingest_binlog_job.go index 5f705f5e..972a248f 100644 --- a/pkg/ccr/ingest_binlog_job.go +++ b/pkg/ccr/ingest_binlog_job.go @@ -543,8 +543,8 @@ func (j *IngestBinlogJob) preparePartition(srcTableId, destTableId int64, srcIndexName := getSrcIndexName(job, srcIndexMeta) if _, ok := destIndexNameMap[srcIndexName]; !ok { j.setError(xerror.Errorf(xerror.Meta, - "index name %v not found in dest meta, is base index: %t, src index id: %d", - srcIndexName, srcIndexMeta.IsBaseIndex, indexId)) + "index name %v not found in dest meta, is base index: %t, src index id: %d, shadow indexes %v", + srcIndexName, srcIndexMeta.IsBaseIndex, indexId, j.ccrJob.progress.ShadowIndexes)) return } } @@ -806,6 +806,27 @@ func (j *IngestBinlogJob) applyDroppedBinlogs() { return } + if binlog.GetType() == festruct.TBinlogType_BARRIER { + barrierLog, err := record.NewBarrierLogFromJson(binlog.GetData()) + if err != nil { + j.setError(err) + return + } + + if barrierLog.Binlog != "" { + // keep compatible with old version + binlogType := festruct.TBinlogType(barrierLog.BinlogType) + newBinlog := festruct.NewTBinlog() + 
newBinlog.SetCommitSeq(utils.ThriftValueWrapper(binlog.GetCommitSeq())) + newBinlog.SetTimestamp(utils.ThriftValueWrapper(binlog.GetTimestamp())) + newBinlog.SetType(&binlogType) + newBinlog.SetDbId(utils.ThriftValueWrapper(binlog.GetDbId())) + newBinlog.SetData(&barrierLog.Binlog) + newBinlog.SetTableIds(binlog.GetTableIds()) + binlog = newBinlog + } + } + if binlog.GetType() == festruct.TBinlogType_ALTER_JOB { log.Infof("txn %d ingest binlog: trigger new partial snapshot by alter job binlog, commitSeq: %d", j.txnId, commitSeq) alterJobRecord, err := record.NewAlterJobV2FromJson(binlog.GetData()) diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go index c1d9a67f..277d4ce8 100644 --- a/pkg/ccr/job.go +++ b/pkg/ccr/job.go @@ -118,7 +118,7 @@ func init() { "skip checking async mv table, the async mv binlogs will be filtered by doris") flag.BoolVar(&featurePipelineCommit, "feature_pipeline_commit", true, "enable pipeline commit for upsert binlogs") - flag.BoolVar(&featureSeperatedHandles, "feature_seperated_handles", true, + flag.BoolVar(&featureSeperatedHandles, "feature_seperated_handles", false, "enable the seperated handles (the refactor)") flag.Int64Var(&flagBinlogBatchSize, "binlog_batch_size", 16, "the max num of binlogs to get in a batch") @@ -127,8 +127,9 @@ func init() { type SyncType int const ( - DBSync SyncType = 0 - TableSync SyncType = 1 + DBSync SyncType = 0 + TableSync SyncType = 1 + ClusterSync SyncType = 2 ) func (s SyncType) String() string { @@ -137,6 +138,8 @@ func (s SyncType) String() string { return "db_sync" case TableSync: return "table_sync" + case ClusterSync: + return "cluster_sync" default: return "unknown_sync" } @@ -2068,6 +2071,11 @@ func (j *Job) handleCreateTable(binlog *festruct.TBinlog) error { return nil } + if createTable.IsCreateElasticSearch() { + log.Warnf("create table with elasticsearch is not supported yet, skip this binlog") + return nil + } + if createTable.IsCreateMaterializedView() { log.Warnf("create async materialized view 
is not supported yet, skip this binlog") return nil @@ -2279,6 +2287,11 @@ func (j *Job) handleAlterJob(binlog *festruct.TBinlog) error { } if j.IsBinlogCommitted(alterJob.TableId, binlog.GetCommitSeq()) { + if alterJob.Type == record.ALTER_JOB_SCHEMA_CHANGE { + j.SaveSchemaChangeShadowIndexes(alterJob) + } else if !FeatureSkipRollupBinlogs && alterJob.Type == record.ALTER_JOB_ROLLUP { + j.SaveAlterRollupShadowIndex(alterJob) + } return nil } @@ -2303,56 +2316,80 @@ func (j *Job) handleAlterJob(binlog *festruct.TBinlog) error { } } -func (j *Job) HandleAlterRollup(alterJob *record.AlterJobV2) error { - if !alterJob.IsFinished() { - switch alterJob.JobState { - case record.ALTER_JOB_STATE_PENDING: - // Once the rollup job step to WAITING_TXN, the upsert to the rollup index is allowed, - // but the dest index of the downstream cluster hasn't been created. - // - // To filter the upsert to the rollup index, save the shadow index ids here. - if j.progress.ShadowIndexes == nil { - j.progress.ShadowIndexes = make(map[int64]int64) - } +func (j *Job) SaveAlterRollupShadowIndex(alterJob *record.AlterJobV2) { + switch alterJob.JobState { + case record.ALTER_JOB_STATE_PENDING: + fallthrough + case record.ALTER_JOB_STATE_WAITING_TXN: + fallthrough + case record.ALTER_JOB_STATE_RUNNING: + // Once the rollup job step to WAITING_TXN, the upsert to the rollup index is allowed, + // but the dest index of the downstream cluster hasn't been created. + // + // To filter the upsert to the rollup index, save the shadow index ids here. 
+ if j.progress.ShadowIndexes == nil { + j.progress.ShadowIndexes = make(map[int64]int64) + } + if _, ok := j.progress.ShadowIndexes[alterJob.RollupIndexId]; !ok { j.progress.ShadowIndexes[alterJob.RollupIndexId] = alterJob.BaseIndexId - case record.ALTER_JOB_STATE_CANCELLED: - // clear the shadow indexes - delete(j.progress.ShadowIndexes, alterJob.RollupIndexId) + log.Infof("table %d alter rollup save shadow index %d, base index id: %d", + alterJob.TableId, alterJob.RollupIndexId, alterJob.BaseIndexId) } - return nil + case record.ALTER_JOB_STATE_CANCELLED: + // clear the shadow indexes + delete(j.progress.ShadowIndexes, alterJob.RollupIndexId) + case record.ALTER_JOB_STATE_FINISHED: + // Once partial snapshot finished, the rollup indexes will be convert to normal index. + delete(j.progress.ShadowIndexes, alterJob.RollupIndexId) } +} - // Once partial snapshot finished, the rollup indexes will be convert to normal index. - delete(j.progress.ShadowIndexes, alterJob.RollupIndexId) +func (j *Job) HandleAlterRollup(alterJob *record.AlterJobV2) error { + j.SaveAlterRollupShadowIndex(alterJob) + if !alterJob.IsFinished() { + return nil + } replace := true isView := false return j.NewPartialSnapshot(alterJob.TableId, alterJob.TableName, nil, replace, isView) } -func (j *Job) HandleSchemaChange(alterJob *record.AlterJobV2) error { - if !alterJob.IsFinished() { - switch alterJob.JobState { - case record.ALTER_JOB_STATE_PENDING: - // Once the schema change step to WAITING_TXN, the upsert to the shadow indexes is allowed, - // but the dest indexes of the downstream cluster hasn't been created. - // - // To filter the upsert to the shadow indexes, save the shadow index ids here. 
- if j.progress.ShadowIndexes == nil { - j.progress.ShadowIndexes = make(map[int64]int64) - } - for shadowIndexId, originIndexId := range alterJob.ShadowIndexes { +func (j *Job) SaveSchemaChangeShadowIndexes(alterJob *record.AlterJobV2) { + switch alterJob.JobState { + case record.ALTER_JOB_STATE_PENDING: + fallthrough + case record.ALTER_JOB_STATE_WAITING_TXN: + fallthrough + case record.ALTER_JOB_STATE_RUNNING: + // Once the schema change step to WAITING_TXN, the upsert to the shadow indexes is allowed, + // but the dest indexes of the downstream cluster hasn't been created. + // + // To filter the upsert to the shadow indexes, save the shadow index ids here. + if j.progress.ShadowIndexes == nil { + j.progress.ShadowIndexes = make(map[int64]int64) + } + for shadowIndexId, originIndexId := range alterJob.ShadowIndexes { + if _, ok := j.progress.ShadowIndexes[shadowIndexId]; !ok { j.progress.ShadowIndexes[shadowIndexId] = originIndexId - } - case record.ALTER_JOB_STATE_CANCELLED: - // clear the shadow indexes - for shadowIndexId := range alterJob.ShadowIndexes { - delete(j.progress.ShadowIndexes, shadowIndexId) + log.Infof("table %d schema change job save shadow index %d, origin index id: %d", + alterJob.TableId, shadowIndexId, originIndexId) } } - return nil + case record.ALTER_JOB_STATE_CANCELLED: + // clear the shadow indexes + for shadowIndexId := range alterJob.ShadowIndexes { + delete(j.progress.ShadowIndexes, shadowIndexId) + } + case record.ALTER_JOB_STATE_FINISHED: + // Once partial snapshot finished, the shadow indexes will be convert to normal indexes. 
+ for shadowIndexId := range alterJob.ShadowIndexes { + delete(j.progress.ShadowIndexes, shadowIndexId) + } } +} +func (j *Job) HandleSchemaChange(alterJob *record.AlterJobV2) error { // drop table dropTableSql var destTableName string if j.SyncType == TableSync { @@ -2361,12 +2398,12 @@ func (j *Job) HandleSchemaChange(alterJob *record.AlterJobV2) error { destTableName = alterJob.TableName } - if FeatureSchemaChangePartialSync && alterJob.Type == record.ALTER_JOB_SCHEMA_CHANGE { - // Once partial snapshot finished, the shadow indexes will be convert to normal indexes. - for shadowIndexId := range alterJob.ShadowIndexes { - delete(j.progress.ShadowIndexes, shadowIndexId) - } + j.SaveSchemaChangeShadowIndexes(alterJob) + if !alterJob.IsFinished() { + return nil + } + if FeatureSchemaChangePartialSync && alterJob.Type == record.ALTER_JOB_SCHEMA_CHANGE { replaceTable := true isView := false return j.NewPartialSnapshot(alterJob.TableId, alterJob.TableName, nil, replaceTable, isView) @@ -3257,16 +3294,16 @@ func (j *Job) isRenamePartitionCommitted(record *record.RenamePartition) (bool, } for _, partition := range partitions { - if partition.Name == record.OldPartitionName { - log.Infof("partition %s is not renamed to %s in dest table %d, this binlog is not committed", + if partition.Name == record.NewPartitionName { + log.Infof("partition %s is not renamed to %s in dest table %d, this binlog is committed", record.OldPartitionName, record.NewPartitionName, destTableId) - return false, nil + return true, nil } } log.Infof("partition %s is renamed to %s in dest table %d, this binlog is not committed", record.OldPartitionName, record.NewPartitionName, destTableId) - return true, nil + return false, nil } func (j *Job) isModifyDistributionTypeCommitted(r *record.ModifyDistributionType) (bool, error) { diff --git a/pkg/ccr/job_handle.go b/pkg/ccr/job_handle.go index 4ceaa9b1..f96327ab 100644 --- a/pkg/ccr/job_handle.go +++ b/pkg/ccr/job_handle.go @@ -53,18 +53,26 @@ func 
buildGenericHandleMethod[T record.Record](handle JobHandle[T]) HandleFn { binlog.GetType(), progress.PrevCommitSeq, progress.CommitSeq) data := binlog.GetData() - record := newGenericRecord[T]() - if err := record.Deserialize(data); err != nil { + value := newGenericRecord[T]() + if err := value.Deserialize(data); err != nil { return err } - tableId := record.GetTableId() + tableId := value.GetTableId() if job.IsBinlogCommitted(tableId, progress.CommitSeq) { + // HACK: for alter job, should save the shadow indexes + if alterJob, ok := any(value).(*record.AlterJobV2); ok { + if !FeatureSkipRollupBinlogs && alterJob.Type == record.ALTER_JOB_ROLLUP { + job.SaveAlterRollupShadowIndex(alterJob) + } else if alterJob.Type == record.ALTER_JOB_SCHEMA_CHANGE { + job.SaveSchemaChangeShadowIndexes(alterJob) + } + } return nil } commitSeq := binlog.GetCommitSeq() - return handle.Handle(job, commitSeq, record) + return handle.Handle(job, commitSeq, value) } } diff --git a/pkg/ccr/record/create_table.go b/pkg/ccr/record/create_table.go index 0f077b87..fad00cf3 100644 --- a/pkg/ccr/record/create_table.go +++ b/pkg/ccr/record/create_table.go @@ -93,3 +93,7 @@ func (c *CreateTable) IsCreateMaterializedView() bool { return strings.Contains(c.Sql, "ENGINE=MATERIALIZED_VIEW") } + +func (c *CreateTable) IsCreateElasticSearch() bool { + return c.TableType == TableTypeElasticSearch +} diff --git a/pkg/ccr/record/table_type.go b/pkg/ccr/record/table_type.go index d400f19b..b18a0f5b 100644 --- a/pkg/ccr/record/table_type.go +++ b/pkg/ccr/record/table_type.go @@ -20,4 +20,5 @@ const ( TableTypeOlap = "OLAP" TableTypeView = "VIEW" TableTypeMaterializedView = "MATERIALIZED_VIEW" + TableTypeElasticSearch = "ELASTICSEARCH" ) diff --git a/pkg/ccr/specer_mock.go b/pkg/ccr/specer_mock.go index cbf278b7..4b6b51fa 100644 --- a/pkg/ccr/specer_mock.go +++ b/pkg/ccr/specer_mock.go @@ -216,6 +216,21 @@ func (mr *MockSpecerMockRecorder) Exec(sql any) *gomock.Call { return 
mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Exec", reflect.TypeOf((*MockSpecer)(nil).Exec), sql) } +// GetAllDatabases mocks base method. +func (m *MockSpecer) GetAllDatabases() ([]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetAllDatabases") + ret0, _ := ret[0].([]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetAllDatabases indicates an expected call of GetAllDatabases. +func (mr *MockSpecerMockRecorder) GetAllDatabases() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAllDatabases", reflect.TypeOf((*MockSpecer)(nil).GetAllDatabases)) +} + // GetAllTables mocks base method. func (m *MockSpecer) GetAllTables() ([]string, error) { m.ctrl.T.Helper() diff --git a/pkg/service/http_service.go b/pkg/service/http_service.go index 58c65314..8ab5b9b3 100644 --- a/pkg/service/http_service.go +++ b/pkg/service/http_service.go @@ -25,6 +25,7 @@ import ( "reflect" "strconv" "strings" + "sync" "time" "github.com/selectdb/ccr_syncer/pkg/ccr" @@ -40,6 +41,13 @@ import ( log "github.com/sirupsen/logrus" ) +// Global variable to store database monitor check interval +var ( + databaseMonitorInterval time.Duration = 2 * time.Minute + intervalMutex sync.RWMutex + intervalUpdateChan = make(chan time.Duration, 1) +) + // TODO(Drogon): impl a generic http request handle parse json func writeJson(w http.ResponseWriter, data interface{}) { @@ -103,6 +111,8 @@ type CreateCcrRequest struct { // For table sync, allow to create ccr job even if the target table already exists. 
AllowTableExists bool `json:"allow_table_exists"` ReuseBinlogLabel bool `json:"reuse_binlog_label"` + // Whether it's cluster-level sync, if true, will get all databases from source cluster and create sync task for each database + ClusterSync bool `json:"cluster_sync"` } // Stringer @@ -156,6 +166,250 @@ func createCcr(request *CreateCcrRequest, db storage.DB, jobManager *ccr.JobMana return nil } +// createClusterCcr creates cluster-level CCR synchronization tasks +// Gets all databases from the source cluster and creates a sync task for each database +func createClusterCcr(request *CreateCcrRequest, db storage.DB, jobManager *ccr.JobManager) error { + log.Infof("create cluster ccr %s", request) + + // Get all database list from source cluster + databases, err := getDatabaseList(&request.Src) + if err != nil { + return xerror.Wrapf(err, xerror.Normal, "Failed to get database list from source cluster") + } + + if len(databases) == 0 { + return xerror.Errorf(xerror.Normal, "No databases found in source cluster") + } + + log.Infof("Found %d databases, starting to create cluster-level sync tasks: %v", len(databases), databases) + + var errors []string + successCount := 0 + + for _, dbName := range databases { + // Create a new request for each database + dbRequest := &CreateCcrRequest{ + Name: fmt.Sprintf("%s_%s", request.Name, dbName), // Task name with database name appended + Src: request.Src, + Dest: request.Dest, + SkipError: request.SkipError, + AllowTableExists: request.AllowTableExists, + ReuseBinlogLabel: request.ReuseBinlogLabel, + ClusterSync: false, // Set to false to avoid recursive calls + } + + dbRequest.Src.Database = dbName + dbRequest.Dest.Database = dbName + + if err := createCcr(dbRequest, db, jobManager); err != nil { + errMsg := fmt.Sprintf("Failed to create sync task for database %s: %v", dbName, err) + log.Warnf(errMsg) + errors = append(errors, errMsg) + } else { + successCount++ + log.Infof("Successfully created sync task for database %s", 
dbName) + } + } + + if len(errors) > 0 { + if successCount == 0 { + return xerror.Errorf(xerror.Normal, "All database sync tasks creation failed: %s", strings.Join(errors, "; ")) + } else { + log.Warnf("Partial cluster sync tasks creation failed, success: %d, failed: %d, errors: %s", + successCount, len(errors), strings.Join(errors, "; ")) + } + } + + log.Infof("Cluster-level sync tasks creation completed, success: %d, failed: %d", successCount, len(errors)) + + // Start daemon task to periodically detect new databases in source cluster, passing existing database list + go startDatabaseMonitor(request, db, jobManager, databases) + + return nil +} + +// startDatabaseMonitor starts a daemon task to periodically detect new and deleted databases in source cluster, and create or delete corresponding sync tasks +func startDatabaseMonitor(request *CreateCcrRequest, db storage.DB, jobManager *ccr.JobManager, initialDatabases []string) { + log.Infof("Starting database monitor daemon, task name prefix: %s", request.Name) + + existingDatabases := initializeDatabaseTracking(initialDatabases) + log.Infof("Initialized database monitoring, currently have %d databases", len(existingDatabases)) + + intervalMutex.RLock() + currentInterval := databaseMonitorInterval + intervalMutex.RUnlock() + + ticker := time.NewTicker(currentInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + monitorDatabaseChanges(request, db, jobManager, existingDatabases) + case newInterval := <-intervalUpdateChan: + if newInterval != currentInterval { + log.Infof("Database monitor interval changed from %v to %v", currentInterval, newInterval) + ticker.Stop() + ticker = time.NewTicker(newInterval) + currentInterval = newInterval + } + } + } +} + +func initializeDatabaseTracking(initialDatabases []string) map[string]bool { + existingDatabases := make(map[string]bool) + for _, dbName := range initialDatabases { + existingDatabases[dbName] = true + } + return existingDatabases +} + +// 
monitorDatabaseChanges detects database changes and handles them +func monitorDatabaseChanges(request *CreateCcrRequest, db storage.DB, jobManager *ccr.JobManager, existingDatabases map[string]bool) { + currentDatabases, err := request.Src.GetAllDatabases() + if err != nil { + log.Errorf("Failed to get database list: %v", err) + return + } + + currentDatabaseMap := make(map[string]bool) + for _, dbName := range currentDatabases { + if dbName == "" { + continue + } + currentDatabaseMap[dbName] = true + } + + newDatabases := identifyNewDatabases(currentDatabases, existingDatabases) + deletedDatabases := identifyDeletedDatabases(existingDatabases, currentDatabaseMap) + + handleNewDatabases(newDatabases, request, db, jobManager) + handleDeletedDatabases(deletedDatabases, request, jobManager) +} + +func identifyNewDatabases(currentDatabases []string, existingDatabases map[string]bool) []string { + var newDatabases []string + for _, dbName := range currentDatabases { + if !existingDatabases[dbName] { + newDatabases = append(newDatabases, dbName) + existingDatabases[dbName] = true + } + } + return newDatabases +} + +func identifyDeletedDatabases(existingDatabases map[string]bool, currentDatabaseMap map[string]bool) []string { + var deletedDatabases []string + for dbName := range existingDatabases { + if !currentDatabaseMap[dbName] { + deletedDatabases = append(deletedDatabases, dbName) + delete(existingDatabases, dbName) + } + } + return deletedDatabases +} + +func handleNewDatabases(newDatabases []string, request *CreateCcrRequest, db storage.DB, jobManager *ccr.JobManager) { + if len(newDatabases) == 0 { + return + } + + log.Infof("Found %d new databases: %v", len(newDatabases), newDatabases) + + for _, dbName := range newDatabases { + if dbName == "" { + log.Warnf("Skipping empty database name") + continue + } + + jobName := fmt.Sprintf("%s_%s", request.Name, dbName) + jobExists, err := db.IsJobExist(jobName) + if err != nil { + log.Warnf("Error checking if job %s 
exists: %v", jobName, err) + continue + } + + if jobExists { + log.Warnf("Job %s already exists, skipping sync task creation for database %s", jobName, dbName) + continue + } + + dbRequest := &CreateCcrRequest{ + Name: jobName, + Src: request.Src, + Dest: request.Dest, + SkipError: request.SkipError, + AllowTableExists: request.AllowTableExists, + ReuseBinlogLabel: request.ReuseBinlogLabel, + ClusterSync: false, // Set to false to avoid recursive calls + } + + dbRequest.Src.Database = dbName + dbRequest.Dest.Database = dbName + + maxRetries := 3 + for i := 0; i < maxRetries; i++ { + if err := createCcr(dbRequest, db, jobManager); err != nil { + if i == maxRetries-1 { + log.Warnf("Failed to create sync task for new database %s (attempt %d/%d): %v", dbName, i+1, maxRetries, err) + } else { + log.Warnf("Failed to create sync task for new database %s (attempt %d/%d): %v, will retry", dbName, i+1, maxRetries, err) + time.Sleep(time.Second * time.Duration(i+1)) // Exponential backoff + } + } else { + log.Infof("Successfully created sync task for new database %s", dbName) + break + } + } + } +} + +func handleDeletedDatabases(deletedDatabases []string, request *CreateCcrRequest, jobManager *ccr.JobManager) { + if len(deletedDatabases) == 0 { + return + } + + log.Infof("Found %d deleted databases: %v", len(deletedDatabases), deletedDatabases) + + for _, dbName := range deletedDatabases { + if dbName == "" { + log.Warnf("Skipping empty database name") + continue + } + + jobName := fmt.Sprintf("%s_%s", request.Name, dbName) + + maxRetries := 3 + for i := 0; i < maxRetries; i++ { + if err := jobManager.RemoveJob(jobName); err != nil { + if i == maxRetries-1 { + log.Warnf("Failed to remove sync task for deleted database %s (attempt %d/%d): %v", dbName, i+1, maxRetries, err) + } else { + log.Warnf("Failed to remove sync task for deleted database %s (attempt %d/%d): %v, will retry", dbName, i+1, maxRetries, err) + time.Sleep(time.Second * time.Duration(i+1)) // Exponential 
backoff + } + } else { + log.Infof("Successfully removed sync task for deleted database %s", dbName) + break + } + } + } +} + +func getDatabaseList(spec *base.Spec) ([]string, error) { + log.Infof("Getting database list for cluster %s", spec.Host) + + // Use Specer interface's GetAllDatabases method + databases, err := spec.GetAllDatabases() + if err != nil { + return nil, xerror.Wrapf(err, xerror.Normal, "Failed to get database list") + } + + log.Infof("Got %d user databases: %v", len(databases), databases) + return databases, nil +} + // return exit(bool) func (s *HttpService) redirect(jobName string, w http.ResponseWriter, r *http.Request) bool { if jobExist, err := s.db.IsJobExist(jobName); err != nil { @@ -205,12 +459,20 @@ func (s *HttpService) createHandler(w http.ResponseWriter, r *http.Request) { return } - // Call the createCcr function to create the CCR - if err = createCcr(&request, s.db, s.jobManager); err != nil { - log.Warnf("create ccr failed: %+v", err) - createResult = newErrorResult(err.Error()) + if request.ClusterSync { + if err = createClusterCcr(&request, s.db, s.jobManager); err != nil { + log.Warnf("create cluster ccr failed: %+v", err) + createResult = newErrorResult(err.Error()) + } else { + createResult = newSuccessResult() + } } else { - createResult = newSuccessResult() + if err = createCcr(&request, s.db, s.jobManager); err != nil { + log.Warnf("create ccr failed: %+v", err) + createResult = newErrorResult(err.Error()) + } else { + createResult = newSuccessResult() + } } } @@ -1084,6 +1346,67 @@ func (s *HttpService) failpointHandler(w http.ResponseWriter, r *http.Request) { result = newSuccessResult() } +func (s *HttpService) updateMonitorIntervalHandler(w http.ResponseWriter, r *http.Request) { + log.Infof("update database monitor interval") + + var result *defaultResult + defer func() { writeJson(w, result) }() + + // Parse the JSON request body + var request struct { + IntervalSeconds int `json:"interval_seconds"` + } + err := 
json.NewDecoder(r.Body).Decode(&request) + if err != nil { + log.Warnf("update monitor interval failed: %+v", err) + result = newErrorResult(err.Error()) + return + } + + if request.IntervalSeconds <= 0 { + log.Warnf("update monitor interval failed: interval_seconds must be positive") + result = newErrorResult("interval_seconds must be positive") + return + } + + newInterval := time.Duration(request.IntervalSeconds) * time.Second + + intervalMutex.Lock() + oldInterval := databaseMonitorInterval + databaseMonitorInterval = newInterval + intervalMutex.Unlock() + + // Send update notification through channel (non-blocking) + select { + case intervalUpdateChan <- newInterval: + default: + // Channel is full, but that's okay since we only need the latest value + } + + log.Infof("Database monitor interval updated from %v to %v", oldInterval, newInterval) + result = newSuccessResult() +} + +func (s *HttpService) getMonitorIntervalHandler(w http.ResponseWriter, r *http.Request) { + log.Infof("get database monitor interval") + + type result struct { + *defaultResult + IntervalSeconds int `json:"interval_seconds"` + } + + intervalMutex.RLock() + currentInterval := databaseMonitorInterval + intervalMutex.RUnlock() + + intervalResult := &result{ + defaultResult: newSuccessResult(), + IntervalSeconds: int(currentInterval.Seconds()), + } + + writeJson(w, intervalResult) +} + func (s *HttpService) RegisterHandlers() { s.mux.HandleFunc("/version", s.versionHandler) s.mux.HandleFunc("/create_ccr", s.createHandler) @@ -1101,6 +1424,8 @@ func (s *HttpService) RegisterHandlers() { s.mux.HandleFunc("/update_host_mapping", s.updateHostMappingHandler) s.mux.HandleFunc("/job_skip_binlog", s.skipBinlogHandler) s.mux.HandleFunc("/failpoint", s.failpointHandler) + s.mux.HandleFunc("/update_monitor_interval", s.updateMonitorIntervalHandler) + s.mux.HandleFunc("/get_monitor_interval", s.getMonitorIntervalHandler) s.mux.Handle("/metrics", xmetrics.GetHttpHandler()) s.mux.HandleFunc("/sync", 
s.syncHandler) s.mux.HandleFunc("/view", s.showJobStateHandler) diff --git a/regression-test/suites/cross_ds/upsert_index/test_cds_upsert_index.groovy b/regression-test/suites/cross_ds/upsert_index/test_cds_upsert_index.groovy index 5cf3394a..fc7103be 100644 --- a/regression-test/suites/cross_ds/upsert_index/test_cds_upsert_index.groovy +++ b/regression-test/suites/cross_ds/upsert_index/test_cds_upsert_index.groovy @@ -18,6 +18,12 @@ suite("test_cds_upsert_index") { def helper = new GroovyShell(new Binding(['suite': delegate])) .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + if (!helper.is_version_supported([30006, 20111, 20099])) { + def version = helper.upstream_version() + logger.info("skip this suite because version is not supported, upstream version ${version}") + return + } + def tableName = "tbl_" + helper.randomSuffix() def exist = { res -> Boolean diff --git a/regression-test/suites/cross_ds/view/alter_view/test_cds_view_alter_view.groovy b/regression-test/suites/cross_ds/view/alter_view/test_cds_view_alter_view.groovy index d918beaf..e0d7a453 100644 --- a/regression-test/suites/cross_ds/view/alter_view/test_cds_view_alter_view.groovy +++ b/regression-test/suites/cross_ds/view/alter_view/test_cds_view_alter_view.groovy @@ -19,6 +19,12 @@ suite("test_cds_view_alter_view") { def helper = new GroovyShell(new Binding(['suite': delegate])) .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + if (!helper.is_version_supported([30006, 20199, 20099])) { + def version = helper.upstream_version() + logger.info("skip this suite because version is not supported, upstream version ${version}") + return + } + def suffix = helper.randomSuffix() def tableName = 'tbl_' + suffix @@ -51,6 +57,7 @@ suite("test_cds_view_alter_view") { DISTRIBUTED BY HASH(id) BUCKETS 1 PROPERTIES ( "replication_allocation" = "tag.location.default: 1", + "light_schema_change" = "true", "binlog.enable" = "true", "binlog.ttl_seconds" = "180" 
) diff --git a/regression-test/suites/db_sync/alt_prop/bucket/test_ds_alt_prop_bucket.groovy b/regression-test/suites/db_sync/alt_prop/bucket/test_ds_alt_prop_bucket.groovy index 0528a03a..6cc28a5b 100644 --- a/regression-test/suites/db_sync/alt_prop/bucket/test_ds_alt_prop_bucket.groovy +++ b/regression-test/suites/db_sync/alt_prop/bucket/test_ds_alt_prop_bucket.groovy @@ -19,6 +19,12 @@ suite("test_ds_alt_prop_bucket") { def helper = new GroovyShell(new Binding(['suite': delegate])) .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + if (!helper.is_version_supported([30099, 20199, 20099])) { + def version = helper.upstream_version() + logger.info("skip this suite because version is not supported, upstream version ${version}") + return + } + def dbName = context.dbName def tableName = "tbl_" + helper.randomSuffix() diff --git a/regression-test/suites/db_sync/alt_prop/distribution_num/test_ds_alt_prop_distr_num.groovy b/regression-test/suites/db_sync/alt_prop/distribution_num/test_ds_alt_prop_distr_num.groovy index 1126c88e..4ed56d4d 100644 --- a/regression-test/suites/db_sync/alt_prop/distribution_num/test_ds_alt_prop_distr_num.groovy +++ b/regression-test/suites/db_sync/alt_prop/distribution_num/test_ds_alt_prop_distr_num.groovy @@ -18,6 +18,12 @@ suite("test_ds_alt_prop_distr_num") { def helper = new GroovyShell(new Binding(['suite': delegate])) .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + + if (!helper.is_version_supported([30099, 20199, 20099])) { + def version = helper.upstream_version() + logger.info("skip this suite because version is not supported, upstream version ${version}") + return + } def dbName = context.dbName def tableName = "tbl_" + helper.randomSuffix() diff --git a/regression-test/suites/db_sync/alt_prop/distribution_type/test_ds_alt_prop_distr_type.groovy b/regression-test/suites/db_sync/alt_prop/distribution_type/test_ds_alt_prop_distr_type.groovy index c4c3ad4e..2426a373 100644 
--- a/regression-test/suites/db_sync/alt_prop/distribution_type/test_ds_alt_prop_distr_type.groovy +++ b/regression-test/suites/db_sync/alt_prop/distribution_type/test_ds_alt_prop_distr_type.groovy @@ -19,6 +19,12 @@ suite("test_ds_alt_prop_distr_type") { def helper = new GroovyShell(new Binding(['suite': delegate])) .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + if (!helper.is_version_supported([30099, 20199, 20099])) { + def version = helper.upstream_version() + logger.info("skip this suite because version is not supported, upstream version ${version}") + return + } + def dbName = context.dbName def tableName = "tbl_" + helper.randomSuffix() diff --git a/regression-test/suites/db_sync/partition/default/test_ds_partition_default_list_insert.groovy b/regression-test/suites/db_sync/partition/default/test_ds_partition_default_list_insert.groovy index 4de7a2b1..3e396b58 100644 --- a/regression-test/suites/db_sync/partition/default/test_ds_partition_default_list_insert.groovy +++ b/regression-test/suites/db_sync/partition/default/test_ds_partition_default_list_insert.groovy @@ -19,6 +19,12 @@ suite("test_ds_partition_default_list_insert") { def helper = new GroovyShell(new Binding(['suite': delegate])) .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + if (!helper.is_version_supported([30007])) { + def version = helper.upstream_version() + logger.info("skip this suite because version is not supported, upstream version ${version}") + return + } + def dbName = context.dbName def tableName = "tbl_" + helper.randomSuffix() sql "DROP TABLE IF EXISTS ${dbName}.${tableName}" diff --git a/regression-test/suites/db_sync/prop/generated_column/test_ds_prop_generated_column.groovy b/regression-test/suites/db_sync/prop/generated_column/test_ds_prop_generated_column.groovy index 973839be..ce6b75db 100644 --- a/regression-test/suites/db_sync/prop/generated_column/test_ds_prop_generated_column.groovy +++ 
b/regression-test/suites/db_sync/prop/generated_column/test_ds_prop_generated_column.groovy @@ -19,6 +19,12 @@ suite("test_ds_prop_generated_column") { def helper = new GroovyShell(new Binding(['suite': delegate])) .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + if (!helper.is_version_supported([30099, 20199, 20099])) { + def version = helper.upstream_version() + logger.info("skip this suite because version is not supported, upstream version ${version}") + return + } + def dbName = context.dbName def tableName = "tbl_" + helper.randomSuffix() diff --git a/regression-test/suites/db_sync/prop/unique_key_mow/test_ds_prop_unique_key_mow.groovy b/regression-test/suites/db_sync/prop/unique_key_mow/test_ds_prop_unique_key_mow.groovy index a965b790..7f9ab0bc 100644 --- a/regression-test/suites/db_sync/prop/unique_key_mow/test_ds_prop_unique_key_mow.groovy +++ b/regression-test/suites/db_sync/prop/unique_key_mow/test_ds_prop_unique_key_mow.groovy @@ -19,6 +19,13 @@ suite("test_ds_prop_unique_key_mow") { def helper = new GroovyShell(new Binding(['suite': delegate])) .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + if (!helper.is_version_supported([30099, 20199, 20099])) { + def version = helper.upstream_version() + log.info("branch-3.0: not support unique key merge on write for property 'enable_unique_key_skip_bitmap_column'") + logger.info("skip this suite because version is not supported, upstream version ${version}") + return + } + def dbName = context.dbName def tableName = "tbl_" + helper.randomSuffix() def test_num = 0 diff --git a/regression-test/suites/db_sync/prop/variant_nested/test_ds_prop_variant_nested.groovy b/regression-test/suites/db_sync/prop/variant_nested/test_ds_prop_variant_nested.groovy index 2972395a..6cf9e5f3 100644 --- a/regression-test/suites/db_sync/prop/variant_nested/test_ds_prop_variant_nested.groovy +++ 
b/regression-test/suites/db_sync/prop/variant_nested/test_ds_prop_variant_nested.groovy @@ -19,6 +19,12 @@ suite("test_ds_prop_variant_nested") { def helper = new GroovyShell(new Binding(['suite': delegate])) .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + if (!helper.is_version_supported([30099, 20199, 20099])) { + def version = helper.upstream_version() + logger.info("skip this suite because version is not supported, upstream version ${version}") + return + } + def dbName = context.dbName def tableName = "tbl_" + helper.randomSuffix() def test_num = 0 diff --git a/regression-test/suites/db_sync/prop_incrsync/generated_column/test_ds_prop_incrsync_generated_column.groovy b/regression-test/suites/db_sync/prop_incrsync/generated_column/test_ds_prop_incrsync_generated_column.groovy index cb645a36..4152b4ff 100644 --- a/regression-test/suites/db_sync/prop_incrsync/generated_column/test_ds_prop_incrsync_generated_column.groovy +++ b/regression-test/suites/db_sync/prop_incrsync/generated_column/test_ds_prop_incrsync_generated_column.groovy @@ -19,6 +19,12 @@ suite("test_ds_prop_incrsync_incsync_generated_column") { def helper = new GroovyShell(new Binding(['suite': delegate])) .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + if (!helper.is_version_supported([30099, 20199, 20099])) { + def version = helper.upstream_version() + logger.info("skip this suite because version is not supported, upstream version ${version}") + return + } + def dbName = context.dbName def tableNameFull = "tbl_full" def tableNameIncrement = "tbl_incr" diff --git a/regression-test/suites/db_sync/prop_incrsync/row_store/test_ds_prop_incrsync_row_store.groovy b/regression-test/suites/db_sync/prop_incrsync/row_store/test_ds_prop_incrsync_row_store.groovy index 160abe46..e1f3b172 100644 --- a/regression-test/suites/db_sync/prop_incrsync/row_store/test_ds_prop_incrsync_row_store.groovy +++ 
b/regression-test/suites/db_sync/prop_incrsync/row_store/test_ds_prop_incrsync_row_store.groovy @@ -19,6 +19,12 @@ suite("test_ds_prop_incrsync_incsync_row_store") { def helper = new GroovyShell(new Binding(['suite': delegate])) .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + if (!helper.is_version_supported([30099, 20199, 20099])) { + def version = helper.upstream_version() + logger.info("skip this suite because version is not supported, upstream version ${version}") + return + } + def dbName = context.dbName def tableNameFull = "tbl_full" def tableNameIncrement = "tbl_incr" diff --git a/regression-test/suites/db_sync/prop_incrsync/variant_nested/test_ds_prop_incrsync_variant_nested.groovy b/regression-test/suites/db_sync/prop_incrsync/variant_nested/test_ds_prop_incrsync_variant_nested.groovy index d7a3f5b6..e2ade88d 100644 --- a/regression-test/suites/db_sync/prop_incrsync/variant_nested/test_ds_prop_incrsync_variant_nested.groovy +++ b/regression-test/suites/db_sync/prop_incrsync/variant_nested/test_ds_prop_incrsync_variant_nested.groovy @@ -19,6 +19,12 @@ suite("test_ds_prop_incrsync_incsync_variant_nested") { def helper = new GroovyShell(new Binding(['suite': delegate])) .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + if (!helper.is_version_supported([30099, 20199, 20099])) { + def version = helper.upstream_version() + logger.info("skip this suite because version is not supported, upstream version ${version}") + return + } + def dbName = context.dbName def tableNameFull = "tbl_full" def tableNameIncrement = "tbl_incr" diff --git a/regression-test/suites/db_sync/txn_insert/test_ds_txn_insert.groovy b/regression-test/suites/db_sync/txn_insert/test_ds_txn_insert.groovy index 3bbca590..6cf89810 100644 --- a/regression-test/suites/db_sync/txn_insert/test_ds_txn_insert.groovy +++ b/regression-test/suites/db_sync/txn_insert/test_ds_txn_insert.groovy @@ -19,6 +19,12 @@ suite("test_txn_insert_db") { def 
helper = new GroovyShell(new Binding(['suite': delegate])) .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + if (!helper.is_version_supported([30006, 20199, 20099])) { + def version = helper.upstream_version() + logger.info("skip this suite because version is not supported, upstream version ${version}") + return + } + if (!helper.has_feature("feature_txn_insert")) { logger.info("Skip the test because the feature is not supported.") return diff --git a/regression-test/suites/db_sync/view/modify_comment/test_ds_view_modify_comment.groovy b/regression-test/suites/db_sync/view/modify_comment/test_ds_view_modify_comment.groovy index 486653f4..877696cd 100644 --- a/regression-test/suites/db_sync/view/modify_comment/test_ds_view_modify_comment.groovy +++ b/regression-test/suites/db_sync/view/modify_comment/test_ds_view_modify_comment.groovy @@ -19,6 +19,12 @@ suite("test_ds_view_modify_comment") { def helper = new GroovyShell(new Binding(['suite': delegate])) .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + if (!helper.is_version_supported([30099, 20199, 20099])) { + def version = helper.upstream_version() + logger.info("skip this suite because version is not supported, upstream version ${version}") + return + } + def viewName = "test_ds_view_modify_comment_view" def exist = { res -> Boolean diff --git a/regression-test/suites/db_sync_idem/distribution_num/test_ds_idem_distribution_num.groovy b/regression-test/suites/db_sync_idem/distribution_num/test_ds_idem_distribution_num.groovy index 6a0b255a..f47e5dbd 100644 --- a/regression-test/suites/db_sync_idem/distribution_num/test_ds_idem_distribution_num.groovy +++ b/regression-test/suites/db_sync_idem/distribution_num/test_ds_idem_distribution_num.groovy @@ -19,6 +19,12 @@ suite('test_ds_idem_distribution_num') { def helper = new GroovyShell(new Binding(['suite': delegate])) .evaluate(new File("${context.config.suitePath}/../common", 'helper.groovy')) + if 
(!helper.is_version_supported([30099, 20199, 20099])) { + def version = helper.upstream_version() + logger.info("skip this suite because version is not supported, upstream version ${version}") + return + } + if (!helper.has_feature("feature_idempotent_ddl")) { logger.info("this case only works with feature_idempotent_ddl") return diff --git a/regression-test/suites/db_sync_idem/modify_comment/test_ds_idem_modify_comment.groovy b/regression-test/suites/db_sync_idem/modify_comment/test_ds_idem_modify_comment.groovy index e0eb07c2..9bac8466 100644 --- a/regression-test/suites/db_sync_idem/modify_comment/test_ds_idem_modify_comment.groovy +++ b/regression-test/suites/db_sync_idem/modify_comment/test_ds_idem_modify_comment.groovy @@ -19,7 +19,7 @@ suite("test_ds_idem_modify_comment") { def helper = new GroovyShell(new Binding(['suite': delegate])) .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) - if (!helper.is_version_supported([20108, 20017, 30004])) { + if (!helper.is_version_supported([20199, 20099, 30099])) { def version = helper.upstream_version() logger.info("Skip the test case because the version is not supported. 
current version ${version}") } diff --git a/regression-test/suites/db_sync_idem/modify_view/test_ds_idem_modify_view_def.groovy b/regression-test/suites/db_sync_idem/modify_view/test_ds_idem_modify_view_def.groovy index 7a32c51d..b7fd4f11 100644 --- a/regression-test/suites/db_sync_idem/modify_view/test_ds_idem_modify_view_def.groovy +++ b/regression-test/suites/db_sync_idem/modify_view/test_ds_idem_modify_view_def.groovy @@ -19,6 +19,12 @@ suite("test_ds_idem_modify_view_def") { def helper = new GroovyShell(new Binding(['suite': delegate])) .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + if (!helper.is_version_supported([30099, 20199, 20099])) { + def version = helper.upstream_version() + logger.info("skip this suite because version is not supported, upstream version ${version}") + return + } + def viewName = "test_ds_idem_modify_view_def_view" def exist = { res -> Boolean diff --git a/regression-test/suites/table_sync/alt_prop/bucket/test_ts_alt_prop_bucket.groovy b/regression-test/suites/table_sync/alt_prop/bucket/test_ts_alt_prop_bucket.groovy index c5b82540..9792c21d 100644 --- a/regression-test/suites/table_sync/alt_prop/bucket/test_ts_alt_prop_bucket.groovy +++ b/regression-test/suites/table_sync/alt_prop/bucket/test_ts_alt_prop_bucket.groovy @@ -19,6 +19,12 @@ suite("test_ts_alt_prop_bucket") { def helper = new GroovyShell(new Binding(['suite': delegate])) .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + if (!helper.is_version_supported([30099, 20199, 20099])) { + def version = helper.upstream_version() + logger.info("skip this suite because version is not supported, upstream version ${version}") + return + } + def dbName = context.dbName def tableName = "tbl_" + helper.randomSuffix() diff --git a/regression-test/suites/table_sync/alt_prop/distribution_num/test_ts_alt_prop_distr_num.groovy b/regression-test/suites/table_sync/alt_prop/distribution_num/test_ts_alt_prop_distr_num.groovy index 
70f634e6..f8df53ad 100644 --- a/regression-test/suites/table_sync/alt_prop/distribution_num/test_ts_alt_prop_distr_num.groovy +++ b/regression-test/suites/table_sync/alt_prop/distribution_num/test_ts_alt_prop_distr_num.groovy @@ -19,6 +19,12 @@ suite("test_ts_alt_prop_distr_num") { def helper = new GroovyShell(new Binding(['suite': delegate])) .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + if (!helper.is_version_supported([30099, 20199, 20099])) { + def version = helper.upstream_version() + logger.info("skip this suite because version is not supported, upstream version ${version}") + return + } + def dbName = context.dbName def tableName = "tbl_" + helper.randomSuffix() diff --git a/regression-test/suites/table_sync/alt_prop/distribution_type/test_ts_alt_prop_distr_type.groovy b/regression-test/suites/table_sync/alt_prop/distribution_type/test_ts_alt_prop_distr_type.groovy index a34e9019..e8f8f4b1 100644 --- a/regression-test/suites/table_sync/alt_prop/distribution_type/test_ts_alt_prop_distr_type.groovy +++ b/regression-test/suites/table_sync/alt_prop/distribution_type/test_ts_alt_prop_distr_type.groovy @@ -19,6 +19,12 @@ suite("test_ts_alt_prop_distr_type") { def helper = new GroovyShell(new Binding(['suite': delegate])) .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + if (!helper.is_version_supported([30099, 20199, 20099])) { + def version = helper.upstream_version() + logger.info("skip this suite because version is not supported, upstream version ${version}") + return + } + def dbName = context.dbName def tableName = "tbl_" + helper.randomSuffix() diff --git a/regression-test/suites/table_sync/prop/generated_column/test_ts_prop_generated_column.groovy b/regression-test/suites/table_sync/prop/generated_column/test_ts_prop_generated_column.groovy index d16c200a..fd836363 100644 --- a/regression-test/suites/table_sync/prop/generated_column/test_ts_prop_generated_column.groovy +++ 
b/regression-test/suites/table_sync/prop/generated_column/test_ts_prop_generated_column.groovy @@ -19,6 +19,12 @@ suite("test_ts_prop_generated_column") { def helper = new GroovyShell(new Binding(['suite': delegate])) .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + if (!helper.is_version_supported([30099, 20199, 20099])) { + def version = helper.upstream_version() + logger.info("skip this suite because version is not supported, upstream version ${version}") + return + } + def dbName = context.dbName def tableName = "tbl_" + helper.randomSuffix() diff --git a/regression-test/suites/table_sync/prop/row_store/test_ts_prop_row_store.groovy b/regression-test/suites/table_sync/prop/row_store/test_ts_prop_row_store.groovy index 793fa822..d294a041 100644 --- a/regression-test/suites/table_sync/prop/row_store/test_ts_prop_row_store.groovy +++ b/regression-test/suites/table_sync/prop/row_store/test_ts_prop_row_store.groovy @@ -19,6 +19,12 @@ suite("test_ds_prop_row_store") { def helper = new GroovyShell(new Binding(['suite': delegate])) .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + if (!helper.is_version_supported([30099, 20199, 20099])) { + def version = helper.upstream_version() + logger.info("skip this suite because version is not supported, upstream version ${version}") + return + } + def dbName = context.dbName def tableName = "tbl_" + helper.randomSuffix() def test_num = 0 diff --git a/regression-test/suites/table_sync/prop/variant_nested/test_ts_prop_variant_nested.groovy b/regression-test/suites/table_sync/prop/variant_nested/test_ts_prop_variant_nested.groovy index 1e9633d5..4c7cf868 100644 --- a/regression-test/suites/table_sync/prop/variant_nested/test_ts_prop_variant_nested.groovy +++ b/regression-test/suites/table_sync/prop/variant_nested/test_ts_prop_variant_nested.groovy @@ -19,6 +19,12 @@ suite("test_ts_prop_variant_nested") { def helper = new GroovyShell(new Binding(['suite': delegate])) 
.evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + if (!helper.is_version_supported([30099, 20199, 20099])) { + def version = helper.upstream_version() + logger.info("skip this suite because version is not supported, upstream version ${version}") + return + } + def dbName = context.dbName def tableName = "tbl_" + helper.randomSuffix() def test_num = 0 diff --git a/regression-test/suites/table_sync/table/txn_insert/test_ts_tbl_txn_insert.groovy b/regression-test/suites/table_sync/table/txn_insert/test_ts_tbl_txn_insert.groovy index e906925b..bdec7db2 100644 --- a/regression-test/suites/table_sync/table/txn_insert/test_ts_tbl_txn_insert.groovy +++ b/regression-test/suites/table_sync/table/txn_insert/test_ts_tbl_txn_insert.groovy @@ -19,6 +19,12 @@ suite("test_txn_insert_table") { def helper = new GroovyShell(new Binding(['suite': delegate])) .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + if (!helper.is_version_supported([30006, 20199, 20099])) { + def version = helper.upstream_version() + logger.info("skip this suite because version is not supported, upstream version ${version}") + return + } + if (!helper.has_feature("feature_txn_insert")) { logger.info("Skip the test because the feature is not supported.") return diff --git a/regression-test/suites/table_sync_alias/alt_prop/bucket/test_tsa_alt_prop_bucket.groovy b/regression-test/suites/table_sync_alias/alt_prop/bucket/test_tsa_alt_prop_bucket.groovy index 218a7b19..a983defa 100644 --- a/regression-test/suites/table_sync_alias/alt_prop/bucket/test_tsa_alt_prop_bucket.groovy +++ b/regression-test/suites/table_sync_alias/alt_prop/bucket/test_tsa_alt_prop_bucket.groovy @@ -19,6 +19,12 @@ suite("test_tsa_alt_prop_bucket") { def helper = new GroovyShell(new Binding(['suite': delegate])) .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + if (!helper.is_version_supported([30099, 20199, 20099])) { + def version = 
helper.upstream_version() + logger.info("skip this suite because version is not supported, upstream version ${version}") + return + } + def dbName = context.dbName def tableName = "tbl_" + helper.randomSuffix() def aliasTableName = "tbl_alias_" + helper.randomSuffix() diff --git a/regression-test/suites/table_sync_alias/alt_prop/distribution_num/test_tsa_alt_prop_distr_num.groovy b/regression-test/suites/table_sync_alias/alt_prop/distribution_num/test_tsa_alt_prop_distr_num.groovy index 8bb08d8d..1dc2a12c 100644 --- a/regression-test/suites/table_sync_alias/alt_prop/distribution_num/test_tsa_alt_prop_distr_num.groovy +++ b/regression-test/suites/table_sync_alias/alt_prop/distribution_num/test_tsa_alt_prop_distr_num.groovy @@ -19,6 +19,12 @@ suite("test_tsa_alt_prop_distr_num") { def helper = new GroovyShell(new Binding(['suite': delegate])) .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + if (!helper.is_version_supported([30099, 20199, 20099])) { + def version = helper.upstream_version() + logger.info("skip this suite because version is not supported, upstream version ${version}") + return + } + def dbName = context.dbName def tableName = "tbl_" + helper.randomSuffix() def aliasTableName = "tbl_alias_" + helper.randomSuffix() diff --git a/regression-test/suites/table_sync_alias/alt_prop/distribution_type/test_tsa_alt_prop_distr_type.groovy b/regression-test/suites/table_sync_alias/alt_prop/distribution_type/test_tsa_alt_prop_distr_type.groovy index 2fb50189..00689491 100644 --- a/regression-test/suites/table_sync_alias/alt_prop/distribution_type/test_tsa_alt_prop_distr_type.groovy +++ b/regression-test/suites/table_sync_alias/alt_prop/distribution_type/test_tsa_alt_prop_distr_type.groovy @@ -19,6 +19,12 @@ suite("test_tsa_alt_prop_distr_type") { def helper = new GroovyShell(new Binding(['suite': delegate])) .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy")) + if (!helper.is_version_supported([30099, 20199, 
20099])) { + def version = helper.upstream_version() + logger.info("skip this suite because version is not supported, upstream version ${version}") + return + } + def dbName = context.dbName def tableName = "tbl_" + helper.randomSuffix() def aliasTableName = "tbl_alias_" + helper.randomSuffix() diff --git a/test_routes.go b/test_routes.go new file mode 100644 index 00000000..7b1f299f --- /dev/null +++ b/test_routes.go @@ -0,0 +1,27 @@ +package main + +import ( + "fmt" + "log" + "net/http" +) + +func main() { + mux := http.NewServeMux() + + // 注册测试路由 + mux.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, `{"version":"test"}`) + }) + + mux.HandleFunc("/sync_global", func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, `{"success":true,"message":"sync_global route works"}`) + }) + + fmt.Println("测试服务器启动在 :9191") + fmt.Println("测试命令:") + fmt.Println("curl http://127.0.0.1:9191/version") + fmt.Println("curl -X POST http://127.0.0.1:9191/sync_global") + + log.Fatal(http.ListenAndServe(":9191", mux)) +}