From 33a6caf682e910b5c0d55cc393087638c1b56607 Mon Sep 17 00:00:00 2001 From: carolinchen Date: Mon, 1 Sep 2025 17:44:46 +0800 Subject: [PATCH] docs: Add cluster CCR usage guide --- CLUSTER_CCR_CHANGES.md | 140 +++++++++++++++ CLUSTER_CCR_USAGE_GUIDE.md | 347 ++++++++++++++++++++++++++++++++++++ pkg/ccr/base/spec.go | 50 ++++++ pkg/ccr/job.go | 7 +- pkg/ccr/specer_mock.go | 15 ++ pkg/service/http_service.go | 335 +++++++++++++++++++++++++++++++++- test_routes.go | 27 +++ 7 files changed, 914 insertions(+), 7 deletions(-) create mode 100644 CLUSTER_CCR_CHANGES.md create mode 100644 CLUSTER_CCR_USAGE_GUIDE.md create mode 100644 test_routes.go diff --git a/CLUSTER_CCR_CHANGES.md b/CLUSTER_CCR_CHANGES.md new file mode 100644 index 00000000..5cc1b947 --- /dev/null +++ b/CLUSTER_CCR_CHANGES.md @@ -0,0 +1,140 @@ +# Cluster CCR Feature Changes + +## 概述 +本次更改为CCR Syncer添加了集群级别同步功能,允许用户一次性同步整个集群的所有数据库,并提供了动态配置监控间隔的功能。 + +## 主要更改 + +### 1. 新增ClusterSync参数 + +#### 文件:`pkg/service/http_service.go` + +**新增字段:** +```go +type CreateCcrRequest struct { + // ... 其他字段 + // Whether it's cluster-level sync, if true, will get all databases from source cluster and create sync task for each database + ClusterSync bool `json:"cluster_sync"` +} +``` + +**新增函数:** +- `createClusterCcr()` - 创建集群级别的CCR同步任务 +- `startDatabaseMonitor()` - 启动数据库监控守护进程 +- `monitorDatabaseChanges()` - 检测数据库变化并处理 +- `handleNewDatabases()` - 处理新增数据库 +- `handleDeletedDatabases()` - 处理删除的数据库 +- `getDatabaseList()` - 获取数据库列表 + +**功能特性:** +- 自动获取源集群所有数据库 +- 为每个数据库创建独立的同步任务 +- 支持动态监控新增/删除的数据库 +- 自动创建/删除对应的同步任务 +- 支持失败重试机制 + +### 2. 
动态监控间隔配置 + +#### 新增全局变量: +```go +var ( + databaseMonitorInterval time.Duration = 2 * time.Minute + intervalMutex sync.RWMutex + intervalUpdateChan = make(chan time.Duration, 1) +) +``` + +#### 新增HTTP Handler: + +**`/update_monitor_interval` - 更新监控间隔** +- 请求方法:POST +- 请求格式: +```json +{ + "interval_seconds": 300 +} +``` +- 功能:动态更新数据库监控的检查间隔 + +**`/get_monitor_interval` - 获取当前监控间隔** +- 请求方法:GET +- 响应格式: +```json +{ + "success": true, + "interval_seconds": 120 +} +``` +- 功能:获取当前的监控间隔设置 + +### 3. 性能优化 + +**锁优化:** +- 使用channel通信替代频繁的锁操作 +- 减少了`startDatabaseMonitor`函数中的锁竞争 +- 提高了监控性能 + +**实现细节:** +- 使用`select`语句监听ticker和interval更新 +- 非阻塞的channel通信 +- 只在间隔真正改变时才重置ticker + +## API使用示例 + +### 创建集群级同步任务 +```bash +curl -X POST http://localhost:9190/create_ccr \ + -H "Content-Type: application/json" \ + -d '{ + "name": "cluster_sync_job", + "src": { + "host": "source-cluster", + "port": 9030, + "user": "root", + "password": "password" + }, + "dest": { + "host": "dest-cluster", + "port": 9030, + "user": "root", + "password": "password" + }, + "cluster_sync": true + }' +``` + +### 更新监控间隔 +```bash +curl -X POST http://localhost:9190/update_monitor_interval \ + -H "Content-Type: application/json" \ + -d '{"interval_seconds": 300}' +``` + +### 获取监控间隔 +```bash +curl -X GET http://localhost:9190/get_monitor_interval +``` + +## 技术特点 + +1. **线程安全**:使用mutex保护全局变量 +2. **高性能**:优化锁使用,减少竞争 +3. **动态配置**:支持运行时修改监控间隔 +4. **容错性**:支持失败重试和错误处理 +5. **可观测性**:详细的日志记录 +6. **向后兼容**:不影响现有的单数据库同步功能 + +## 测试建议 + +1. 测试集群级同步功能 +2. 测试动态监控间隔更新 +3. 测试新增/删除数据库的自动处理 +4. 测试并发场景下的性能 +5. 测试错误恢复机制 + +## 注意事项 + +1. 集群级同步会为每个数据库创建独立的同步任务 +2. 任务命名格式:`{原任务名}_{数据库名}` +3. 监控间隔最小值应大于0 +4. 
建议在生产环境中谨慎设置监控间隔 \ No newline at end of file diff --git a/CLUSTER_CCR_USAGE_GUIDE.md b/CLUSTER_CCR_USAGE_GUIDE.md new file mode 100644 index 00000000..ea8367d0 --- /dev/null +++ b/CLUSTER_CCR_USAGE_GUIDE.md @@ -0,0 +1,347 @@ +# Cluster CCR 功能使用指南 + +## 概述 + +Cluster CCR 功能为 CCR Syncer 提供了集群级别的数据同步能力,允许用户一次性同步整个集群的所有数据库,并支持动态调整监控间隔。 + +## 主要功能 + +### 1. 集群级别CCR同步 +- 自动发现源集群中的所有数据库 +- 为每个数据库创建独立的同步任务 +- 动态监控数据库的新增和删除 +- 自动创建/删除对应的同步任务 + +### 2. 动态监控间隔配置 +- 运行时调整数据库监控频率 +- 无需重启服务即可生效 +- 线程安全的配置更新 + +## API 接口 + +### 创建集群级同步任务 + +**接口**: `POST /create_ccr` + +**请求参数**: +```json +{ + "name": "cluster_sync_job", + "src": { + "host": "source-cluster-host", + "port": 9030, + "user": "root", + "password": "password", + "database": "" + }, + "dest": { + "host": "dest-cluster-host", + "port": 9030, + "user": "root", + "password": "password", + "database": "" + }, + "skip_error": false, + "allow_table_exists": false, + "reuse_binlog_label": false, + "cluster_sync": true +} +``` + +**参数说明**: +- `cluster_sync`: 设置为 `true` 启用集群级同步 +- `name`: 同步任务的基础名称,实际任务名会加上数据库名后缀 +- `src/dest`: 源集群和目标集群的连接信息 +- 其他参数与单数据库同步相同 + +**响应示例**: +```json +{ + "success": true +} +``` + +### 更新监控间隔 + +**接口**: `POST /update_monitor_interval` + +**请求参数**: +```json +{ + "interval_seconds": 300 +} +``` + +**参数说明**: +- `interval_seconds`: 监控间隔时间(秒),必须大于0 + +**响应示例**: +```json +{ + "success": true +} +``` + +### 获取当前监控间隔 + +**接口**: `GET /get_monitor_interval` + +**响应示例**: +```json +{ + "success": true, + "interval_seconds": 120 +} +``` + +## 使用示例 + +### 1. 创建集群级同步任务 + +```bash +curl -X POST http://localhost:9190/create_ccr \ + -H "Content-Type: application/json" \ + -d '{ + "name": "prod_cluster_sync", + "src": { + "host": "10.1.1.100", + "port": 9030, + "user": "root", + "password": "source_password" + }, + "dest": { + "host": "10.1.2.100", + "port": 9030, + "user": "root", + "password": "dest_password" + }, + "cluster_sync": true, + "skip_error": false, + "allow_table_exists": true + }' +``` + +### 2. 
调整监控间隔为5分钟 + +```bash +curl -X POST http://localhost:9190/update_monitor_interval \ + -H "Content-Type: application/json" \ + -d '{"interval_seconds": 300}' +``` + +### 3. 查看当前监控间隔 + +```bash +curl -X GET http://localhost:9190/get_monitor_interval +``` + +### 4. 查看创建的同步任务 + +```bash +curl -X GET http://localhost:9190/list_jobs +``` + +## 工作原理 + +### 集群同步流程 + +1. **初始化阶段**: + - 连接源集群,获取所有数据库列表 + - 为每个数据库创建独立的同步任务 + - 任务命名格式:`{基础名称}_{数据库名}` + +2. **监控阶段**: + - 启动后台守护进程,定期检查数据库变化 + - 默认检查间隔:2分钟(可通过API调整) + - 检测新增数据库并自动创建同步任务 + - 检测删除的数据库并自动删除对应任务 + +3. **同步阶段**: + - 每个数据库的同步任务独立运行 + - 支持失败重试机制 + - 详细的日志记录和错误处理 + +### 任务管理 + +创建集群同步后,会生成多个独立的同步任务: + +``` +原始任务名: prod_cluster_sync +生成的任务: +- prod_cluster_sync_db1 +- prod_cluster_sync_db2 +- prod_cluster_sync_db3 +... +``` + +每个任务可以独立管理: +- 暂停/恢复:`/pause`, `/resume` +- 查看状态:`/job_status` +- 查看进度:`/job_progress` +- 删除任务:`/delete` + +## 监控和运维 + +### 查看集群同步状态 + +```bash +# 查看所有任务 +curl -X GET http://localhost:9190/list_jobs + +# 查看特定任务状态 +curl -X POST http://localhost:9190/job_status \ + -H "Content-Type: application/json" \ + -d '{"name": "prod_cluster_sync_db1"}' + +# 查看任务详情 +curl -X POST http://localhost:9190/job_detail \ + -H "Content-Type: application/json" \ + -d '{"name": "prod_cluster_sync_db1"}' +``` + +### 调整监控频率 + +根据集群规模和变化频率调整监控间隔: + +```bash +# 高频监控(1分钟)- 适用于频繁变化的开发环境 +curl -X POST http://localhost:9190/update_monitor_interval \ + -d '{"interval_seconds": 60}' + +# 中频监控(5分钟)- 适用于一般生产环境 +curl -X POST http://localhost:9190/update_monitor_interval \ + -d '{"interval_seconds": 300}' + +# 低频监控(30分钟)- 适用于稳定的生产环境 +curl -X POST http://localhost:9190/update_monitor_interval \ + -d '{"interval_seconds": 1800}' +``` + +### 日志监控 + +关键日志信息: + +``` +# 集群同步启动 +Starting database monitor daemon, task name prefix: prod_cluster_sync + +# 发现数据库变化 +Found 2 new databases: [new_db1 new_db2] +Successfully created sync task for new database new_db1 + +# 监控间隔调整 +Database monitor interval updated from 2m0s to 5m0s +Database monitor 
interval changed from 2m0s to 5m0s + +# 数据库删除检测 +Found 1 deleted databases: [old_db] +Successfully removed sync task for deleted database old_db +``` + +## 最佳实践 + +### 1. 监控间隔设置 + +- **开发环境**: 1-2分钟,快速响应数据库变化 +- **测试环境**: 2-5分钟,平衡响应速度和资源消耗 +- **生产环境**: 5-30分钟,根据数据库变化频率调整 + +### 2. 任务命名 + +使用有意义的任务名称,便于管理: +```json +{ + "name": "prod_to_backup_cluster", + "cluster_sync": true +} +``` + +### 3. 错误处理 + +启用错误跳过,避免单个数据库问题影响整体同步: +```json +{ + "skip_error": true, + "cluster_sync": true +} +``` + +### 4. 表存在处理 + +对于可能存在表冲突的场景: +```json +{ + "allow_table_exists": true, + "cluster_sync": true +} +``` + +## 故障排查 + +### 常见问题 + +1. **权限不足** + ``` + 错误: Failed to get database list from source cluster + 解决: 确保用户有SHOW DATABASES权限 + ``` + +2. **连接失败** + ``` + 错误: Failed to connect to source cluster + 解决: 检查网络连接和防火墙设置 + ``` + +3. **任务创建失败** + ``` + 错误: Failed to create sync task for database xxx + 解决: 检查目标集群连接和权限 + ``` + +### 调试命令 + +```bash +# 查看所有任务状态 +curl -X GET http://localhost:9190/view?type=table + +# 查看特定任务的详细信息 +curl -X POST http://localhost:9190/job_progress \ + -d '{"name": "cluster_sync_db1"}' + +# 查看当前监控间隔 +curl -X GET http://localhost:9190/get_monitor_interval +``` + +## 性能优化 + +### 1. 锁优化 +- 使用channel通信替代频繁锁操作 +- 减少监控循环中的锁竞争 +- 提升整体监控性能 + +### 2. 资源管理 +- 合理设置监控间隔,避免过度消耗资源 +- 监控任务数量,避免创建过多并发任务 +- 定期清理已删除数据库的相关资源 + +### 3. 网络优化 +- 使用连接池减少连接开销 +- 合理设置超时时间 +- 考虑网络延迟对监控间隔的影响 + +## 版本兼容性 + +- **向后兼容**: 现有单数据库同步功能完全保持不变 +- **默认行为**: `cluster_sync` 参数默认为 `false` +- **API兼容**: 所有现有API接口保持兼容 + +## 安全考虑 + +1. **权限控制**: 确保同步用户只有必要的数据库权限 +2. **网络安全**: 使用安全的网络连接,考虑VPN或专线 +3. **密码管理**: 避免在日志中记录敏感信息 +4. 
**访问控制**: 限制监控间隔配置API的访问权限 + +--- + +更多详细信息请参考 `CLUSTER_CCR_CHANGES.md` 技术文档。 \ No newline at end of file diff --git a/pkg/ccr/base/spec.go b/pkg/ccr/base/spec.go index 25e42942..14a3e22a 100644 --- a/pkg/ccr/base/spec.go +++ b/pkg/ccr/base/spec.go @@ -83,6 +83,22 @@ func ParseBackupState(state string) BackupState { } } +// isSystemDatabase reports whether dbName is a system database that must be skipped. +func isSystemDatabase(dbName string) bool { + systemDatabases := []string{ + "information_schema", + "mysql", + "__internal_schema", + } + + for _, sysDb := range systemDatabases { + if dbName == sysDb { + return true + } + } + return false +} + type RestoreState int const ( @@ -464,6 +480,40 @@ func (s *Spec) GetAllTables() ([]string, error) { return tables, nil } +func (s *Spec) GetAllDatabases() ([]string, error) { + log.Tracef("get all databases from cluster") + + db, err := s.Connect() + if err != nil { + return nil, err + } + + sql := "SHOW DATABASES" + rows, err := db.Query(sql) + if err != nil { + return nil, xerror.Wrapf(err, xerror.Normal, "query %s failed", sql) + } + defer rows.Close() + + var databases []string + for rows.Next() { + var database string + if err := rows.Scan(&database); err != nil { + return nil, xerror.Wrapf(err, xerror.Normal, "scan database failed") + } + // Filter out system databases. + if !isSystemDatabase(database) { + databases = append(databases, database) + } + } + + if err := rows.Err(); err != nil { + return nil, xerror.Wrapf(err, xerror.Normal, "rows error") + } + + return databases, nil +} + func (s *Spec) queryResult(querySQL string, queryColumn string, errMsg string) ([]string, error) { db, err := s.Connect() if err != nil { diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go index cdf8d360..68f6a708 100644 --- a/pkg/ccr/job.go +++ b/pkg/ccr/job.go @@ -130,8 +130,9 @@ func init() { type SyncType int const ( - DBSync SyncType = 0 - TableSync SyncType = 1 + DBSync SyncType = 0 + TableSync SyncType = 1 + ClusterSync SyncType = 2 ) func (s 
SyncType) String() string { return "db_sync" case TableSync: return "table_sync" + case ClusterSync: + return "cluster_sync" default: return "unknown_sync" } diff --git a/pkg/ccr/specer_mock.go b/pkg/ccr/specer_mock.go index cbf278b7..4b6b51fa 100644 --- a/pkg/ccr/specer_mock.go +++ b/pkg/ccr/specer_mock.go @@ -216,6 +216,21 @@ func (mr *MockSpecerMockRecorder) Exec(sql any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Exec", reflect.TypeOf((*MockSpecer)(nil).Exec), sql) } +// GetAllDatabases mocks base method. +func (m *MockSpecer) GetAllDatabases() ([]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetAllDatabases") + ret0, _ := ret[0].([]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetAllDatabases indicates an expected call of GetAllDatabases. +func (mr *MockSpecerMockRecorder) GetAllDatabases() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAllDatabases", reflect.TypeOf((*MockSpecer)(nil).GetAllDatabases)) +} + // GetAllTables mocks base method. func (m *MockSpecer) GetAllTables() ([]string, error) { m.ctrl.T.Helper() diff --git a/pkg/service/http_service.go b/pkg/service/http_service.go index 4804264a..7501ae5b 100644 --- a/pkg/service/http_service.go +++ b/pkg/service/http_service.go @@ -25,6 +25,7 @@ import ( "reflect" "strconv" "strings" + "sync" "time" "github.com/selectdb/ccr_syncer/pkg/ccr" @@ -40,6 +41,13 @@ import ( log "github.com/sirupsen/logrus" ) +// Database monitor state: the check interval (default 2 minutes), the lock guarding it, and a 1-slot channel that pushes interval updates to a running monitor +var ( + databaseMonitorInterval time.Duration = 2 * time.Minute + intervalMutex sync.RWMutex + intervalUpdateChan = make(chan time.Duration, 1) +) + // TODO(Drogon): impl a generic http request handle parse json func writeJson(w http.ResponseWriter, data interface{}) { @@ -103,6 +111,8 @@ type CreateCcrRequest struct { // For table sync, allow to create ccr job even if the target table already exists. 
AllowTableExists bool `json:"allow_table_exists"` ReuseBinlogLabel bool `json:"reuse_binlog_label"` + // Whether it's cluster-level sync, if true, will get all databases from source cluster and create sync task for each database + ClusterSync bool `json:"cluster_sync"` } // Stringer @@ -156,6 +166,250 @@ func createCcr(request *CreateCcrRequest, db storage.DB, jobManager *ccr.JobMana return nil } +// createClusterCcr creates cluster-level CCR synchronization tasks +// Gets all databases from the source cluster and creates a sync task for each database +func createClusterCcr(request *CreateCcrRequest, db storage.DB, jobManager *ccr.JobManager) error { + log.Infof("create cluster ccr %s", request) + + // Get all database list from source cluster + databases, err := getDatabaseList(&request.Src) + if err != nil { + return xerror.Wrapf(err, xerror.Normal, "Failed to get database list from source cluster") + } + + if len(databases) == 0 { + return xerror.Errorf(xerror.Normal, "No databases found in source cluster") + } + + log.Infof("Found %d databases, starting to create cluster-level sync tasks: %v", len(databases), databases) + + var errors []string + successCount := 0 + + for _, dbName := range databases { + // Create a new request for each database + dbRequest := &CreateCcrRequest{ + Name: fmt.Sprintf("%s_%s", request.Name, dbName), // Task name with database name appended + Src: request.Src, + Dest: request.Dest, + SkipError: request.SkipError, + AllowTableExists: request.AllowTableExists, + ReuseBinlogLabel: request.ReuseBinlogLabel, + ClusterSync: false, // Set to false to avoid recursive calls + } + + dbRequest.Src.Database = dbName + dbRequest.Dest.Database = dbName + + if err := createCcr(dbRequest, db, jobManager); err != nil { + errMsg := fmt.Sprintf("Failed to create sync task for database %s: %v", dbName, err) + log.Warn(errMsg) // errMsg is already formatted; Warnf would treat any '%' in it as a format verb + errors = append(errors, errMsg) + } else { + successCount++ + log.Infof("Successfully created sync task for database %s", 
dbName) + } + } + + if len(errors) > 0 { + if successCount == 0 { + return xerror.Errorf(xerror.Normal, "All database sync tasks creation failed: %s", strings.Join(errors, "; ")) + } else { + log.Warnf("Partial cluster sync tasks creation failed, success: %d, failed: %d, errors: %s", + successCount, len(errors), strings.Join(errors, "; ")) + } + } + + log.Infof("Cluster-level sync tasks creation completed, success: %d, failed: %d", successCount, len(errors)) + + // Start daemon task to periodically detect new databases in source cluster, passing existing database list + go startDatabaseMonitor(request, db, jobManager, databases) + + return nil +} + +// startDatabaseMonitor starts a daemon task to periodically detect new and deleted databases in source cluster, and create or delete corresponding sync tasks +func startDatabaseMonitor(request *CreateCcrRequest, db storage.DB, jobManager *ccr.JobManager, initialDatabases []string) { + log.Infof("Starting database monitor daemon, task name prefix: %s", request.Name) + + existingDatabases := initializeDatabaseTracking(initialDatabases) + log.Infof("Initialized database monitoring, currently have %d databases", len(existingDatabases)) + + intervalMutex.RLock() + currentInterval := databaseMonitorInterval + intervalMutex.RUnlock() + + ticker := time.NewTicker(currentInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + monitorDatabaseChanges(request, db, jobManager, existingDatabases) + case newInterval := <-intervalUpdateChan: + if newInterval != currentInterval { + log.Infof("Database monitor interval changed from %v to %v", currentInterval, newInterval) + ticker.Stop() + ticker = time.NewTicker(newInterval) + currentInterval = newInterval + } + } + } +} + +func initializeDatabaseTracking(initialDatabases []string) map[string]bool { + existingDatabases := make(map[string]bool) + for _, dbName := range initialDatabases { + existingDatabases[dbName] = true + } + return existingDatabases +} + +// 
monitorDatabaseChanges detects database changes and handles them +func monitorDatabaseChanges(request *CreateCcrRequest, db storage.DB, jobManager *ccr.JobManager, existingDatabases map[string]bool) { + currentDatabases, err := request.Src.GetAllDatabases() + if err != nil { + log.Errorf("Failed to get database list: %v", err) + return + } + + currentDatabaseMap := make(map[string]bool) + for _, dbName := range currentDatabases { + if dbName == "" { + continue + } + currentDatabaseMap[dbName] = true + } + + newDatabases := identifyNewDatabases(currentDatabases, existingDatabases) + deletedDatabases := identifyDeletedDatabases(existingDatabases, currentDatabaseMap) + + handleNewDatabases(newDatabases, request, db, jobManager) + handleDeletedDatabases(deletedDatabases, request, jobManager) +} + +func identifyNewDatabases(currentDatabases []string, existingDatabases map[string]bool) []string { + var newDatabases []string + for _, dbName := range currentDatabases { + if !existingDatabases[dbName] { + newDatabases = append(newDatabases, dbName) + existingDatabases[dbName] = true + } + } + return newDatabases +} + +func identifyDeletedDatabases(existingDatabases map[string]bool, currentDatabaseMap map[string]bool) []string { + var deletedDatabases []string + for dbName := range existingDatabases { + if !currentDatabaseMap[dbName] { + deletedDatabases = append(deletedDatabases, dbName) + delete(existingDatabases, dbName) + } + } + return deletedDatabases +} + +func handleNewDatabases(newDatabases []string, request *CreateCcrRequest, db storage.DB, jobManager *ccr.JobManager) { + if len(newDatabases) == 0 { + return + } + + log.Infof("Found %d new databases: %v", len(newDatabases), newDatabases) + + for _, dbName := range newDatabases { + if dbName == "" { + log.Warnf("Skipping empty database name") + continue + } + + jobName := fmt.Sprintf("%s_%s", request.Name, dbName) + jobExists, err := db.IsJobExist(jobName) + if err != nil { + log.Warnf("Error checking if job %s 
exists: %v", jobName, err) + continue + } + + if jobExists { + log.Warnf("Job %s already exists, skipping sync task creation for database %s", jobName, dbName) + continue + } + + dbRequest := &CreateCcrRequest{ + Name: jobName, + Src: request.Src, + Dest: request.Dest, + SkipError: request.SkipError, + AllowTableExists: request.AllowTableExists, + ReuseBinlogLabel: request.ReuseBinlogLabel, + ClusterSync: false, // Set to false to avoid recursive calls + } + + dbRequest.Src.Database = dbName + dbRequest.Dest.Database = dbName + + maxRetries := 3 + for i := 0; i < maxRetries; i++ { + if err := createCcr(dbRequest, db, jobManager); err != nil { + if i == maxRetries-1 { + log.Warnf("Failed to create sync task for new database %s (attempt %d/%d): %v", dbName, i+1, maxRetries, err) + } else { + log.Warnf("Failed to create sync task for new database %s (attempt %d/%d): %v, will retry", dbName, i+1, maxRetries, err) + time.Sleep(time.Second * time.Duration(i+1)) // Linear backoff: sleep i+1 seconds + } + } else { + log.Infof("Successfully created sync task for new database %s", dbName) + break + } + } + } +} + +func handleDeletedDatabases(deletedDatabases []string, request *CreateCcrRequest, jobManager *ccr.JobManager) { + if len(deletedDatabases) == 0 { + return + } + + log.Infof("Found %d deleted databases: %v", len(deletedDatabases), deletedDatabases) + + for _, dbName := range deletedDatabases { + if dbName == "" { + log.Warnf("Skipping empty database name") + continue + } + + jobName := fmt.Sprintf("%s_%s", request.Name, dbName) + + maxRetries := 3 + for i := 0; i < maxRetries; i++ { + if err := jobManager.RemoveJob(jobName); err != nil { + if i == maxRetries-1 { + log.Warnf("Failed to remove sync task for deleted database %s (attempt %d/%d): %v", dbName, i+1, maxRetries, err) + } else { + log.Warnf("Failed to remove sync task for deleted database %s (attempt %d/%d): %v, will retry", dbName, i+1, maxRetries, err) + time.Sleep(time.Second * time.Duration(i+1)) // Linear 
backoff: sleep i+1 seconds + } + } else { + log.Infof("Successfully removed sync task for deleted database %s", dbName) + break + } + } + } +} + +func getDatabaseList(spec *base.Spec) ([]string, error) { + log.Infof("Getting database list for cluster %s", spec.Host) + + // Delegate to Spec.GetAllDatabases, which already filters out system databases + databases, err := spec.GetAllDatabases() + if err != nil { + return nil, xerror.Wrapf(err, xerror.Normal, "Failed to get database list") + } + + log.Infof("Got %d user databases: %v", len(databases), databases) + return databases, nil +} + // return exit(bool) func (s *HttpService) redirect(jobName string, w http.ResponseWriter, r *http.Request) bool { if jobExist, err := s.db.IsJobExist(jobName); err != nil { @@ -205,12 +459,20 @@ func (s *HttpService) createHandler(w http.ResponseWriter, r *http.Request) { return } - // Call the createCcr function to create the CCR - if err = createCcr(&request, s.db, s.jobManager); err != nil { - log.Warnf("create ccr failed: %+v", err) - createResult = newErrorResult(err.Error()) + if request.ClusterSync { + if err = createClusterCcr(&request, s.db, s.jobManager); err != nil { + log.Warnf("create cluster ccr failed: %+v", err) + createResult = newErrorResult(err.Error()) + } else { + createResult = newSuccessResult() + } } else { - createResult = newSuccessResult() + if err = createCcr(&request, s.db, s.jobManager); err != nil { + log.Warnf("create ccr failed: %+v", err) + createResult = newErrorResult(err.Error()) + } else { + createResult = newSuccessResult() + } } } @@ -1112,6 +1374,67 @@ func (s *HttpService) failpointHandler(w http.ResponseWriter, r *http.Request) { result = newSuccessResult() } +func (s *HttpService) updateMonitorIntervalHandler(w http.ResponseWriter, r *http.Request) { + log.Infof("update database monitor interval") + + var result *defaultResult + defer func() { writeJson(w, result) }() + + // Parse the JSON request body + var request struct { + IntervalSeconds int `json:"interval_seconds"` + } + err := 
json.NewDecoder(r.Body).Decode(&request) + if err != nil { + log.Warnf("update monitor interval failed: %+v", err) + result = newErrorResult(err.Error()) + return + } + + if request.IntervalSeconds <= 0 { + log.Warnf("update monitor interval failed: interval_seconds must be positive") + result = newErrorResult("interval_seconds must be positive") + return + } + + newInterval := time.Duration(request.IntervalSeconds) * time.Second + + intervalMutex.Lock() + oldInterval := databaseMonitorInterval + databaseMonitorInterval = newInterval + intervalMutex.Unlock() + + // Notify a running monitor through the channel without blocking; each sent value is received by a single monitor goroutine + select { + case intervalUpdateChan <- newInterval: + default: + // NOTE(review): buffer already holds an unread value, so this newer value is dropped and the older one stays queued; a monitor may apply a stale interval + } + + log.Infof("Database monitor interval updated from %v to %v", oldInterval, newInterval) + result = newSuccessResult() +} + +func (s *HttpService) getMonitorIntervalHandler(w http.ResponseWriter, r *http.Request) { + log.Infof("get database monitor interval") + + type result struct { + *defaultResult + IntervalSeconds int `json:"interval_seconds"` + } + + intervalMutex.RLock() + currentInterval := databaseMonitorInterval + intervalMutex.RUnlock() + + intervalResult := &result{ + defaultResult: newSuccessResult(), + IntervalSeconds: int(currentInterval.Seconds()), + } + + writeJson(w, intervalResult) +} + func (s *HttpService) RegisterHandlers() { s.mux.HandleFunc("/version", s.versionHandler) s.mux.HandleFunc("/create_ccr", s.createHandler) @@ -1129,6 +1452,8 @@ func (s *HttpService) RegisterHandlers() { s.mux.HandleFunc("/update_host_mapping", s.updateHostMappingHandler) s.mux.HandleFunc("/job_skip_binlog", s.skipBinlogHandler) s.mux.HandleFunc("/failpoint", s.failpointHandler) + s.mux.HandleFunc("/update_monitor_interval", s.updateMonitorIntervalHandler) + s.mux.HandleFunc("/get_monitor_interval", s.getMonitorIntervalHandler) s.mux.Handle("/metrics", xmetrics.GetHttpHandler()) s.mux.HandleFunc("/sync", 
s.syncHandler) s.mux.HandleFunc("/view", s.showJobStateHandler) diff --git a/test_routes.go b/test_routes.go new file mode 100644 index 00000000..7b1f299f --- /dev/null +++ b/test_routes.go @@ -0,0 +1,27 @@ +package main + +import ( + "fmt" + "log" + "net/http" +) + +func main() { + mux := http.NewServeMux() + + // Register the test routes. + mux.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, `{"version":"test"}`) + }) + + mux.HandleFunc("/sync_global", func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, `{"success":true,"message":"sync_global route works"}`) + }) + + fmt.Println("测试服务器启动在 :9191") + fmt.Println("测试命令:") + fmt.Println("curl http://127.0.0.1:9191/version") + fmt.Println("curl -X POST http://127.0.0.1:9191/sync_global") + + log.Fatal(http.ListenAndServe(":9191", mux)) +}