8 changes: 8 additions & 0 deletions sources/pg_replication/README.md
@@ -6,6 +6,7 @@ Resources that can be loaded using this verified source are:
| Name | Description |
|----------------------|-------------------------------------------------|
| replication_resource | Load published messages from a replication slot |
| init_replication | Initialize replication and optionally return snapshot resources for initial data load |

## Initialize the pipeline

@@ -29,6 +30,13 @@ It also needs `CREATE` privilege on the database:
GRANT CREATE ON DATABASE dlt_data TO replication_user;
```

If the user is not a superuser, it must own the tables that need to be replicated:

```sql
ALTER TABLE your_table OWNER TO replication_user;
```
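
When several tables are replicated, the same statement is needed once per table. A minimal sketch of generating those statements, assuming plain double-quote identifier quoting; `quote_ident` and `owner_statements` are hypothetical helpers for illustration and are not part of this verified source:

```python
from typing import Iterable, List


def quote_ident(name: str) -> str:
    """Quote a Postgres identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def owner_statements(tables: Iterable[str], role: str) -> List[str]:
    """Build one ALTER TABLE ... OWNER TO statement per table (hypothetical helper)."""
    return [
        f"ALTER TABLE {quote_ident(table)} OWNER TO {quote_ident(role)};"
        for table in tables
    ]


# Print the statements so they can be reviewed before running them in psql.
for stmt in owner_statements(["your_table", "other_table"], "replication_user"):
    print(stmt)
```

Quoted identifiers are case-sensitive in Postgres, so the names passed in should match the stored table and role names exactly. Schema-qualified names would need each part quoted separately.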


### Set up RDS
1. You must enable replication for the RDS Postgres instance via **Parameter Group**: https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_PostgreSQL.Replication.ReadReplicas.html
2. `WITH LOGIN REPLICATION;` does not work on RDS, instead do:
7 changes: 4 additions & 3 deletions sources/pg_replication/helpers.py
@@ -42,7 +42,7 @@
from dlt.sources.credentials import ConnectionStringCredentials
from dlt.sources.sql_database import (
sql_table as core_sql_table,
-sql_database as core_sql_datbase,
+sql_database as core_sql_database,
)

from .schema_types import _to_dlt_column_schema, _to_dlt_val
@@ -114,7 +114,8 @@ def init_replication(
table_names (Optional[Union[str, Sequence[str]]]): Name(s) of the table(s)
to include in the publication. If not provided, the whole schema with `schema_name` will be replicated
(also tables added to the schema after the publication was created). You need superuser privileges
-for the schema replication.
+for the whole schema replication. When specifying individual table names, the database role must
+own the tables if it's not a superuser.
credentials (ConnectionStringCredentials): Postgres database credentials.
publish (str): Comma-separated string of DML operations. Can be used to
control which changes are included in the publication. Allowed operations
@@ -184,7 +185,7 @@ def init_replication(
# do not include dlt tables
table_names = [
table_name
-for table_name in core_sql_datbase(
+for table_name in core_sql_database(
credentials, schema=schema_name, reflection_level="minimal"
).resources.keys()
if not table_name.lower().startswith(DLT_NAME_PREFIX)
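
The filter in this hunk keeps every reflected table whose name does not start with the dlt prefix, compared case-insensitively. A standalone sketch of just that check, assuming `DLT_NAME_PREFIX` is `"_dlt"` (its actual value is not shown in this diff) and using a hypothetical `exclude_dlt_tables` helper in place of the reflected resource keys:

```python
from typing import Iterable, List

# Assumption: the real constant is defined elsewhere; "_dlt" is a stand-in value.
DLT_NAME_PREFIX = "_dlt"


def exclude_dlt_tables(table_names: Iterable[str]) -> List[str]:
    """Drop names that start with the dlt prefix, ignoring case (hypothetical helper)."""
    return [
        name
        for name in table_names
        if not name.lower().startswith(DLT_NAME_PREFIX)
    ]


print(exclude_dlt_tables(["items", "_dlt_loads", "_DLT_version", "orders"]))
```

Lowercasing the name before the prefix check means tables created with an upper-case or mixed-case dlt prefix are excluded too.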