Skip to content

Commit b3c80d6

Browse files
committed
add debug mode
1 parent 2bf610a commit b3c80d6

File tree

3 files changed

+27
-14
lines changed

3 files changed

+27
-14
lines changed

README.md

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -47,19 +47,20 @@ PostgreSQL logical replication to Kafka
4747

4848
## Options
4949

50-
确保数据库wal_level为逻辑复制logical, wal_level = logical 开启逻辑复制
50+
确保数据库wal_level为逻辑复制logical
5151

5252
```postgres
5353
show wal_level; 检查是否为logical
5454
```
5555

56+
自建可以修改conf文件:
5657
```
5758
# change requires restart
5859
wal_level = logical
5960
max_wal_senders = 10 # max number of walsender processes
6061
max_replication_slots = 10 # max number of replication slots
6162
```
62-
云服务如需配置见云产商说明,如阿里云 https://help.aliyun.com/zh/rds/apsaradb-rds-for-postgresql/logical-subscription
63+
RDS云服务如需配置见云产商说明,如阿里云 https://help.aliyun.com/zh/rds/apsaradb-rds-for-postgresql/logical-subscription
6364

6465

6566
- kafka_addr 推送目标kafka的地址
@@ -69,24 +70,26 @@ max_replication_slots = 10 # max number of replication slots
6970
- postgres postgres源库用户名
7071
- password postgres源库密码
7172
- db postgres源库database实例名称
72-
- pubname publication name,通过下面语句创建,源库里执行:
73-
```
74-
# CREATE PUBLICATION <发布名称> FOR TABLE <表名>;
73+
- pubname publication name,通过下面语句创建发布名称,源库里执行:
74+
```
75+
# CREATE PUBLICATION <发布名称> FOR TABLE <表名>;
7576
76-
CREATE PUBLICATION test FOR ALL TABLES
77+
CREATE PUBLICATION test FOR ALL TABLES
7778
78-
# view
79-
SELECT * FROM pg_publication;
80-
```
79+
# view
80+
SELECT * FROM pg_publication;
81+
```
8182
82-
如果update的action需要before_values需要修改表的REPLICA IDENTITY(复制标识)配置模式为FULL,否则before_values为空
83+
如果update的action需要before_values需要修改表的REPLICA IDENTITY(复制标识)配置模式为FULL,否则before_values为空
8384
84-
```
85-
ALTER TABLE test_tablename REPLICA IDENTITY FULL;
85+
```
86+
ALTER TABLE test_tablename REPLICA IDENTITY FULL;
8687
87-
```
88+
```
8889
8990
- slot_name 逻辑复制槽,程序默认为`pg_replicate_kafka`
91+
- debug 开启调试模式时,会输出推送kafka消息详情
92+
9093
9194
9295

main.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ var (
1212
port int
1313
host, user, password, dbName, publicationName, slotName string
1414
kafkaTopicName, kafkaAddr string
15+
stateFilePath string
16+
isDebug bool
1517
)
1618

1719
func main() {
@@ -24,9 +26,15 @@ func main() {
2426
flag.StringVar(&dbName, "db", "postgres", "postgres database name")
2527
flag.StringVar(&publicationName, "pubname", "", "publication name created via CREATE PUBLICATION {name} FOR ALL TABLES")
2628
flag.StringVar(&slotName, "slot_name", "pg_replicate_kafka", "slot name")
29+
flag.BoolVar(&isDebug, "debug", false, "is debug mode")
30+
flag.StringVar(&stateFilePath, "replicate_state_file", "", "save replicate state point")
2731
flag.Parse()
2832

2933
defaultStateFile := fmt.Sprintf("pg_replication_%s.state", dbName)
34+
if len(stateFilePath) > 0 {
35+
defaultStateFile = stateFilePath
36+
}
37+
3038
logicReplicator := NewReplicator(defaultStateFile, dbName, NewReplicateDSN(dbName, user, password, host, port), slotName, publicationName, kafkaTopicName)
3139

3240
ctx := context.Background()

pg_replicator.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,9 @@ func (r *Replicator) BeginReplication(ctx context.Context) error {
333333
r.err = err
334334
return
335335
}
336-
// logger.Debug(ctx).Interface("row", row).Msg("push to kafka")
336+
if isDebug {
337+
logger.Debug(ctx).Interface("row", row).Msg("pushed to kafka")
338+
}
337339
}
338340
replicatePos.UpdateStandbyStatus = true
339341
logger.Info(ctx).Interface("LastWriteLSN", replicatePos.LastWriteLSN).Msg("commit local")

0 commit comments

Comments
 (0)