Skip to content

Commit 38dc73a

Browse files
committed
Merge branch 'dev' into ccr_tb_to_tb
2 parents 2fd86b3 + a134ed7 commit 38dc73a

File tree

74 files changed

+736
-503
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

74 files changed

+736
-503
lines changed

pkg/ccr/base/spec.go

Lines changed: 49 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -628,64 +628,57 @@ func (s *Spec) CheckTableExistsByName(tableName string) (bool, error) {
628628
}
629629

630630
// mysql> BACKUP SNAPSHOT ccr.snapshot_20230605 TO `__keep_on_local__` ON ( src_1 ) PROPERTIES ("type" = "full");
631-
func (s *Spec) CreateSnapshot(tables []string) (string, error) {
631+
func (s *Spec) CreateSnapshot(snapshotName string, tables []string) error {
632632
if tables == nil {
633633
tables = make([]string, 0)
634634
}
635635
if len(tables) == 0 {
636636
tables = append(tables, s.Table)
637637
}
638638

639-
var snapshotName string
640639
var tableRefs string
641640
if len(tables) == 1 {
642-
// snapshot name format "ccrs_${table}_${timestamp}"
643641
// table refs = table
644-
snapshotName = fmt.Sprintf("ccrs_%s_%s_%d", s.Database, s.Table, time.Now().Unix())
645642
tableRefs = utils.FormatKeywordName(tables[0])
646643
} else {
647-
// snapshot name format "ccrs_${db}_${timestamp}"
648644
// table refs = tables.join(", ")
649-
snapshotName = fmt.Sprintf("ccrs_%s_%d", s.Database, time.Now().Unix())
650645
tableRefs = "`" + strings.Join(tables, "`,`") + "`"
651646
}
652647

653648
// means source is a empty db, table number is 0
654649
if tableRefs == "``" {
655-
return "", xerror.Errorf(xerror.Normal, "source db is empty! you should have at least one table")
650+
return xerror.Errorf(xerror.Normal, "source db is empty! you should have at least one table")
656651
}
657652

658653
db, err := s.Connect()
659654
if err != nil {
660-
return "", err
655+
return err
661656
}
662657

663658
backupSnapshotSql := fmt.Sprintf("BACKUP SNAPSHOT %s.%s TO `__keep_on_local__` ON ( %s ) PROPERTIES (\"type\" = \"full\")", utils.FormatKeywordName(s.Database), utils.FormatKeywordName(snapshotName), tableRefs)
664659
log.Infof("create snapshot %s.%s, backup snapshot sql: %s", s.Database, snapshotName, backupSnapshotSql)
665660
_, err = db.Exec(backupSnapshotSql)
666661
if err != nil {
667-
return "", xerror.Wrapf(err, xerror.Normal, "backup snapshot %s failed, sql: %s", snapshotName, backupSnapshotSql)
662+
return xerror.Wrapf(err, xerror.Normal, "backup snapshot %s failed, sql: %s", snapshotName, backupSnapshotSql)
668663
}
669664

670-
return snapshotName, nil
665+
return nil
671666
}
672667

673668
// mysql> BACKUP SNAPSHOT ccr.snapshot_20230605 TO `__keep_on_local__` ON (src_1 PARTITION (`p1`)) PROPERTIES ("type" = "full");
674-
func (s *Spec) CreatePartialSnapshot(table string, partitions []string) (string, error) {
669+
func (s *Spec) CreatePartialSnapshot(snapshotName, table string, partitions []string) error {
675670
if len(table) == 0 {
676-
return "", xerror.Errorf(xerror.Normal, "source db is empty! you should have at least one table")
671+
return xerror.Errorf(xerror.Normal, "source db is empty! you should have at least one table")
677672
}
678673

679-
// snapshot name format "ccrp_${table}_${timestamp}"
680674
// table refs = table
681-
snapshotName := fmt.Sprintf("ccrp_%s_%s_%d", s.Database, s.Table, time.Now().Unix())
682675
tableRef := utils.FormatKeywordName(table)
683676

684677
log.Infof("create partial snapshot %s.%s", s.Database, snapshotName)
685678

686679
db, err := s.Connect()
687680
if err != nil {
688-
return "", err
681+
return err
689682
}
690683

691684
partitionRefs := ""
@@ -698,10 +691,10 @@ func (s *Spec) CreatePartialSnapshot(table string, partitions []string) (string,
698691
log.Debugf("backup partial snapshot sql: %s", backupSnapshotSql)
699692
_, err = db.Exec(backupSnapshotSql)
700693
if err != nil {
701-
return "", xerror.Wrapf(err, xerror.Normal, "backup partial snapshot %s failed, sql: %s", snapshotName, backupSnapshotSql)
694+
return xerror.Wrapf(err, xerror.Normal, "backup partial snapshot %s failed, sql: %s", snapshotName, backupSnapshotSql)
702695
}
703696

704-
return snapshotName, nil
697+
return nil
705698
}
706699

707700
// TODO: Add TaskErrMsg
@@ -757,103 +750,102 @@ func (s *Spec) CheckBackupFinished(snapshotName string) (bool, error) {
757750
}
758751
}
759752

760-
func (s *Spec) CancelBackupIfExists() error {
761-
log.Debugf("cancel backup job if exists, database: %s", s.Database)
753+
// Get the valid (running or finished) backup job with a unique prefix to indicate
754+
// if a backup job needs to be issued again.
755+
func (s *Spec) GetValidBackupJob(snapshotNamePrefix string) (string, error) {
756+
log.Debugf("get valid backup job if exists, database: %s, label prefix: %s", s.Database, snapshotNamePrefix)
762757

763758
db, err := s.Connect()
764759
if err != nil {
765-
return err
760+
return "", err
766761
}
767762

768-
query := fmt.Sprintf("SHOW BACKUP FROM %s", utils.FormatKeywordName(s.Database))
763+
query := fmt.Sprintf("SHOW BACKUP FROM %s WHERE SnapshotName LIKE \"%s%%\"",
764+
utils.FormatKeywordName(s.Database), snapshotNamePrefix)
769765
log.Infof("show backup state sql: %s", query)
770766
rows, err := db.Query(query)
771767
if err != nil {
772-
return xerror.Wrap(err, xerror.Normal, "query backup state failed")
768+
return "", xerror.Wrap(err, xerror.Normal, "query backup state failed")
773769
}
774770
defer rows.Close()
775771

772+
labels := make([]string, 0)
776773
for rows.Next() {
777774
rowParser := utils.NewRowParser()
778775
if err := rowParser.Parse(rows); err != nil {
779-
return xerror.Wrap(err, xerror.Normal, "scan backup state failed")
776+
return "", xerror.Wrap(err, xerror.Normal, "scan backup state failed")
780777
}
781778

782779
info, err := parseBackupInfo(rowParser)
783780
if err != nil {
784-
return xerror.Wrap(err, xerror.Normal, "scan backup state failed")
781+
return "", xerror.Wrap(err, xerror.Normal, "scan backup state failed")
785782
}
786783

787784
log.Infof("check snapshot %s backup state [%v], create time: %s",
788785
info.SnapshotName, info.StateStr, info.CreateTime)
789786

790-
// Only cancel the running backup job issued by syncer
791-
if !isSyncerIssuedJob(info.SnapshotName, s.Database) {
787+
if info.State == BackupStateCancelled {
792788
continue
793789
}
794790

795-
if info.State == BackupStateFinished || info.State == BackupStateCancelled {
796-
continue
797-
}
791+
labels = append(labels, info.SnapshotName)
792+
}
798793

799-
cancelSql := fmt.Sprintf("CANCEL BACKUP FROM %s", s.Database)
800-
log.Infof("cancel backup sql: %s, snapshot: %s", cancelSql, info.SnapshotName)
801-
if _, err = db.Exec(cancelSql); err != nil {
802-
return xerror.Wrapf(err, xerror.Normal,
803-
"cancel backup job %s failed, database: %s", info.SnapshotName, s.Database)
804-
}
794+
// Return the last one. Assume that the result of `SHOW BACKUP` is ordered by CreateTime in ascending order.
795+
if len(labels) != 0 {
796+
return labels[len(labels)-1], nil
805797
}
806-
return nil
798+
799+
return "", nil
807800
}
808801

809-
func (s *Spec) CancelRestoreIfExists(srcDbName string) error {
810-
log.Debugf("cancel restore job if exists, src db: %s", srcDbName)
802+
// Get the valid (running or finished) restore job with a unique prefix to indicate
803+
// if a restore job needs to be issued again.
804+
func (s *Spec) GetValidRestoreJob(snapshotNamePrefix string) (string, error) {
805+
log.Debugf("get valid restore job if exists, label prefix: %s", snapshotNamePrefix)
811806

812807
db, err := s.Connect()
813808
if err != nil {
814-
return err
809+
return "", err
815810
}
816811

817-
query := fmt.Sprintf("SHOW RESTORE FROM %s", utils.FormatKeywordName(s.Database))
818-
log.Debugf("show restore state sql: %s", query)
812+
query := fmt.Sprintf("SHOW RESTORE FROM %s WHERE Label LIKE \"%s%%\"",
813+
utils.FormatKeywordName(s.Database), snapshotNamePrefix)
814+
log.Infof("show restore state sql: %s", query)
819815
rows, err := db.Query(query)
820816
if err != nil {
821-
return xerror.Wrap(err, xerror.Normal, "query restore state failed")
817+
return "", xerror.Wrap(err, xerror.Normal, "query restore state failed")
822818
}
823819
defer rows.Close()
824820

821+
labels := make([]string, 0)
825822
for rows.Next() {
826823
rowParser := utils.NewRowParser()
827824
if err := rowParser.Parse(rows); err != nil {
828-
return xerror.Wrap(err, xerror.Normal, "scan restore state failed")
825+
return "", xerror.Wrap(err, xerror.Normal, "scan restore state failed")
829826
}
830827

831828
info, err := parseRestoreInfo(rowParser)
832829
if err != nil {
833-
return xerror.Wrap(err, xerror.Normal, "scan restore state failed")
830+
return "", xerror.Wrap(err, xerror.Normal, "scan restore state failed")
834831
}
835832

836833
log.Infof("check snapshot %s restore state: [%v], create time: %s",
837834
info.Label, info.StateStr, info.CreateTime)
838835

839-
// Only cancel the running restore job issued by syncer
840-
if !isSyncerIssuedJob(info.Label, srcDbName) {
836+
if info.State == RestoreStateFinished {
841837
continue
842838
}
843839

844-
if info.State == RestoreStateCancelled || info.State == RestoreStateFinished {
845-
continue
846-
}
847-
848-
cancelSql := fmt.Sprintf("CANCEL RESTORE FROM %s", utils.FormatKeywordName(s.Database))
849-
log.Infof("cancel restore sql: %s, running snapshot %s", cancelSql, info.Label)
840+
labels = append(labels, info.Label)
841+
}
850842

851-
_, err = db.Exec(cancelSql)
852-
if err != nil {
853-
return xerror.Wrapf(err, xerror.Normal, "cancel running restore failed, snapshot %s", info.Label)
854-
}
843+
// Return the last one. Assume that the result of `SHOW BACKUP` is ordered by CreateTime in ascending order.
844+
if len(labels) != 0 {
845+
return labels[len(labels)-1], nil
855846
}
856-
return nil
847+
848+
return "", nil
857849
}
858850

859851
// TODO: Add TaskErrMsg
@@ -1240,9 +1232,3 @@ func correctAddPartitionSql(addPartitionSql string, addPartition *record.AddPart
12401232
}
12411233
return addPartitionSql
12421234
}
1243-
1244-
func isSyncerIssuedJob(label, dbName string) bool {
1245-
fullSyncPrefix := fmt.Sprintf("ccrs_%s", dbName)
1246-
partialSyncPrefix := fmt.Sprintf("ccrp_%s", dbName)
1247-
return strings.HasPrefix(label, fullSyncPrefix) || strings.HasPrefix(label, partialSyncPrefix)
1248-
}

pkg/ccr/base/specer.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@ type Specer interface {
2525
CheckDatabaseExists() (bool, error)
2626
CheckTableExists() (bool, error)
2727
CheckTableExistsByName(tableName string) (bool, error)
28-
CreatePartialSnapshot(table string, partitions []string) (string, error)
29-
CreateSnapshot(tables []string) (string, error)
28+
GetValidBackupJob(snapshotNamePrefix string) (string, error)
29+
GetValidRestoreJob(snapshotNamePrefix string) (string, error)
30+
CreatePartialSnapshot(snapshotName, table string, partitions []string) error
31+
CreateSnapshot(snapshotName string, tables []string) error
3032
CheckBackupFinished(snapshotName string) (bool, error)
31-
CancelBackupIfExists() error
32-
CancelRestoreIfExists(srcDbName string) error
3333
CheckRestoreFinished(snapshotName string) (bool, error)
3434
GetRestoreSignatureNotMatchedTableOrView(snapshotName string) (string, bool, error)
3535
WaitTransactionDone(txnId int64) // busy wait

0 commit comments

Comments
 (0)