Skip to content

Commit 3b19b71

Browse files
feat: add global sync functionality for cross-cluster synchronization
This commit introduces a comprehensive global synchronization feature that enables synchronization of global objects between Doris clusters. Features: - Support for global snapshot creation with configurable options - Flexible sync options: all, backup_privilege, backup_catalog, backup_workload_group - Automatic backup completion monitoring with timeout handling - Global snapshot information retrieval and processing - SQL statement execution on destination cluster - Comprehensive error handling and logging throughout the process API Endpoint: - POST /sync_global - Handles global synchronization requests Request Structure: - name: Unique identifier for the sync operation - src: Source cluster specification - dest: Destination cluster specification - Sync options (at least one required if 'all' is not set): - all: Sync all global objects - backup_privilege: Sync user privileges - backup_catalog: Sync catalog information - backup_workload_group: Sync workload group settings Workflow: 1. Create global snapshot on source cluster 2. Wait for backup completion with polling mechanism 3. Retrieve global snapshot details via RPC 4. Convert snapshot data to SQL statements 5. Execute SQL statements on destination cluster Additional improvements: - Internationalize all Chinese logs and comments to English - Refactor validation logic for flexible option combinations - Decompose syncGlobal function into smaller, focused functions This feature enables seamless migration and synchronization of global configurations between Doris clusters, supporting various deployment scenarios including disaster recovery and cluster migration.
1 parent 9926863 commit 3b19b71

File tree

10 files changed

+3771
-823
lines changed

10 files changed

+3771
-823
lines changed

pkg/ccr/base/spec.go

Lines changed: 122 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func ParseBackupState(state string) BackupState {
8383
}
8484
}
8585

