Skip to content

Commit 2bf610a

Browse files
committed
add readme
1 parent 4daf8d2 commit 2bf610a

File tree

3 files changed

+92
-1
lines changed

3 files changed

+92
-1
lines changed

README.md

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,92 @@
11
# pg-replication-kafka
22
PostgreSQL logical replication to Kafka
3+
4+
[go-binlog-kafka](https://github.com/feiin/go-binlog-kafka)PostgreSQL版本,将pg数据库的WAL日志解析成JSON推送至Kafka消息队列
5+
6+
## Getting Started
7+
8+
最简单方式启动查看
9+
10+
```
11+
./pg-replication-kafka -host=10.0.0.1 -port=5432 -user=postgres -password=postgrespass -db=test -pubname=test -kafka_addr=10.0.0.1:9092 -kafka_topic_name=postgres
12+
13+
```
14+
15+
## 推送的JSON格式数据
16+
17+
每条数据变更(insert/update/delete)都会解析成以下格式JSON推送至配置的kafka的topic
18+
```
19+
{
20+
"log_pos": 33827184, // uint64
21+
"action": "insert", // insert/update/delete/DDL action
22+
"schema": "tests", // 库名称
23+
"namespace":"public" // namespace
24+
"table": "tests", // 表名称
25+
"gtid": "884",// xid transaction id
26+
"timestamp": 1713158815, // event unix时间戳
27+
"values": null, // insert/delete 时是对应行数据
28+
"before_values":{...} // update 变更前行数据
29+
"after_values":{...} // update 变更后行数据
30+
}
31+
```
32+
33+
```
34+
# insert时,推送格式数据如下
35+
36+
{"action":"insert","gtid":"885","log_pos":33828584,"namespace":"public","schema":"test","table":"article","timestamp":1735354300,"values":{"content":"this is content","id":3,"title":"title test"}}
37+
38+
# update时,推送格式如下
39+
40+
{"action":"update","after_values":{"content":"this is content","id":3,"title":"title test"},"before_values":{"content":"this is content 2024","id":3,"title":"title test"},"gtid":"888","log_pos":33832840,"namespace":"public","schema":"test","table":"article","timestamp":1735354352}
41+
42+
# delete时,推送数据如下
43+
44+
{"action":"delete","gtid":"889","log_pos":33833064,"namespace":"public","schema":"test","table":"article","timestamp":1735354378,"values":{"content":"this is content","id":3,"title":"title test"}}
45+
46+
```
47+
48+
## Options
49+
50+
确保数据库wal_level为逻辑复制logical, wal_level = logical 开启逻辑复制
51+
52+
```postgres
53+
show wal_level; 检查是否为logical
54+
```
55+
56+
```
57+
# change requires restart
58+
wal_level = logical
59+
max_wal_senders = 10 # max number of walsender processes
60+
max_replication_slots = 10 # max number of replication slots
61+
```
62+
云服务如需配置见云产商说明,如阿里云 https://help.aliyun.com/zh/rds/apsaradb-rds-for-postgresql/logical-subscription
63+
64+
65+
- kafka_addr 推送目标kafka的地址
66+
- kafka_topic_name 推送目标的kafka topic名称
67+
- host postgres源库地址
68+
- port postgres源库端口
69+
- postgres postgres源库用户名
70+
- password postgres源库密码
71+
- db postgres源库database实例名称
72+
- pubname publication name,通过下面语句创建,源库里执行:
73+
```
74+
# CREATE PUBLICATION <发布名称> FOR TABLE <表名>;
75+
76+
CREATE PUBLICATION test FOR ALL TABLES
77+
78+
# view
79+
SELECT * FROM pg_publication;
80+
```
81+
82+
如果update的action需要before_values需要修改表的REPLICA IDENTITY(复制标识)配置模式为FULL,否则before_values为空
83+
84+
```
85+
ALTER TABLE test_tablename REPLICA IDENTITY FULL;
86+
87+
```
88+
89+
- slot_name 逻辑复制槽,程序默认为`pg_replicate_kafka`
90+
91+
92+

pg_replicator.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,7 @@ func (r *Replicator) BeginReplication(ctx context.Context) error {
333333
r.err = err
334334
return
335335
}
336+
// logger.Debug(ctx).Interface("row", row).Msg("push to kafka")
336337
}
337338
replicatePos.UpdateStandbyStatus = true
338339
logger.Info(ctx).Interface("LastWriteLSN", replicatePos.LastWriteLSN).Msg("commit local")

pg_replicator_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
func TestSaveReplicateState(t *testing.T) {
99
r := NewReplicator("./test.state", "test_db", "", "test_pubname", "test_slotname", "test_kafkatopic")
1010

11-
err := r.SyncReplicateStatus(context.Background(), &ReplicationStatus{LastWriteLSN: 999})
11+
err := r.SyncReplicateStatus(context.Background(), 999)
1212
if err != nil {
1313
t.Error("SyncReplicateStatus error")
1414
}

0 commit comments

Comments
 (0)