Skip to content

Commit d5058c2

Browse files
feat: add global sync functionality for cross-cluster synchronization
This commit introduces a comprehensive global synchronization feature that enables synchronization of global objects between Doris clusters. Features: - Support for global snapshot creation with configurable options - Flexible sync options: all, backup_privilege, backup_catalog, backup_workload_group - Automatic backup completion monitoring with timeout handling - Global snapshot information retrieval and processing - SQL statement execution on destination cluster - Comprehensive error handling and logging throughout the process API Endpoint: - POST /sync_global - Handles global synchronization requests Request Structure: - name: Unique identifier for the sync operation - src: Source cluster specification - dest: Destination cluster specification - Sync options (at least one required if 'all' is not set): - all: Sync all global objects - backup_privilege: Sync user privileges - backup_catalog: Sync catalog information - backup_workload_group: Sync workload group settings Workflow: 1. Create global snapshot on source cluster 2. Wait for backup completion with polling mechanism 3. Retrieve global snapshot details via RPC 4. Convert snapshot data to SQL statements 5. Execute SQL statements on destination cluster Additional improvements: - Internationalize all Chinese logs and comments to English - Refactor validation logic for flexible option combinations - Decompose syncGlobal function into smaller, focused functions This feature enables seamless migration and synchronization of global configurations between Doris clusters, supporting various deployment scenarios including disaster recovery and cluster migration.
1 parent 02f63b3 commit d5058c2

File tree

3 files changed

+171
-333
lines changed

3 files changed

+171
-333
lines changed

pkg/ccr/base/spec.go

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func ParseBackupState(state string) BackupState {
8383
}
8484
}
8585

