Skip to content

Commit 98552a1

Browse files
committed
Example load with no manual source creation
1 parent ce62f48 commit 98552a1

File tree

1 file changed

+53
-41
lines changed

1 file changed

+53
-41
lines changed

sources/pg_replication_pipeline.py

Lines changed: 53 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -9,50 +9,53 @@
99

1010
PG_CREDS = dlt.secrets.get("sources.pg_replication.credentials", PostgresCredentials)
1111

12-
def replicate_with_initial_load(pipeline_name: str, slot_name: str, pub_name: str) -> None:
13-
"""Production example: Sets up replication with initial load.
1412

15-
Demonstrates usage of `persist_snapshots` argument and snapshot resource
16-
returned by `init_replication` helper.
13+
def replicate_with_initial_load(
14+
schema_name: str, slot_name: str, pub_name: str
15+
) -> None:
16+
"""Production example: Sets up replication with initial load for your existing PostgreSQL database.
17+
18+
Use this when you have real data that needs to be replicated. This is NOT a simulation -
19+
it connects to your actual database and performs initial load plus ongoing replication.
20+
21+
Demonstrates usage of `persist_snapshots` argument and snapshot resource returned by `init_replication`.
1722
"""
18-
# create source and destination pipelines
23+
# create destination pipeline
1924
dest_pl = dlt.pipeline(
20-
pipeline_name=pipeline_name,
21-
destination='duckdb',
22-
dataset_name="replication_postgres",
25+
pipeline_name="pg_replication_pipeline",
26+
destination="duckdb",
27+
dataset_name="replicate_with_initial_load",
28+
dev_mode=True,
2329
)
24-
schema_name = "public"
2530

26-
creds = dlt.secrets.get(
27-
f"{pipeline_name}.sources.pg_replication.credentials", PostgresCredentials
28-
)
29-
print(creds)
3031
snapshot = init_replication( # requires the Postgres user to have the REPLICATION attribute assigned
3132
slot_name=slot_name,
32-
credentials=creds,
3333
pub_name=pub_name,
34+
table_names=[],
3435
schema_name=schema_name,
3536
persist_snapshots=True, # persist snapshot table(s) and let function return resource(s) for initial load
36-
reset=True
37+
reset=True,
3738
)
38-
print("replication initialized")
39+
3940
# perform initial load to capture all records present in source table prior to replication initialization
4041
dest_pl.run(snapshot)
41-
print("replication run")
42-
# insert record in source table and propagate change to destination
43-
changes = replication_resource(slot_name, pub_name, credentials=creds)
44-
print("changes initialized")
42+
show_destination_table(dest_pl)
43+
44+
# assuming there were changes in the source table, propagate change to destination
45+
changes = replication_resource(slot_name, pub_name)
4546
dest_pl.run(changes)
46-
print("changes run")
47-
47+
show_destination_table(dest_pl)
48+
4849

4950
def replicate_single_table_demo() -> None:
50-
"""Sets up replication for a single Postgres table and loads changes into a destination.
51+
"""Demonstrates PostgreSQL replication by simulating a source table and changes.
52+
53+
Shows basic usage of `init_replication` helper and `replication_resource` resource.
54+
This demo creates a source table and simulates INSERT, UPDATE, and DELETE operations
55+
to show how replication works end-to-end. In production, you would have an existing
56+
PostgreSQL database with real changes instead of simulating them.
5157
52-
Demonstrates basic usage of `init_replication` helper and `replication_resource` resource.
53-
Uses `src_pl` to create and change the replicated Postgres table—this
54-
is only for demonstration purposes, you won't need this when you run in production
55-
as you'll probably have another process feeding your Postgres instance.
58+
For production use with existing data, see `replicate_with_initial_load()`.
5659
"""
5760
# create source and destination pipelines
5861
src_pl = get_postgres_pipeline()
@@ -100,16 +103,14 @@ def replicate_single_table_demo() -> None:
100103
show_destination_table(dest_pl)
101104

102105

103-
def demo_initial_load_replication() -> None:
104-
"""Sets up replication with initial load.
106+
def replicate_with_initial_load_demo() -> None:
107+
"""Demonstrates PostgreSQL replication with initial load by simulating a source table and changes.
105108
106-
Demonstrates usage of `persist_snapshots` argument and snapshot resource
107-
returned by `init_replication` helper.
109+
Shows usage of `persist_snapshots` argument and snapshot resource returned by `init_replication` helper.
110+
This demo creates a source table with existing data, then simulates additional changes to show how
111+
initial load captures pre-existing records and replication handles subsequent changes.
108112
109-
Notes:
110-
- This function also creates the source table itself. That’s only useful for demos or
111-
when starting with a brand-new database. In production you normally won’t create tables here,
112-
since your application/database already has them.
113+
For production use with existing data, see `replicate_with_initial_load()`.
113114
"""
114115
# create source and destination pipelines
115116
src_pl = get_postgres_pipeline()
@@ -154,10 +155,16 @@ def demo_initial_load_replication() -> None:
154155

155156

156157
def replicate_entire_schema_demo() -> None:
157-
"""Demonstrates setup and usage of schema replication.
158+
"""Demonstrates schema-level replication by simulating multiple tables and changes.
159+
160+
Shows setup and usage of schema replication, which captures changes across all tables
161+
in a schema. This demo creates multiple source tables and simulates changes to show
162+
how schema replication works, including tables added after replication starts.
158163
159-
Schema replication requires a Postgres server version of 15 or higher. An
160-
exception is raised if that's not the case.
164+
Schema replication requires PostgreSQL server version 15 or higher. An exception
165+
is raised if that's not the case.
166+
167+
For production use with existing schemas, adapt this pattern to your real database.
161168
"""
162169
# create source and destination pipelines
163170
src_pl = get_postgres_pipeline()
@@ -214,9 +221,13 @@ def replicate_entire_schema_demo() -> None:
214221

215222

216223
def replicate_with_column_selection_demo() -> None:
217-
"""Sets up replication with column selection.
224+
"""Demonstrates column selection in replication by simulating tables with selective column capture.
225+
226+
Shows usage of `include_columns` argument to replicate only specific columns from tables.
227+
This demo creates source tables and simulates changes to show how column selection works,
228+
where some tables have filtered columns while others include all columns by default.
218229
219-
Demonstrates usage of `include_columns` argument.
230+
For production use, apply the `include_columns` pattern to your existing tables.
220231
"""
221232
# create source and destination pipelines
222233
src_pl = get_postgres_pipeline()
@@ -327,7 +338,8 @@ def show_destination_table(
327338

328339

329340
if __name__ == "__main__":
341+
# replicate_with_initial_load(schema_name="public", slot_name="test_slot", pub_name="test_pub")
330342
replicate_single_table_demo()
331-
# replicate_with_initial_load()
343+
# replicate_with_initial_load_demo()
332344
# replicate_entire_schema_demo()
333345
# replicate_with_column_selection_demo()

0 commit comments

Comments
 (0)