Skip to content

Commit 02f63b3

Browse files
step1: privilege
1 parent 9926863 commit 02f63b3

File tree

29 files changed

+9962
-11893
lines changed

29 files changed

+9962
-11893
lines changed

pkg/ccr/base/spec.go

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -823,6 +823,45 @@ func (s *Spec) CreateSnapshot(snapshotName string, tables []string) error {
823823
return nil
824824
}
825825

826+
// CreateGlobalSnapshot 创建全局快照,用于同步全局对象
827+
// 根据不同的选项设置不同的属性
828+
func (s *Spec) CreateGlobalSnapshot(snapshotName string, backupPrivilege, backupCatalog, backupWorkloadGroup bool) error {
829+
log.Infof("创建全局快照 %s", snapshotName)
830+
831+
db, err := s.Connect()
832+
if err != nil {
833+
return err
834+
}
835+
836+
// 构建SQL语句
837+
sql := fmt.Sprintf("BACKUP GLOBAL SNAPSHOT %s TO `__keep_on_local__`", utils.FormatKeywordName(snapshotName))
838+
839+
// 根据不同的选项添加属性
840+
properties := make([]string, 0)
841+
if backupPrivilege {
842+
properties = append(properties, "\"backup_privilege\" = \"true\"")
843+
}
844+
if backupCatalog {
845+
properties = append(properties, "\"backup_catalog\" = \"true\"")
846+
}
847+
if backupWorkloadGroup {
848+
properties = append(properties, "\"backup_workload_group\" = \"true\"")
849+
}
850+
851+
// 如果有属性,添加PROPERTIES子句
852+
if len(properties) > 0 {
853+
sql += fmt.Sprintf(" PROPERTIES (%s)", strings.Join(properties, ", "))
854+
}
855+
856+
log.Infof("创建全局快照SQL: %s", sql)
857+
_, err = db.Exec(sql)
858+
if err != nil {
859+
return xerror.Wrapf(err, xerror.Normal, "创建全局快照 %s 失败, SQL: %s", snapshotName, sql)
860+
}
861+
862+
return nil
863+
}
864+
826865
// mysql> BACKUP SNAPSHOT ccr.snapshot_20230605 TO `__keep_on_local__` ON (src_1 PARTITION (`p1`)) PROPERTIES ("type" = "full");
827866
func (s *Spec) CreatePartialSnapshot(snapshotName, table string, partitions []string) error {
828867
if len(table) == 0 {
@@ -897,6 +936,87 @@ func (s *Spec) checkBackupFinished(snapshotName string) (BackupState, string, er
897936
return BackupStateUnknown, "", xerror.Errorf(xerror.Normal, "no backup state found, sql: %s", sql)
898937
}
899938

939+
// checkGlobalBackupFinished 检查全局备份状态
940+
func (s *Spec) checkGlobalBackupFinished(snapshotName string) (BackupState, string, error) {
941+
log.Tracef("检查全局备份状态 %s", snapshotName)
942+
943+
db, err := s.Connect()
944+
if err != nil {
945+
return BackupStateUnknown, "", err
946+
}
947+
948+
sql := fmt.Sprintf("SHOW GLOBAL BACKUP WHERE SnapshotName = \"%s\"", snapshotName)
949+
log.Infof("检查全局备份状态SQL: %s", sql)
950+
rows, err := db.Query(sql)
951+
if err != nil {
952+
return BackupStateUnknown, "", xerror.Wrapf(err, xerror.Normal, "查询全局备份状态失败, SQL: %s", sql)
953+
}
954+
defer rows.Close()
955+
956+
if rows.Next() {
957+
rowParser := utils.NewRowParser()
958+
if err := rowParser.Parse(rows); err != nil {
959+
return BackupStateUnknown, "", xerror.Wrap(err, xerror.Normal, sql)
960+
}
961+
962+
stateStr, err := rowParser.GetString("State")
963+
if err != nil {
964+
return BackupStateUnknown, "", xerror.Wrap(err, xerror.Normal, "解析全局备份State失败")
965+
}
966+
967+
status, err := rowParser.GetString("Status")
968+
if err != nil {
969+
return BackupStateUnknown, "", xerror.Wrap(err, xerror.Normal, "解析全局备份Status失败")
970+
}
971+
972+
log.Infof("检查全局快照 %s 备份状态: [%v]", snapshotName, stateStr)
973+
return ParseBackupState(stateStr), status, nil
974+
}
975+
976+
if err := rows.Err(); err != nil {
977+
return BackupStateUnknown, "", xerror.Wrapf(err, xerror.Normal, "检查全局备份状态, SQL: %s", sql)
978+
}
979+
980+
return BackupStateUnknown, "", xerror.Errorf(xerror.Normal, "未找到全局备份状态, SQL: %s", sql)
981+
}
982+
983+
// CheckGlobalBackupFinished 检查全局备份是否完成
984+
func (s *Spec) CheckGlobalBackupFinished(snapshotName string) (bool, error) {
985+
log.Tracef("检查全局备份是否完成, spec: %s, snapshot: %s", s.String(), snapshotName)
986+
987+
// 重试网络相关错误,避免在目标网络中断、进程重启时进行全量同步
988+
if backupState, status, err := s.checkGlobalBackupFinished(snapshotName); err != nil && !isNetworkRelated(err) {
989+
return false, err
990+
} else if err == nil && backupState == BackupStateFinished {
991+
return true, nil
992+
} else if err == nil && backupState == BackupStateCancelled {
993+
return false, xerror.Errorf(xerror.Normal, "全局备份失败或被取消, 备份状态: %s", status)
994+
} else {
995+
// BackupStatePending, BackupStateUnknown 或网络相关错误
996+
if err != nil {
997+
log.Warnf("检查全局备份状态失败, spec: %s, snapshot: %s, err: %v", s.String(), snapshotName, err)
998+
}
999+
return false, nil
1000+
}
1001+
}
1002+
1003+
func (s *Spec) RestoreGlobalInfo(sqls string) error {
1004+
log.Infof("恢复全局信息")
1005+
1006+
db, err := s.Connect()
1007+
if err != nil {
1008+
return err
1009+
}
1010+
1011+
log.Infof("执行全局恢复SQL: %s", sqls)
1012+
_, err = db.Exec(sqls)
1013+
if err != nil {
1014+
return xerror.Wrapf(err, xerror.Normal, "恢复全局信息失败, SQL: %s", sqls)
1015+
}
1016+
1017+
return nil
1018+
}
1019+
9001020
func (s *Spec) CheckBackupFinished(snapshotName string) (bool, error) {
9011021
log.Tracef("check backup state, spec: %s, snapshot: %s", s.String(), snapshotName)
9021022

pkg/ccr/base/specer.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,10 @@ type Specer interface {
4747
CancelRestoreIfExists(snapshotName string) error
4848
CreatePartialSnapshot(snapshotName, table string, partitions []string) error
4949
CreateSnapshot(snapshotName string, tables []string) error
50+
CreateGlobalSnapshot(snapshotName string, backupPrivilege, backupCatalog, backupWorkloadGroup bool) error
51+
RestoreGlobalInfo(sqls string) error
5052
CheckBackupFinished(snapshotName string) (bool, error)
53+
CheckGlobalBackupFinished(snapshotName string) (bool, error)
5154
CheckRestoreFinished(snapshotName string) (bool, error)
5255
GetRestoreSignatureNotMatchedTableOrView(snapshotName string) (string, bool, error)
5356
WaitTransactionDone(txnId int64) // busy wait

pkg/ccr/utils.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ func NewBackupJobInfoFromJson(data []byte) (*BackupJobInfo, error) {
5555
return jobInfo, nil
5656
}
5757

58+
func NewBackupSqlsFromBytes(backupSqls []byte) (string, error) {
59+
return string(backupSqls), nil
60+
}
61+
5862
func (i *BackupJobInfo) TableNameMapping() map[int64]string {
5963
tableMapping := make(map[int64]string)
6064
for tableName, tableInfo := range i.BackupObjects {

pkg/rpc/fe.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ type IFeRpc interface {
108108
GetBinlog(*base.Spec, int64, int64) (*festruct.TGetBinlogResult_, error)
109109
GetBinlogLag(*base.Spec, int64) (*festruct.TGetBinlogLagResult_, error)
110110
GetSnapshot(*base.Spec, string, bool) (*festruct.TGetSnapshotResult_, error)
111+
GetGlobalSnapshot(*base.Spec, string) (*festruct.TGetGlobalSnapshotResult_, error)
111112
RestoreSnapshot(*base.Spec, *RestoreSnapshotRequest) (*festruct.TRestoreSnapshotResult_, error)
112113
GetMasterToken(*base.Spec) (*festruct.TGetMasterTokenResult_, error)
113114
GetDbMeta(spec *base.Spec) (*festruct.TGetMetaResult_, error)
@@ -471,6 +472,20 @@ func (rpc *FeRpc) GetSnapshot(spec *base.Spec, labelName string, compress bool)
471472
return convertResult[festruct.TGetSnapshotResult_](result, err)
472473
}
473474

475+
// func (rpc *FeRpc) GetGlobalSnapshot(spec *base.Spec, labelName string) (*festruct.TGetGlobalSnapshotResult_, error) {
476+
// // GetGlobalSnapshot 不支持主节点重定向,直接调用主节点客户端
477+
// // 这是因为 TGetGlobalSnapshotResult_ 结构体不包含 MasterAddress 字段
478+
// return rpc.masterClient.GetGlobalSnapshot(spec, labelName)
479+
// }
480+
481+
func (rpc *FeRpc) GetGlobalSnapshot(spec *base.Spec, labelName string) (*festruct.TGetGlobalSnapshotResult_, error) {
482+
caller := func(client IFeRpc) (resultType, error) {
483+
return client.GetGlobalSnapshot(spec, labelName)
484+
}
485+
result, err := rpc.callWithMasterRedirect(caller)
486+
return convertResult[festruct.TGetGlobalSnapshotResult_](result, err)
487+
}
488+
474489
func (rpc *FeRpc) RestoreSnapshot(spec *base.Spec, req *RestoreSnapshotRequest) (*festruct.TRestoreSnapshotResult_, error) {
475490
caller := func(client IFeRpc) (resultType, error) {
476491
return client.RestoreSnapshot(spec, req)
@@ -801,6 +816,39 @@ func (rpc *singleFeClient) GetSnapshot(spec *base.Spec, labelName string, compre
801816
}
802817
}
803818

819+
// struct TGetGlobalSnapshotRequest {
820+
// 1: optional string cluster
821+
// 2: optional string user
822+
// 3: optional string passwd
823+
// 4: optional string token
824+
// 5: optional string label_name
825+
// 6: optional string snapshot_name
826+
// 7: optional TSnapshotType snapshot_type
827+
// }
828+
func (rpc *singleFeClient) GetGlobalSnapshot(spec *base.Spec, labelName string) (*festruct.TGetGlobalSnapshotResult_, error) {
829+
log.Tracef("Call GetGlobalSnapshot, addr: %s, spec: %s, label: %s", rpc.Address(), spec, labelName)
830+
831+
defer xmetrics.RecordFeRpc("GetGlobalSnapshot", rpc.addr)()
832+
833+
client := rpc.client
834+
snapshotType := festruct.TSnapshotType_LOCAL
835+
req := &festruct.TGetGlobalSnapshotRequest{
836+
Cluster: &spec.Cluster,
837+
User: &spec.User,
838+
Passwd: &spec.Password,
839+
SnapshotName: &labelName,
840+
SnapshotType: &snapshotType,
841+
}
842+
843+
log.Tracef("GetGlobalSnapshotRequest user %s, cluster %s, snapshot name %s, snapshot type %d",
844+
req.GetUser(), req.GetCluster(), req.GetSnapshotName(), req.GetSnapshotType())
845+
if resp, err := client.GetGlobalSnapshot(context.Background(), req); err != nil {
846+
return nil, xerror.Wrapf(err, xerror.RPC, "GetGlobalSnapshot error: %v, req: %+v", err, req)
847+
} else {
848+
return resp, nil
849+
}
850+
}
851+
804852
// struct TRestoreSnapshotRequest {
805853
// 1: optional string cluster
806854
// 2: optional string user

0 commit comments

Comments
 (0)