86-
// isSystemDatabase 判断是否为系统数据库,需要跳过
86+
// isSystemDatabase determines if it's a system database that should be skipped
8787
func isSystemDatabase(dbName string) bool {
8888
systemDatabases := []string{
8989
"information_schema",
@@ -501,7 +501,7 @@ func (s *Spec) GetAllDatabases() ([]string, error) {
501501
if err := rows.Scan(&database); err != nil {
502502
return nil, xerror.Wrapf(err, xerror.Normal, "scan database failed")
503503
}
504-
// 过滤系统数据库
504+
// Filter system databases
505505
if !isSystemDatabase(database) {
506506
databases = append(databases, database)
507507
}
@@ -823,20 +823,20 @@ func (s *Spec) CreateSnapshot(snapshotName string, tables []string) error {
823823
return nil
824824
}
825825

826-
// CreateGlobalSnapshot 创建全局快照,用于同步全局对象
827-
// 根据不同的选项设置不同的属性
826+
// CreateGlobalSnapshot creates a global snapshot for synchronizing global objects
827+
// Sets different properties based on different options
828828
func (s *Spec) CreateGlobalSnapshot(snapshotName string, backupPrivilege, backupCatalog, backupWorkloadGroup bool) error {
829-
log.Infof("创建全局快照 %s", snapshotName)
829+
log.Infof("Creating global snapshot %s", snapshotName)
830830

831831
db, err := s.Connect()
832832
if err != nil {
833833
return err
834834
}
835835

836-
// 构建SQL语句
836+
// Build SQL statement
837837
sql := fmt.Sprintf("BACKUP GLOBAL SNAPSHOT %s TO `__keep_on_local__`", utils.FormatKeywordName(snapshotName))
838838

839-
// 根据不同的选项添加属性
839+
// Add properties based on different options
840840
properties := make([]string, 0)
841841
if backupPrivilege {
842842
properties = append(properties, "\"backup_privilege\" = \"true\"")
@@ -848,15 +848,15 @@ func (s *Spec) CreateGlobalSnapshot(snapshotName string, backupPrivilege, backup
848848
properties = append(properties, "\"backup_workload_group\" = \"true\"")
849849
}
850850

851-
// 如果有属性,添加PROPERTIES子句
851+
// If there are properties, add PROPERTIES clause
852852
if len(properties) > 0 {
853853
sql += fmt.Sprintf(" PROPERTIES (%s)", strings.Join(properties, ", "))
854854
}
855855

856-
log.Infof("创建全局快照SQL: %s", sql)
856+
log.Infof("Creating global snapshot SQL: %s", sql)
857857
_, err = db.Exec(sql)
858858
if err != nil {
859-
return xerror.Wrapf(err, xerror.Normal, "创建全局快照 %s 失败, SQL: %s", snapshotName, sql)
859+
return xerror.Wrapf(err, xerror.Normal, "Failed to create global snapshot %s, SQL: %s", snapshotName, sql)
860860
}
861861

862862
return nil
@@ -936,20 +936,20 @@ func (s *Spec) checkBackupFinished(snapshotName string) (BackupState, string, er
936936
return BackupStateUnknown, "", xerror.Errorf(xerror.Normal, "no backup state found, sql: %s", sql)
937937
}
938938

939-
// checkGlobalBackupFinished 检查全局备份状态
939+
// checkGlobalBackupFinished checks global backup status
940940
func (s *Spec) checkGlobalBackupFinished(snapshotName string) (BackupState, string, error) {
941-
log.Tracef("检查全局备份状态 %s", snapshotName)
941+
log.Tracef("Checking global backup status %s", snapshotName)
942942

943943
db, err := s.Connect()
944944
if err != nil {
945945
return BackupStateUnknown, "", err
946946
}
947947

948948
sql := fmt.Sprintf("SHOW GLOBAL BACKUP WHERE SnapshotName = \"%s\"", snapshotName)
949-
log.Infof("检查全局备份状态SQL: %s", sql)
949+
log.Infof("Checking global backup status SQL: %s", sql)
950950
rows, err := db.Query(sql)
951951
if err != nil {
952-
return BackupStateUnknown, "", xerror.Wrapf(err, xerror.Normal, "查询全局备份状态失败, SQL: %s", sql)
952+
return BackupStateUnknown, "", xerror.Wrapf(err, xerror.Normal, "Failed to query global backup status, SQL: %s", sql)
953953
}
954954
defer rows.Close()
955955

@@ -961,57 +961,57 @@ func (s *Spec) checkGlobalBackupFinished(snapshotName string) (BackupState, stri
961961

962962
stateStr, err := rowParser.GetString("State")
963963
if err != nil {
964-
return BackupStateUnknown, "", xerror.Wrap(err, xerror.Normal, "解析全局备份State失败")
964+
return BackupStateUnknown, "", xerror.Wrap(err, xerror.Normal, "Failed to parse global backup State")
965965
}
966966

967967
status, err := rowParser.GetString("Status")
968968
if err != nil {
969-
return BackupStateUnknown, "", xerror.Wrap(err, xerror.Normal, "解析全局备份Status失败")
969+
return BackupStateUnknown, "", xerror.Wrap(err, xerror.Normal, "Failed to parse global backup Status")
970970
}
971971

972-
log.Infof("检查全局快照 %s 备份状态: [%v]", snapshotName, stateStr)
972+
log.Infof("Checking global snapshot %s backup status: [%v]", snapshotName, stateStr)
973973
return ParseBackupState(stateStr), status, nil
974974
}
975975

976976
if err := rows.Err(); err != nil {
977-
return BackupStateUnknown, "", xerror.Wrapf(err, xerror.Normal, "检查全局备份状态, SQL: %s", sql)
977+
return BackupStateUnknown, "", xerror.Wrapf(err, xerror.Normal, "Check global backup status, SQL: %s", sql)
978978
}
979979

980-
return BackupStateUnknown, "", xerror.Errorf(xerror.Normal, "未找到全局备份状态, SQL: %s", sql)
980+
return BackupStateUnknown, "", xerror.Errorf(xerror.Normal, "Global backup status not found, SQL: %s", sql)
981981
}
982982

983-
// CheckGlobalBackupFinished 检查全局备份是否完成
983+
// CheckGlobalBackupFinished checks if global backup is finished
984984
func (s *Spec) CheckGlobalBackupFinished(snapshotName string) (bool, error) {
985-
log.Tracef("检查全局备份是否完成, spec: %s, snapshot: %s", s.String(), snapshotName)
985+
log.Tracef("Checking if global backup is finished, spec: %s, snapshot: %s", s.String(), snapshotName)
986986

987-
// 重试网络相关错误,避免在目标网络中断、进程重启时进行全量同步
987+
// Retry network related errors to avoid full sync when target network is interrupted or process is restarted
988988
if backupState, status, err := s.checkGlobalBackupFinished(snapshotName); err != nil && !isNetworkRelated(err) {
989989
return false, err
990990
} else if err == nil && backupState == BackupStateFinished {
991991
return true, nil
992992
} else if err == nil && backupState == BackupStateCancelled {
993-
return false, xerror.Errorf(xerror.Normal, "全局备份失败或被取消, 备份状态: %s", status)
993+
return false, xerror.Errorf(xerror.Normal, "Global backup failed or cancelled, backup status: %s", status)
994994
} else {
995-
// BackupStatePending, BackupStateUnknown 或网络相关错误
995+
// BackupStatePending, BackupStateUnknown or network related errors
996996
if err != nil {
997-
log.Warnf("检查全局备份状态失败, spec: %s, snapshot: %s, err: %v", s.String(), snapshotName, err)
997+
log.Warnf("Failed to check global backup status, spec: %s, snapshot: %s, err: %v", s.String(), snapshotName, err)
998998
}
999999
return false, nil
10001000
}
10011001
}
10021002

10031003
func (s *Spec) RestoreGlobalInfo(sqls string) error {
1004-
log.Infof("恢复全局信息")
1004+
log.Infof("Restoring global information")
10051005

10061006
db, err := s.Connect()
10071007
if err != nil {
10081008
return err
10091009
}
10101010

1011-
log.Infof("执行全局恢复SQL: %s", sqls)
1011+
log.Infof("Executing global restore SQL: %s", sqls)
10121012
_, err = db.Exec(sqls)
10131013
if err != nil {
1014-
return xerror.Wrapf(err, xerror.Normal, "恢复全局信息失败, SQL: %s", sqls)
1014+
return xerror.Wrapf(err, xerror.Normal, "Failed to restore global information, SQL: %s", sqls)
10151015
}
10161016

10171017
return nil

pkg/rpc/fe.go

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -472,12 +472,6 @@ func (rpc *FeRpc) GetSnapshot(spec *base.Spec, labelName string, compress bool)
472472
return convertResult[festruct.TGetSnapshotResult_](result, err)
473473
}
474474

475-
// func (rpc *FeRpc) GetGlobalSnapshot(spec *base.Spec, labelName string) (*festruct.TGetGlobalSnapshotResult_, error) {
476-
// // GetGlobalSnapshot 不支持主节点重定向,直接调用主节点客户端
477-
// // 这是因为 TGetGlobalSnapshotResult_ 结构体不包含 MasterAddress 字段
478-
// return rpc.masterClient.GetGlobalSnapshot(spec, labelName)
479-
// }
480-
481475
func (rpc *FeRpc) GetGlobalSnapshot(spec *base.Spec, labelName string) (*festruct.TGetGlobalSnapshotResult_, error) {
482476
caller := func(client IFeRpc) (resultType, error) {
483477
return client.GetGlobalSnapshot(spec, labelName)
@@ -816,15 +810,17 @@ func (rpc *singleFeClient) GetSnapshot(spec *base.Spec, labelName string, compre
816810
}
817811
}
818812

819-
// struct TGetGlobalSnapshotRequest {
820-
// 1: optional string cluster
821-
// 2: optional string user
822-
// 3: optional string passwd
823-
// 4: optional string token
824-
// 5: optional string label_name
825-
// 6: optional string snapshot_name
826-
// 7: optional TSnapshotType snapshot_type
827-
// }
813+
// struct TGetGlobalSnapshotRequest {
814+
// 1: optional string cluster
815+
// 2: optional string user
816+
// 3: optional string passwd
817+
// 4: optional string token
818+
// 5: optional string label_name
819+
// 6: optional string snapshot_name
820+
// 7: optional TSnapshotType snapshot_type
821+
// }
822+
//
823+
// Get Global Snapshot rpc
828824
func (rpc *singleFeClient) GetGlobalSnapshot(spec *base.Spec, labelName string) (*festruct.TGetGlobalSnapshotResult_, error) {
829825
log.Tracef("Call GetGlobalSnapshot, addr: %s, spec: %s, label: %s", rpc.Address(), spec, labelName)
830826

0 commit comments

Comments
 (0)