Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions postgres/changelog.d/21844.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
PG: Add metrics for pg_stat_replication's sent/write/flush/replay
38 changes: 25 additions & 13 deletions postgres/datadog_checks/postgres/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,19 +450,27 @@ def trim_leading_set_stmts(sql):
'name': 'replication_stats_metrics',
'query': """
SELECT
pg_stat_replication.application_name,
pg_stat_replication.state,
pg_stat_replication.sync_state,
pg_stat_replication.client_addr,
pg_stat_replication_slot.slot_name,
pg_stat_replication_slot.slot_type,
GREATEST (0, age(pg_stat_replication.backend_xmin)) as backend_xmin_age,
GREATEST (0, EXTRACT(epoch from pg_stat_replication.write_lag)) as write_lag,
GREATEST (0, EXTRACT(epoch from pg_stat_replication.flush_lag)) as flush_lag,
GREATEST (0, EXTRACT(epoch from pg_stat_replication.replay_lag)) AS replay_lag
FROM pg_stat_replication as pg_stat_replication
LEFT JOIN pg_replication_slots as pg_stat_replication_slot
ON pg_stat_replication.pid = pg_stat_replication_slot.active_pid;
rep.application_name,
rep.state,
rep.sync_state,
rep.client_addr,
slot.slot_name,
slot.slot_type,
GREATEST (0, age(rep.backend_xmin)) as backend_xmin_age,
pg_wal_lsn_diff(
CASE WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn() ELSE pg_current_wal_lsn() END, sent_lsn),
pg_wal_lsn_diff(
CASE WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn() ELSE pg_current_wal_lsn() END, write_lsn),
pg_wal_lsn_diff(
CASE WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn() ELSE pg_current_wal_lsn() END, flush_lsn),
pg_wal_lsn_diff(
CASE WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn() ELSE pg_current_wal_lsn() END, replay_lsn),
GREATEST (0, EXTRACT(epoch from rep.write_lag)) as write_lag,
GREATEST (0, EXTRACT(epoch from rep.flush_lag)) as flush_lag,
GREATEST (0, EXTRACT(epoch from rep.replay_lag)) AS replay_lag
FROM pg_stat_replication as rep
LEFT JOIN pg_replication_slots as slot
ON rep.pid = slot.active_pid;
""".strip(),
'columns': [
{'name': 'wal_app_name', 'type': 'tag'},
Expand All @@ -472,6 +480,10 @@ def trim_leading_set_stmts(sql):
{'name': 'slot_name', 'type': 'tag_not_null'},
{'name': 'slot_type', 'type': 'tag_not_null'},
{'name': 'replication.backend_xmin_age', 'type': 'gauge'},
{'name': 'replication.sent_lsn_delay', 'type': 'gauge'},
{'name': 'replication.write_lsn_delay', 'type': 'gauge'},
{'name': 'replication.flush_lsn_delay', 'type': 'gauge'},
{'name': 'replication.replay_lsn_delay', 'type': 'gauge'},
{'name': 'replication.wal_write_lag', 'type': 'gauge'},
{'name': 'replication.wal_flush_lag', 'type': 'gauge'},
{'name': 'replication.wal_replay_lag', 'type': 'gauge'},
Expand Down
6 changes: 5 additions & 1 deletion postgres/metadata.csv
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,13 @@ postgresql.relation.tuples,gauge,,,,"Number of live rows in the table. This is o
postgresql.relation.xmin,gauge,,,,"Transaction ID of the latest relation's modification in pg_class. This metric is tagged with db, schema, table",0,postgres,relation xmin,,
postgresql.relation_size,gauge,,byte,,"The disk space used by the specified table. TOAST data, indexes, free space map and visibility map are not included. This metric is tagged with db, schema, table.",0,postgres,relation size,,
postgresql.replication.backend_xmin_age,gauge,,,,The age of the standby server's xmin horizon (relative to latest stable xid) reported by hot_standby_feedback.,-1,postgres,repl backend xmin,,
postgresql.replication.flush_lsn_delay,gauge,,byte,,The delay in bytes between the current WAL position and the last location flushed by the standby server,-1,postgres,flush delay,,
postgresql.replication.replay_lsn_delay,gauge,,byte,,The delay in bytes between the current WAL position and the last location replayed by the standby server,-1,postgres,replay delay,,
postgresql.replication.sent_lsn_delay,gauge,,byte,,The delay in bytes between the current WAL position and the last location sent by the standby server,-1,postgres,sent delay,,
postgresql.replication.wal_flush_lag,gauge,,second,,Time elapsed between flushing recent WAL locally and receiving notification that this standby server has written and flushed it (but not yet applied it). This can be used to gauge the delay that synchronous_commit level on incurred while committing if this server was configured as a synchronous standby. Only available with postgresql 10 and newer.,-1,postgres,repl flush lag,,
postgresql.replication.wal_replay_lag,gauge,,second,,"Time elapsed between flushing recent WAL locally and receiving notification that this standby server has written, flushed and applied it. This can be used to gauge the delay that synchronous_commit level remote_apply incurred while committing if this server was configured as a synchronous standby. Only available with postgresql 10 and newer.",-1,postgres,repl replay lag,,
postgresql.replication.wal_write_lag,gauge,,second,,Time elapsed between flushing recent WAL locally and receiving notification that this standby server has written it (but not yet flushed it or applied it). This can be used to gauge the delay that synchronous_commit level remote_write incurred while committing if this server was configured as a synchronous standby. Only available with postgresql 10 and newer.,-1,postgres,repl write lag,,
postgresql.replication.write_lsn_delay,gauge,,byte,,The delay in bytes between the current WAL position and the last location written by the standby server,-1,postgres,write delay,,
postgresql.replication_delay,gauge,,second,,The current replication delay in seconds. Only available with postgresql 9.1 and newer,-1,postgres,repl delay,,
postgresql.replication_delay_bytes,gauge,,byte,,The current replication delay in bytes. Only available with postgresql 9.2 and newer,-1,postgres,repl delay bytes,,
postgresql.replication_slot.catalog_xmin_age,gauge,,transaction,,"The age of the oldest transaction affecting the system catalogs that this slot needs the database to retain. VACUUM cannot remove catalog tuples deleted by any later transaction. This metric is tagged with slot_name, slot_type, slot_persistence, slot_state.",-1,postgres,repslot catalog_xmin,,
Expand Down Expand Up @@ -236,4 +240,4 @@ postgresql.wal_receiver.last_msg_receipt_age,gauge,,second,,Time since the recep
postgresql.wal_receiver.last_msg_send_age,gauge,,second,,The age of the latest message's send time received from the WAL sender. This metric is tagged with status.,0,postgres,wal receiver send age,,
postgresql.wal_receiver.latest_end_age,gauge,,second,,Time since the reception of the last message from the WAL sender with an WAL location update. This metric is tagged with status.,0,postgres,wal receiver latest end,,
postgresql.wal_receiver.received_timeline,gauge,,,,"Timeline number of last write-ahead log location received and flushed to disk, the initial value of this field being the timeline number of the first log location used when WAL receiver is started. This metric is tagged with status.",0,postgres,wal receiver tli,,
postgresql.wal_size,gauge,,byte,,The sum of all WAL files on disk.,-1,postgres,wal size,,
postgresql.wal_size,gauge,,byte,,The sum of all WAL files on disk.,-1,postgres,wal size,,
5 changes: 2 additions & 3 deletions postgres/tests/test_pg_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ def _increase_txid(cur):
else:
query = 'select txid_current();'
cur.execute(query)
assert cur.fetchone() is not None


def test_initialization_tags(integration_check, pg_instance):
Expand Down Expand Up @@ -485,7 +486,7 @@ def test_backend_transaction_age(aggregator, integration_check, pg_instance):

check.run()

app = 'test_backend_transaction_age'
app = f'test_backend_transaction_age_{time.time()}'
conn1 = _get_conn(pg_instance, application_name=app)
cur = conn1.cursor()

Expand All @@ -498,8 +499,6 @@ def test_backend_transaction_age(aggregator, integration_check, pg_instance):
cur.execute('BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;')
# Force assignement of a txid and keep the transaction opened
_increase_txid(cur)
# Make sure to fetch the result to make sure we start the timer after the transaction started
cur.fetchall()
start_transaction_time = time.time()

aggregator.reset()
Expand Down
10 changes: 3 additions & 7 deletions postgres/tests/test_pg_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,22 +221,18 @@ def test_conflicts_bufferpin(aggregator, integration_check, pg_instance, pg_repl


@requires_over_10
def test_pg_control_replication(aggregator, integration_check, pg_instance, pg_replica_instance):
check = integration_check(pg_replica_instance)
def test_pg_control_replication(aggregator, integration_check, pg_instance):
check = integration_check(pg_instance)
check.run()

dd_agent_tags = _get_expected_tags(check, pg_replica_instance, role='standby')
dd_agent_tags = _get_expected_tags(check, pg_instance, role='master')
aggregator.assert_metric('postgresql.control.timeline_id', count=1, value=1, tags=dd_agent_tags)

# Also checkpoint on primary to generate changes
master_conn = _get_superconn(pg_instance)
with master_conn.cursor() as cur:
cur.execute("CHECKPOINT;")

postgres_conn = _get_superconn(pg_replica_instance)
with postgres_conn.cursor() as cur:
cur.execute("CHECKPOINT;")

aggregator.reset()
check.run()
# checkpoint should be less than 2s old
Expand Down
Loading