Skip to content

Commit 44229b7

Browse files
authored
[feat](binlog) Add Support recover binlog (#284)
1 parent 60d48d9 commit 44229b7

File tree

21 files changed

+1482
-9
lines changed

21 files changed

+1482
-9
lines changed

pkg/ccr/job.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2567,6 +2567,55 @@ func (j *Job) handleDropRollupRecord(commitSeq int64, dropRollup *record.DropRol
25672567
return j.IDest.DropRollup(destTableName, dropRollup.IndexName)
25682568
}
25692569

2570+
func (j *Job) handleRecoverInfo(binlog *festruct.TBinlog) error {
2571+
log.Infof("handle recoverInfo binlog, prevCommitSeq: %d, commitSeq: %d",
2572+
j.progress.PrevCommitSeq, j.progress.CommitSeq)
2573+
2574+
data := binlog.GetData()
2575+
recoverInfo, err := record.NewRecoverInfoFromJson(data)
2576+
if err != nil {
2577+
return err
2578+
}
2579+
2580+
return j.handleRecoverInfoRecord(binlog.GetCommitSeq(), recoverInfo)
2581+
}
2582+
2583+
func isRecoverTable(recoverInfo *record.RecoverInfo) bool {
2584+
if recoverInfo.PartitionName == "" || recoverInfo.PartitionId == -1 {
2585+
return true
2586+
}
2587+
return false
2588+
}
2589+
2590+
func (j *Job) handleRecoverInfoRecord(commitSeq int64, recoverInfo *record.RecoverInfo) error {
2591+
if j.isBinlogCommitted(recoverInfo.TableId, commitSeq) {
2592+
return nil
2593+
}
2594+
2595+
if isRecoverTable(recoverInfo) {
2596+
var tableName string
2597+
if recoverInfo.NewTableName != "" {
2598+
tableName = recoverInfo.NewTableName
2599+
} else {
2600+
tableName = recoverInfo.TableName
2601+
}
2602+
log.Infof("recover info with for table %s, will trigger partial sync", tableName)
2603+
return j.newPartialSnapshot(recoverInfo.TableId, tableName, nil, true)
2604+
}
2605+
2606+
var partitions []string
2607+
if recoverInfo.NewPartitionName != "" {
2608+
partitions = append(partitions, recoverInfo.NewPartitionName)
2609+
} else {
2610+
partitions = append(partitions, recoverInfo.PartitionName)
2611+
}
2612+
log.Infof("recover info with for partition(%s) for table %s, will trigger partial sync",
2613+
partitions, recoverInfo.TableName)
2614+
// if source does multiple recover of partition, then there is a race
2615+
// condition and some recover might miss due to commitseq change after snapshot.
2616+
return j.newPartialSnapshot(recoverInfo.TableId, recoverInfo.TableName, nil, true)
2617+
}
2618+
25702619
func (j *Job) handleBarrier(binlog *festruct.TBinlog) error {
25712620
data := binlog.GetData()
25722621
barrierLog, err := record.NewBarrierLogFromJson(data)
@@ -2645,6 +2694,12 @@ func (j *Job) handleBarrier(binlog *festruct.TBinlog) error {
26452694
return err
26462695
}
26472696
return j.handleModifyCommentRecord(commitSeq, modifyComment)
2697+
case festruct.TBinlogType_RECOVER_INFO:
2698+
recoverInfo, err := record.NewRecoverInfoFromJson(barrierLog.Binlog)
2699+
if err != nil {
2700+
return err
2701+
}
2702+
return j.handleRecoverInfoRecord(commitSeq, recoverInfo)
26482703
case festruct.TBinlogType_BARRIER:
26492704
log.Info("handle barrier binlog, ignore it")
26502705
default:
@@ -2757,6 +2812,8 @@ func (j *Job) handleBinlog(binlog *festruct.TBinlog) error {
27572812
return j.handleRenameRollup(binlog)
27582813
case festruct.TBinlogType_DROP_ROLLUP:
27592814
return j.handleDropRollup(binlog)
2815+
case festruct.TBinlogType_RECOVER_INFO:
2816+
return j.handleRecoverInfo(binlog)
27602817
default:
27612818
return xerror.Errorf(xerror.Normal, "unknown binlog type: %v", binlog.GetType())
27622819
}

pkg/ccr/record/recover_info.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package record
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
7+
"github.com/selectdb/ccr_syncer/pkg/xerror"
8+
)
9+
10+
type RecoverInfo struct {
11+
DbId int64 `json:"dbId"`
12+
NewDbName string `json:"newDbName"`
13+
TableId int64 `json:"tableId"`
14+
TableName string `json:"tableName"`
15+
NewTableName string `json:"newTableName"`
16+
PartitionId int64 `json:"partitionId"`
17+
PartitionName string `json:"partitionName"`
18+
NewPartitionName string `json:"newPartitionName"`
19+
}
20+
21+
func NewRecoverInfoFromJson(data string) (*RecoverInfo, error) {
22+
var recoverInfo RecoverInfo
23+
err := json.Unmarshal([]byte(data), &recoverInfo)
24+
if err != nil {
25+
return nil, xerror.Wrap(err, xerror.Normal, "unmarshal create table error")
26+
}
27+
28+
if recoverInfo.TableId == 0 {
29+
return nil, xerror.Errorf(xerror.Normal, "table id not found")
30+
}
31+
32+
// table name must exist. partition name not checked since optional.
33+
if recoverInfo.TableName == "" {
34+
return nil, xerror.Errorf(xerror.Normal, "Table Name can not be null")
35+
}
36+
return &recoverInfo, nil
37+
}
38+
39+
// String
40+
func (c *RecoverInfo) String() string {
41+
return fmt.Sprintf("RecoverInfo: DbId: %d, NewDbName: %s, TableId: %d, TableName: %s, NewTableName: %s, PartitionId: %d, PartitionName: %s, NewPartitionName: %s",
42+
c.DbId, c.NewDbName, c.TableId, c.TableName, c.NewTableName, c.PartitionId, c.PartitionName, c.NewPartitionName)
43+
}

pkg/rpc/kitex_gen/frontendservice/FrontendService.go

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/rpc/thrift/FrontendService.thrift

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1197,7 +1197,7 @@ enum TBinlogType {
11971197
RENAME_ROLLUP = 21,
11981198
RENAME_PARTITION = 22,
11991199
DROP_ROLLUP = 23,
1200-
1200+
RECOVER_INFO = 24,
12011201
// Keep some IDs for allocation so that when new binlog types are added in the
12021202
// future, the changes can be picked back to the old versions without breaking
12031203
// compatibility.
@@ -1213,8 +1213,7 @@ enum TBinlogType {
12131213
// MODIFY_XXX = 17,
12141214
// MIN_UNKNOWN = 18,
12151215
// UNKNOWN_3 = 19,
1216-
MIN_UNKNOWN = 24,
1217-
UNKNOWN_9 = 25,
1216+
MIN_UNKNOWN = 25,
12181217
UNKNOWN_10 = 26,
12191218
UNKNOWN_11 = 27,
12201219
UNKNOWN_12 = 28,

regression-test/common/helper.groovy

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,12 @@ class Helper {
189189
"""
190190
}
191191

192+
void disableDbBinlog() {
193+
suite.sql """
194+
ALTER DATABASE ${context.dbName} SET properties ("binlog.enable" = "false")
195+
"""
196+
}
197+
192198
Boolean checkShowTimesOf(sqlString, myClosure, times, func = "sql") {
193199
Boolean ret = false
194200
List<List<Object>> res
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
-- This file is automatically generated. You should know what you did if you want to edit this
2+
-- !target_sql_content --
3+
3 0
4+
3 1
5+
3 2
6+
5 0
7+
5 1
8+
5 2
9+
10+
-- !sql_source_content --
11+
3 0
12+
3 1
13+
3 2
14+
5 0
15+
5 1
16+
5 2
17+
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
-- This file is automatically generated. You should know what you did if you want to edit this
2+
-- !target_sql_content --
3+
3 0
4+
3 1
5+
3 2
6+
5 0
7+
5 1
8+
5 2
9+
10+
-- !sql_source_content --
11+
3 0
12+
3 1
13+
3 2
14+
5 0
15+
5 1
16+
5 2
17+
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
-- This file is automatically generated. You should know what you did if you want to edit this
2+
-- !target_sql_content --
3+
0 0
4+
0 1
5+
0 2
6+
2 0
7+
2 1
8+
2 2
9+
10+
-- !sql_source_content --
11+
0 0
12+
0 1
13+
0 2
14+
2 0
15+
2 1
16+
2 2
17+
18+
-- !target_sql_content_2 --
19+
0 0
20+
0 1
21+
0 2
22+
2 0
23+
2 1
24+
2 2
25+
3 0
26+
3 1
27+
3 2
28+
29+
-- !sql_source_content_2 --
30+
0 0
31+
0 1
32+
0 2
33+
2 0
34+
2 1
35+
2 2
36+
3 0
37+
3 1
38+
3 2
39+
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
-- This file is automatically generated. You should know what you did if you want to edit this
2+
-- !target_sql_content --
3+
0 0
4+
0 1
5+
0 2
6+
2 0
7+
2 1
8+
2 2
9+
10+
-- !sql_source_content --
11+
0 0
12+
0 1
13+
0 2
14+
2 0
15+
2 1
16+
2 2
17+
18+
-- !target_sql_content_2 --
19+
0 0
20+
0 1
21+
0 2
22+
2 0
23+
2 1
24+
2 2
25+
3 0
26+
3 1
27+
3 2
28+
29+
-- !sql_source_content_2 --
30+
0 0
31+
0 1
32+
0 2
33+
2 0
34+
2 1
35+
2 2
36+
3 0
37+
3 1
38+
3 2
39+
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
-- This file is automatically generated. You should know what you did if you want to edit this
2+
-- !target_sql_content_2 --
3+
0 0
4+
0 1
5+
0 2
6+
10 0
7+
10 1
8+
10 2
9+
11 0
10+
11 1
11+
11 2
12+
12 0
13+
12 1
14+
12 2
15+
13 0
16+
13 1
17+
13 2
18+
14 0
19+
14 1
20+
14 2
21+
15 0
22+
15 1
23+
15 2
24+
16 0
25+
16 1
26+
16 2
27+
17 0
28+
17 1
29+
17 2
30+
18 0
31+
18 1
32+
18 2
33+
19 0
34+
19 1
35+
19 2
36+
20 0
37+
20 1
38+
20 2
39+
40+
-- !sql_source_content_2 --
41+
0 0
42+
0 1
43+
0 2
44+
10 0
45+
10 1
46+
10 2
47+
11 0
48+
11 1
49+
11 2
50+
12 0
51+
12 1
52+
12 2
53+
13 0
54+
13 1
55+
13 2
56+
14 0
57+
14 1
58+
14 2
59+
15 0
60+
15 1
61+
15 2
62+
16 0
63+
16 1
64+
16 2
65+
17 0
66+
17 1
67+
17 2
68+
18 0
69+
18 1
70+
18 2
71+
19 0
72+
19 1
73+
19 2
74+
20 0
75+
20 1
76+
20 2
77+

0 commit comments

Comments
 (0)