Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 170 additions & 0 deletions pkg/ccr/base/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,22 @@ func ParseBackupState(state string) BackupState {
}
}

// isSystemDatabase determines if it's a system database that should be skipped
func isSystemDatabase(dbName string) bool {
systemDatabases := []string{
"information_schema",
"mysql",
"__internal_schema",
}

for _, sysDb := range systemDatabases {
if dbName == sysDb {
return true
}
}
return false
}

type RestoreState int

const (
Expand Down Expand Up @@ -464,6 +480,40 @@ func (s *Spec) GetAllTables() ([]string, error) {
return tables, nil
}

func (s *Spec) GetAllDatabases() ([]string, error) {
log.Tracef("get all databases from cluster")

db, err := s.Connect()
if err != nil {
return nil, err
}

sql := "SHOW DATABASES"
rows, err := db.Query(sql)
if err != nil {
return nil, xerror.Wrapf(err, xerror.Normal, "query %s failed", sql)
}
defer rows.Close()

var databases []string
for rows.Next() {
var database string
if err := rows.Scan(&database); err != nil {
return nil, xerror.Wrapf(err, xerror.Normal, "scan database failed")
}
// Filter system databases
if !isSystemDatabase(database) {
databases = append(databases, database)
}
}

if err := rows.Err(); err != nil {
return nil, xerror.Wrapf(err, xerror.Normal, "rows error")
}

return databases, nil
}

func (s *Spec) queryResult(querySQL string, queryColumn string, errMsg string) ([]string, error) {
db, err := s.Connect()
if err != nil {
Expand Down Expand Up @@ -773,6 +823,45 @@ func (s *Spec) CreateSnapshot(snapshotName string, tables []string) error {
return nil
}

// CreateGlobalSnapshot creates a global snapshot for synchronizing global objects
// Sets different properties based on different options
func (s *Spec) CreateGlobalSnapshot(snapshotName string, backupPrivilege, backupCatalog, backupWorkloadGroup bool) error {
log.Infof("Creating global snapshot %s", snapshotName)

db, err := s.Connect()
if err != nil {
return err
}

// Build SQL statement
sql := fmt.Sprintf("BACKUP GLOBAL SNAPSHOT %s TO `__keep_on_local__`", utils.FormatKeywordName(snapshotName))

// Add properties based on different options
properties := make([]string, 0)
if backupPrivilege {
properties = append(properties, "\"backup_privilege\" = \"true\"")
}
if backupCatalog {
properties = append(properties, "\"backup_catalog\" = \"true\"")
}
if backupWorkloadGroup {
properties = append(properties, "\"backup_workload_group\" = \"true\"")
}

// If there are properties, add PROPERTIES clause
if len(properties) > 0 {
sql += fmt.Sprintf(" PROPERTIES (%s)", strings.Join(properties, ", "))
}

log.Infof("Creating global snapshot SQL: %s", sql)
_, err = db.Exec(sql)
if err != nil {
return xerror.Wrapf(err, xerror.Normal, "Failed to create global snapshot %s, SQL: %s", snapshotName, sql)
}

return nil
}

// mysql> BACKUP SNAPSHOT ccr.snapshot_20230605 TO `__keep_on_local__` ON (src_1 PARTITION (`p1`)) PROPERTIES ("type" = "full");
func (s *Spec) CreatePartialSnapshot(snapshotName, table string, partitions []string) error {
if len(table) == 0 {
Expand Down Expand Up @@ -847,6 +936,87 @@ func (s *Spec) checkBackupFinished(snapshotName string) (BackupState, string, er
return BackupStateUnknown, "", xerror.Errorf(xerror.Normal, "no backup state found, sql: %s", sql)
}

// checkGlobalBackupFinished checks global backup status
func (s *Spec) checkGlobalBackupFinished(snapshotName string) (BackupState, string, error) {
log.Tracef("Checking global backup status %s", snapshotName)

db, err := s.Connect()
if err != nil {
return BackupStateUnknown, "", err
}

sql := fmt.Sprintf("SHOW GLOBAL BACKUP WHERE SnapshotName = \"%s\"", snapshotName)
log.Infof("Checking global backup status SQL: %s", sql)
rows, err := db.Query(sql)
if err != nil {
return BackupStateUnknown, "", xerror.Wrapf(err, xerror.Normal, "Failed to query global backup status, SQL: %s", sql)
}
defer rows.Close()

if rows.Next() {
rowParser := utils.NewRowParser()
if err := rowParser.Parse(rows); err != nil {
return BackupStateUnknown, "", xerror.Wrap(err, xerror.Normal, sql)
}

stateStr, err := rowParser.GetString("State")
if err != nil {
return BackupStateUnknown, "", xerror.Wrap(err, xerror.Normal, "Failed to parse global backup State")
}

status, err := rowParser.GetString("Status")
if err != nil {
return BackupStateUnknown, "", xerror.Wrap(err, xerror.Normal, "Failed to parse global backup Status")
}

log.Infof("Checking global snapshot %s backup status: [%v]", snapshotName, stateStr)
return ParseBackupState(stateStr), status, nil
}

if err := rows.Err(); err != nil {
return BackupStateUnknown, "", xerror.Wrapf(err, xerror.Normal, "Check global backup status, SQL: %s", sql)
}

return BackupStateUnknown, "", xerror.Errorf(xerror.Normal, "Global backup status not found, SQL: %s", sql)
}

// CheckGlobalBackupFinished checks if global backup is finished
func (s *Spec) CheckGlobalBackupFinished(snapshotName string) (bool, error) {
log.Tracef("Checking if global backup is finished, spec: %s, snapshot: %s", s.String(), snapshotName)

// Retry network related errors to avoid full sync when target network is interrupted or process is restarted
if backupState, status, err := s.checkGlobalBackupFinished(snapshotName); err != nil && !isNetworkRelated(err) {
return false, err
} else if err == nil && backupState == BackupStateFinished {
return true, nil
} else if err == nil && backupState == BackupStateCancelled {
return false, xerror.Errorf(xerror.Normal, "Global backup failed or cancelled, backup status: %s", status)
} else {
// BackupStatePending, BackupStateUnknown or network related errors
if err != nil {
log.Warnf("Failed to check global backup status, spec: %s, snapshot: %s, err: %v", s.String(), snapshotName, err)
}
return false, nil
}
}

func (s *Spec) RestoreGlobalInfo(sqls string) error {
log.Infof("Restoring global information")

db, err := s.Connect()
if err != nil {
return err
}

log.Infof("Executing global restore SQL: %s", sqls)
_, err = db.Exec(sqls)
if err != nil {
return xerror.Wrapf(err, xerror.Normal, "Failed to restore global information, SQL: %s", sqls)
}

return nil
}

func (s *Spec) CheckBackupFinished(snapshotName string) (bool, error) {
log.Tracef("check backup state, spec: %s, snapshot: %s", s.String(), snapshotName)

Expand Down
3 changes: 3 additions & 0 deletions pkg/ccr/base/specer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ type Specer interface {
CancelRestoreIfExists(snapshotName string) error
CreatePartialSnapshot(snapshotName, table string, partitions []string) error
CreateSnapshot(snapshotName string, tables []string) error
CreateGlobalSnapshot(snapshotName string, backupPrivilege, backupCatalog, backupWorkloadGroup bool) error
RestoreGlobalInfo(sqls string) error
CheckBackupFinished(snapshotName string) (bool, error)
CheckGlobalBackupFinished(snapshotName string) (bool, error)
CheckRestoreFinished(snapshotName string) (bool, error)
GetRestoreSignatureNotMatchedTableOrView(snapshotName string) (string, bool, error)
WaitTransactionDone(txnId int64) // busy wait
Expand Down
7 changes: 5 additions & 2 deletions pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,9 @@ func init() {
type SyncType int

const (
DBSync SyncType = 0
TableSync SyncType = 1
DBSync SyncType = 0
TableSync SyncType = 1
ClusterSync SyncType = 2
)

func (s SyncType) String() string {
Expand All @@ -137,6 +138,8 @@ func (s SyncType) String() string {
return "db_sync"
case TableSync:
return "table_sync"
case ClusterSync:
return "cluster_sync"
default:
return "unknown_sync"
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/ccr/specer_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/ccr/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func NewBackupJobInfoFromJson(data []byte) (*BackupJobInfo, error) {
return jobInfo, nil
}

func NewBackupSqlsFromBytes(backupSqls []byte) (string, error) {
return string(backupSqls), nil
}

func (i *BackupJobInfo) TableNameMapping() map[int64]string {
tableMapping := make(map[int64]string)
for tableName, tableInfo := range i.BackupObjects {
Expand Down
44 changes: 44 additions & 0 deletions pkg/rpc/fe.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ type IFeRpc interface {
GetBinlog(*base.Spec, int64, int64) (*festruct.TGetBinlogResult_, error)
GetBinlogLag(*base.Spec, int64) (*festruct.TGetBinlogLagResult_, error)
GetSnapshot(*base.Spec, string, bool) (*festruct.TGetSnapshotResult_, error)
GetGlobalSnapshot(*base.Spec, string) (*festruct.TGetGlobalSnapshotResult_, error)
RestoreSnapshot(*base.Spec, *RestoreSnapshotRequest) (*festruct.TRestoreSnapshotResult_, error)
GetMasterToken(*base.Spec) (*festruct.TGetMasterTokenResult_, error)
GetDbMeta(spec *base.Spec) (*festruct.TGetMetaResult_, error)
Expand Down Expand Up @@ -471,6 +472,14 @@ func (rpc *FeRpc) GetSnapshot(spec *base.Spec, labelName string, compress bool)
return convertResult[festruct.TGetSnapshotResult_](result, err)
}

func (rpc *FeRpc) GetGlobalSnapshot(spec *base.Spec, labelName string) (*festruct.TGetGlobalSnapshotResult_, error) {
caller := func(client IFeRpc) (resultType, error) {
return client.GetGlobalSnapshot(spec, labelName)
}
result, err := rpc.callWithMasterRedirect(caller)
return convertResult[festruct.TGetGlobalSnapshotResult_](result, err)
}

func (rpc *FeRpc) RestoreSnapshot(spec *base.Spec, req *RestoreSnapshotRequest) (*festruct.TRestoreSnapshotResult_, error) {
caller := func(client IFeRpc) (resultType, error) {
return client.RestoreSnapshot(spec, req)
Expand Down Expand Up @@ -801,6 +810,41 @@ func (rpc *singleFeClient) GetSnapshot(spec *base.Spec, labelName string, compre
}
}

// struct TGetGlobalSnapshotRequest {
// 1: optional string cluster
// 2: optional string user
// 3: optional string passwd
// 4: optional string token
// 5: optional string label_name
// 6: optional string snapshot_name
// 7: optional TSnapshotType snapshot_type
// }
//
// Get Global Snapshot rpc
func (rpc *singleFeClient) GetGlobalSnapshot(spec *base.Spec, labelName string) (*festruct.TGetGlobalSnapshotResult_, error) {
log.Tracef("Call GetGlobalSnapshot, addr: %s, spec: %s, label: %s", rpc.Address(), spec, labelName)

defer xmetrics.RecordFeRpc("GetGlobalSnapshot", rpc.addr)()

client := rpc.client
snapshotType := festruct.TSnapshotType_LOCAL
req := &festruct.TGetGlobalSnapshotRequest{
Cluster: &spec.Cluster,
User: &spec.User,
Passwd: &spec.Password,
SnapshotName: &labelName,
SnapshotType: &snapshotType,
}

log.Tracef("GetGlobalSnapshotRequest user %s, cluster %s, snapshot name %s, snapshot type %d",
req.GetUser(), req.GetCluster(), req.GetSnapshotName(), req.GetSnapshotType())
if resp, err := client.GetGlobalSnapshot(context.Background(), req); err != nil {
return nil, xerror.Wrapf(err, xerror.RPC, "GetGlobalSnapshot error: %v, req: %+v", err, req)
} else {
return resp, nil
}
}

// struct TRestoreSnapshotRequest {
// 1: optional string cluster
// 2: optional string user
Expand Down
Loading