86-
// isSystemDatabase 判断是否为系统数据库,需要跳过
86+
// isSystemDatabase determines if it's a system database that should be skipped
8787
func isSystemDatabase(dbName string) bool {
8888
systemDatabases := []string{
8989
"information_schema",
@@ -501,7 +501,7 @@ func (s *Spec) GetAllDatabases() ([]string, error) {
501501
if err := rows.Scan(&database); err != nil {
502502
return nil, xerror.Wrapf(err, xerror.Normal, "scan database failed")
503503
}
504-
// 过滤系统数据库
504+
// Filter system databases
505505
if !isSystemDatabase(database) {
506506
databases = append(databases, database)
507507
}
@@ -823,6 +823,45 @@ func (s *Spec) CreateSnapshot(snapshotName string, tables []string) error {
823823
return nil
824824
}
825825

826+
// CreateGlobalSnapshot creates a global snapshot for synchronizing global objects
827+
// Sets different properties based on different options
828+
func (s *Spec) CreateGlobalSnapshot(snapshotName string, backupPrivilege, backupCatalog, backupWorkloadGroup bool) error {
829+
log.Infof("Creating global snapshot %s", snapshotName)
830+
831+
db, err := s.Connect()
832+
if err != nil {
833+
return err
834+
}
835+
836+
// Build SQL statement
837+
sql := fmt.Sprintf("BACKUP GLOBAL SNAPSHOT %s TO `__keep_on_local__`", utils.FormatKeywordName(snapshotName))
838+
839+
// Add properties based on different options
840+
properties := make([]string, 0)
841+
if backupPrivilege {
842+
properties = append(properties, "\"backup_privilege\" = \"true\"")
843+
}
844+
if backupCatalog {
845+
properties = append(properties, "\"backup_catalog\" = \"true\"")
846+
}
847+
if backupWorkloadGroup {
848+
properties = append(properties, "\"backup_workload_group\" = \"true\"")
849+
}
850+
851+
// If there are properties, add PROPERTIES clause
852+
if len(properties) > 0 {
853+
sql += fmt.Sprintf(" PROPERTIES (%s)", strings.Join(properties, ", "))
854+
}
855+
856+
log.Infof("Creating global snapshot SQL: %s", sql)
857+
_, err = db.Exec(sql)
858+
if err != nil {
859+
return xerror.Wrapf(err, xerror.Normal, "Failed to create global snapshot %s, SQL: %s", snapshotName, sql)
860+
}
861+
862+
return nil
863+
}
864+
826865
// mysql> BACKUP SNAPSHOT ccr.snapshot_20230605 TO `__keep_on_local__` ON (src_1 PARTITION (`p1`)) PROPERTIES ("type" = "full");
827866
func (s *Spec) CreatePartialSnapshot(snapshotName, table string, partitions []string) error {
828867
if len(table) == 0 {
@@ -897,6 +936,87 @@ func (s *Spec) checkBackupFinished(snapshotName string) (BackupState, string, er
897936
return BackupStateUnknown, "", xerror.Errorf(xerror.Normal, "no backup state found, sql: %s", sql)
898937
}
899938

939+
// checkGlobalBackupFinished checks global backup status
940+
func (s *Spec) checkGlobalBackupFinished(snapshotName string) (BackupState, string, error) {
941+
log.Tracef("Checking global backup status %s", snapshotName)
942+
943+
db, err := s.Connect()
944+
if err != nil {
945+
return BackupStateUnknown, "", err
946+
}
947+
948+
sql := fmt.Sprintf("SHOW GLOBAL BACKUP WHERE SnapshotName = \"%s\"", snapshotName)
949+
log.Infof("Checking global backup status SQL: %s", sql)
950+
rows, err := db.Query(sql)
951+
if err != nil {
952+
return BackupStateUnknown, "", xerror.Wrapf(err, xerror.Normal, "Failed to query global backup status, SQL: %s", sql)
953+
}
954+
defer rows.Close()
955+
956+
if rows.Next() {
957+
rowParser := utils.NewRowParser()
958+
if err := rowParser.Parse(rows); err != nil {
959+
return BackupStateUnknown, "", xerror.Wrap(err, xerror.Normal, sql)
960+
}
961+
962+
stateStr, err := rowParser.GetString("State")
963+
if err != nil {
964+
return BackupStateUnknown, "", xerror.Wrap(err, xerror.Normal, "Failed to parse global backup State")
965+
}
966+
967+
status, err := rowParser.GetString("Status")
968+
if err != nil {
969+
return BackupStateUnknown, "", xerror.Wrap(err, xerror.Normal, "Failed to parse global backup Status")
970+
}
971+
972+
log.Infof("Checking global snapshot %s backup status: [%v]", snapshotName, stateStr)
973+
return ParseBackupState(stateStr), status, nil
974+
}
975+
976+
if err := rows.Err(); err != nil {
977+
return BackupStateUnknown, "", xerror.Wrapf(err, xerror.Normal, "Check global backup status, SQL: %s", sql)
978+
}
979+
980+
return BackupStateUnknown, "", xerror.Errorf(xerror.Normal, "Global backup status not found, SQL: %s", sql)
981+
}
982+
983+
// CheckGlobalBackupFinished checks if global backup is finished
984+
func (s *Spec) CheckGlobalBackupFinished(snapshotName string) (bool, error) {
985+
log.Tracef("Checking if global backup is finished, spec: %s, snapshot: %s", s.String(), snapshotName)
986+
987+
// Retry network related errors to avoid full sync when target network is interrupted or process is restarted
988+
if backupState, status, err := s.checkGlobalBackupFinished(snapshotName); err != nil && !isNetworkRelated(err) {
989+
return false, err
990+
} else if err == nil && backupState == BackupStateFinished {
991+
return true, nil
992+
} else if err == nil && backupState == BackupStateCancelled {
993+
return false, xerror.Errorf(xerror.Normal, "Global backup failed or cancelled, backup status: %s", status)
994+
} else {
995+
// BackupStatePending, BackupStateUnknown or network related errors
996+
if err != nil {
997+
log.Warnf("Failed to check global backup status, spec: %s, snapshot: %s, err: %v", s.String(), snapshotName, err)
998+
}
999+
return false, nil
1000+
}
1001+
}
1002+
1003+
func (s *Spec) RestoreGlobalInfo(sqls string) error {
1004+
log.Infof("Restoring global information")
1005+
1006+
db, err := s.Connect()
1007+
if err != nil {
1008+
return err
1009+
}
1010+
1011+
log.Infof("Executing global restore SQL: %s", sqls)
1012+
_, err = db.Exec(sqls)
1013+
if err != nil {
1014+
return xerror.Wrapf(err, xerror.Normal, "Failed to restore global information, SQL: %s", sqls)
1015+
}
1016+
1017+
return nil
1018+
}
1019+
9001020
func (s *Spec) CheckBackupFinished(snapshotName string) (bool, error) {
9011021
log.Tracef("check backup state, spec: %s, snapshot: %s", s.String(), snapshotName)
9021022

pkg/ccr/base/specer.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,10 @@ type Specer interface {
4747
CancelRestoreIfExists(snapshotName string) error
4848
CreatePartialSnapshot(snapshotName, table string, partitions []string) error
4949
CreateSnapshot(snapshotName string, tables []string) error
50+
CreateGlobalSnapshot(snapshotName string, backupPrivilege, backupCatalog, backupWorkloadGroup bool) error
51+
RestoreGlobalInfo(sqls string) error
5052
CheckBackupFinished(snapshotName string) (bool, error)
53+
CheckGlobalBackupFinished(snapshotName string) (bool, error)
5154
CheckRestoreFinished(snapshotName string) (bool, error)
5255
GetRestoreSignatureNotMatchedTableOrView(snapshotName string) (string, bool, error)
5356
WaitTransactionDone(txnId int64) // busy wait

pkg/ccr/utils.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ func NewBackupJobInfoFromJson(data []byte) (*BackupJobInfo, error) {
5555
return jobInfo, nil
5656
}
5757

58+
func NewBackupSqlsFromBytes(backupSqls []byte) (string, error) {
59+
return string(backupSqls), nil
60+
}
61+
5862
func (i *BackupJobInfo) TableNameMapping() map[int64]string {
5963
tableMapping := make(map[int64]string)
6064
for tableName, tableInfo := range i.BackupObjects {

pkg/rpc/fe.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ type IFeRpc interface {
108108
GetBinlog(*base.Spec, int64, int64) (*festruct.TGetBinlogResult_, error)
109109
GetBinlogLag(*base.Spec, int64) (*festruct.TGetBinlogLagResult_, error)
110110
GetSnapshot(*base.Spec, string, bool) (*festruct.TGetSnapshotResult_, error)
111+
GetGlobalSnapshot(*base.Spec, string) (*festruct.TGetGlobalSnapshotResult_, error)
111112
RestoreSnapshot(*base.Spec, *RestoreSnapshotRequest) (*festruct.TRestoreSnapshotResult_, error)
112113
GetMasterToken(*base.Spec) (*festruct.TGetMasterTokenResult_, error)
113114
GetDbMeta(spec *base.Spec) (*festruct.TGetMetaResult_, error)
@@ -471,6 +472,14 @@ func (rpc *FeRpc) GetSnapshot(spec *base.Spec, labelName string, compress bool)
471472
return convertResult[festruct.TGetSnapshotResult_](result, err)
472473
}
473474

475+
func (rpc *FeRpc) GetGlobalSnapshot(spec *base.Spec, labelName string) (*festruct.TGetGlobalSnapshotResult_, error) {
476+
caller := func(client IFeRpc) (resultType, error) {
477+
return client.GetGlobalSnapshot(spec, labelName)
478+
}
479+
result, err := rpc.callWithMasterRedirect(caller)
480+
return convertResult[festruct.TGetGlobalSnapshotResult_](result, err)
481+
}
482+
474483
func (rpc *FeRpc) RestoreSnapshot(spec *base.Spec, req *RestoreSnapshotRequest) (*festruct.TRestoreSnapshotResult_, error) {
475484
caller := func(client IFeRpc) (resultType, error) {
476485
return client.RestoreSnapshot(spec, req)
@@ -801,6 +810,41 @@ func (rpc *singleFeClient) GetSnapshot(spec *base.Spec, labelName string, compre
801810
}
802811
}
803812

813+
// struct TGetGlobalSnapshotRequest {
814+
// 1: optional string cluster
815+
// 2: optional string user
816+
// 3: optional string passwd
817+
// 4: optional string token
818+
// 5: optional string label_name
819+
// 6: optional string snapshot_name
820+
// 7: optional TSnapshotType snapshot_type
821+
// }
822+
//
823+
// Get Global Snapshot rpc
824+
func (rpc *singleFeClient) GetGlobalSnapshot(spec *base.Spec, labelName string) (*festruct.TGetGlobalSnapshotResult_, error) {
825+
log.Tracef("Call GetGlobalSnapshot, addr: %s, spec: %s, label: %s", rpc.Address(), spec, labelName)
826+
827+
defer xmetrics.RecordFeRpc("GetGlobalSnapshot", rpc.addr)()
828+
829+
client := rpc.client
830+
snapshotType := festruct.TSnapshotType_LOCAL
831+
req := &festruct.TGetGlobalSnapshotRequest{
832+
Cluster: &spec.Cluster,
833+
User: &spec.User,
834+
Passwd: &spec.Password,
835+
SnapshotName: &labelName,
836+
SnapshotType: &snapshotType,
837+
}
838+
839+
log.Tracef("GetGlobalSnapshotRequest user %s, cluster %s, snapshot name %s, snapshot type %d",
840+
req.GetUser(), req.GetCluster(), req.GetSnapshotName(), req.GetSnapshotType())
841+
if resp, err := client.GetGlobalSnapshot(context.Background(), req); err != nil {
842+
return nil, xerror.Wrapf(err, xerror.RPC, "GetGlobalSnapshot error: %v, req: %+v", err, req)
843+
} else {
844+
return resp, nil
845+
}
846+
}
847+
804848
// struct TRestoreSnapshotRequest {
805849
// 1: optional string cluster
806850
// 2: optional string user

0 commit comments

Comments
 (0)