From 0f7b6ed45f1651891d0c1269658ac7884e099d81 Mon Sep 17 00:00:00 2001 From: Max Englander Date: Sat, 6 Sep 2025 19:13:14 -0400 Subject: [PATCH 1/4] cmd,collector: reduce connections during /metrics scrapes Signed-off-by: Max Englander --- cmd/postgres_exporter/main.go | 42 ++++++++++++++++++++-------- collector/collector.go | 52 ++++++++++++++++++----------------- collector/instance.go | 14 +++++++++- collector/probe.go | 2 +- 4 files changed, 72 insertions(+), 38 deletions(-) diff --git a/cmd/postgres_exporter/main.go b/cmd/postgres_exporter/main.go index 3a1c09c46..e9f945e31 100644 --- a/cmd/postgres_exporter/main.go +++ b/cmd/postgres_exporter/main.go @@ -133,17 +133,37 @@ func main() { dsn = dsns[0] } - pe, err := collector.NewPostgresCollector( - logger, - excludedDatabases, - dsn, - []string{}, - collector.WithTimeout(*scrapeTimeout), - ) - if err != nil { - logger.Warn("Failed to create PostgresCollector", "err", err.Error()) - } else { - prometheus.MustRegister(pe) + if dsn != "" { + // Get the server connection to share with collectors + server, err := exporter.servers.GetServer(dsn) + if err != nil { + logger.Warn("Failed to get server for collectors", "err", err.Error()) + } else { + // Create instance with shared connection from server + instance, err := collector.NewInstance(dsn) + if err != nil { + logger.Warn("Failed to create instance", "err", err.Error()) + } else { + err = instance.SetupWithConnection(server.db) + if err != nil { + logger.Warn("Failed to setup shared instance", "err", err.Error()) + } else { + // Create collector with shared instance (instancePerCollect defaults to false) + pe, err := collector.NewPostgresCollector( + logger, + excludedDatabases, + instance, + []string{}, + collector.WithTimeout(*scrapeTimeout), + ) + if err != nil { + logger.Warn("Failed to create PostgresCollector", "err", err.Error()) + } else { + prometheus.MustRegister(pe) + } + } + } + } } http.Handle(*metricsPath, promhttp.Handler()) diff --git a/collector/collector.go b/collector/collector.go index 408af55a1..4fb0755f6 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -93,7 +93,8 @@ type PostgresCollector struct { logger *slog.Logger scrapeTimeout time.Duration - instance *instance + instance *instance + instancePerCollect bool } type Option func(*PostgresCollector) error @@ -106,10 +107,19 @@ func WithTimeout(timeout time.Duration) Option { } } +// WithInstancePerCollect configures whether to create a new instance per Collect call. +func WithInstancePerCollect() Option { + return func(p *PostgresCollector) error { + p.instancePerCollect = true + return nil + } +} + // NewPostgresCollector creates a new PostgresCollector. -func NewPostgresCollector(logger *slog.Logger, excludeDatabases []string, dsn string, filters []string, options ...Option) (*PostgresCollector, error) { +func NewPostgresCollector(logger *slog.Logger, excludeDatabases []string, instance *instance, filters []string, options ...Option) (*PostgresCollector, error) { p := &PostgresCollector{ - logger: logger, + logger: logger, + instance: instance, } // Apply options to customize the collector for _, o := range options { @@ -154,16 +164,6 @@ func NewPostgresCollector(logger *slog.Logger, excludeDatabases []string, dsn st p.Collectors = collectors - if dsn == "" { - return nil, errors.New("empty dsn") - } - - instance, err := newInstance(dsn) - if err != nil { - return nil, err - } - p.instance = instance - return p, nil } @@ -184,15 +184,21 @@ func (p PostgresCollector) Collect(ch chan<- prometheus.Metric) { ctx = context.Background() } - // copy the instance so that concurrent scrapes have independent instances - inst := p.instance.copy() + var inst *instance - // Set up the database connection for the collector. - err := inst.setup() - defer inst.Close() - if err != nil { - p.logger.Error("Error opening connection to database", "err", err) - return + if p.instancePerCollect { + // copy the instance so that concurrent scrapes have independent instances + inst = p.instance.copy() + // Set up the database connection for the collector. + err := inst.setup() + if err != nil { + p.logger.Error("Error opening connection to database", "err", err) + return + } + defer inst.Close() + } else { + // Use the shared instance directly + inst = p.instance } wg := sync.WaitGroup{} @@ -206,10 +212,6 @@ func (p PostgresCollector) Collect(ch chan<- prometheus.Metric) { wg.Wait() } -func (p *PostgresCollector) Close() error { - return p.instance.Close() -} - func execute(ctx context.Context, name string, c Collector, instance *instance, ch chan<- prometheus.Metric, logger *slog.Logger) { begin := time.Now() err := c.Update(ctx, instance, ch) diff --git a/collector/instance.go b/collector/instance.go index a365697d6..c582f0f3e 100644 --- a/collector/instance.go +++ b/collector/instance.go @@ -27,7 +27,7 @@ type instance struct { version semver.Version } -func newInstance(dsn string) (*instance, error) { +func NewInstance(dsn string) (*instance, error) { i := &instance{ dsn: dsn, } @@ -68,6 +68,18 @@ func (i *instance) setup() error { return nil } +// SetupWithConnection sets up the instance with an existing database connection. +func (i *instance) SetupWithConnection(db *sql.DB) error { + i.db = db + + version, err := queryVersion(i.db) + if err != nil { + return fmt.Errorf("error querying postgresql version: %w", err) + } + i.version = version + return nil +} + func (i *instance) getDB() *sql.DB { return i.db } diff --git a/collector/probe.go b/collector/probe.go index e40d6fee1..6b8cc36df 100644 --- a/collector/probe.go +++ b/collector/probe.go @@ -57,7 +57,7 @@ func NewProbeCollector(logger *slog.Logger, excludeDatabases []string, registry } } - instance, err := newInstance(dsn.GetConnectionString()) + instance, err := NewInstance(dsn.GetConnectionString()) if err != nil { return nil, err } From 05e90a9c184a991edc8515912206b432bda4d49f Mon Sep 17 00:00:00 2001 From: Max Englander Date: Sat, 6 Sep 2025 19:55:13 -0400 Subject: [PATCH 2/4] enable new behavior with cli flag Signed-off-by: Max Englander --- cmd/postgres_exporter/main.go | 95 +++++++++++++++-------- collector/collector.go | 10 +-- collector/instance.go | 18 ++--- collector/pg_buffercache_summary.go | 2 +- collector/pg_database.go | 2 +- collector/pg_database_wraparound.go | 2 +- collector/pg_locks.go | 2 +- collector/pg_long_running_transactions.go | 2 +- collector/pg_postmaster.go | 2 +- collector/pg_process_idle.go | 2 +- collector/pg_replication.go | 2 +- collector/pg_replication_slot.go | 2 +- collector/pg_roles.go | 2 +- collector/pg_stat_activity_autovacuum.go | 2 +- collector/pg_stat_bgwriter.go | 2 +- collector/pg_stat_checkpointer.go | 2 +- collector/pg_stat_database.go | 2 +- collector/pg_stat_progress_vacuum.go | 2 +- collector/pg_stat_statements.go | 2 +- collector/pg_stat_user_tables.go | 2 +- collector/pg_stat_walreceiver.go | 2 +- collector/pg_statio_user_indexes.go | 2 +- collector/pg_statio_user_tables.go | 2 +- collector/pg_wal.go | 2 +- collector/pg_xlog_location.go | 2 +- collector/probe.go | 2 +- 26 files changed, 100 insertions(+), 69 deletions(-) diff --git a/cmd/postgres_exporter/main.go b/cmd/postgres_exporter/main.go index e9f945e31..8f2e43df2 100644 --- a/cmd/postgres_exporter/main.go +++ b/cmd/postgres_exporter/main.go @@ -15,9 +15,11 @@ package main import ( "fmt" + "log/slog" "net/http" "os" "strings" + "time" "github.com/alecthomas/kingpin/v2" "github.com/prometheus-community/postgres_exporter/collector" @@ -32,6 +34,65 @@ import ( "github.com/prometheus/exporter-toolkit/web/kingpinflag" ) +func registerPostgresCollector(dsn string, exporter *Exporter, logger *slog.Logger, excludedDatabases []string, scrapeTimeout time.Duration, concurrentScrape bool) { + if dsn == "" { + return + } + + var instance *collector.Instance + var opts []collector.Option + + if concurrentScrape { + // Original behavior: dedicated instance for collector, creates new connection per scrape + inst, err := collector.NewInstance(dsn) + if err != nil { + logger.Warn("Failed to create instance", "err", err.Error()) + return + } + instance = inst + // Add option to create new instance per collect + opts = append(opts, collector.WithInstancePerCollect()) + } else { + // New optimized behavior: share connection from server + server, err := exporter.servers.GetServer(dsn) + if err != nil { + logger.Warn("Failed to get server for collectors", "err", err.Error()) + return + } + + inst, err := collector.NewInstance(dsn) + if err != nil { + logger.Warn("Failed to create instance", "err", err.Error()) + return + } + + err = inst.SetupWithConnection(server.db) + if err != nil { + logger.Warn("Failed to setup shared instance", "err", err.Error()) + return + } + instance = inst + } + + // Add timeout option + opts = append(opts, collector.WithTimeout(scrapeTimeout)) + + // Create collector + pe, err := collector.NewPostgresCollector( + logger, + excludedDatabases, + instance, + []string{}, + opts..., + ) + if err != nil { + logger.Warn("Failed to create PostgresCollector", "err", err.Error()) + return + } + + prometheus.MustRegister(pe) +} + var ( c = config.Handler{ Config: &config.Config{}, @@ -50,6 +111,7 @@ var ( includeDatabases = kingpin.Flag("include-databases", "A list of databases to include when autoDiscoverDatabases is enabled (DEPRECATED)").Default("").Envar("PG_EXPORTER_INCLUDE_DATABASES").String() metricPrefix = kingpin.Flag("metric-prefix", "A metric prefix can be used to have non-default (not \"pg\") prefixes for each of the metrics").Default("pg").Envar("PG_EXPORTER_METRIC_PREFIX").String() scrapeTimeout = kingpin.Flag("scrape-timeout", "Maximum time for a scrape to complete before timing out (0 = no timeout)").Default("0").Envar("PG_EXPORTER_SCRAPE_TIMEOUT").Duration() + concurrentScrape = kingpin.Flag("concurrent-scrape", "Use dedicated instance for collector allowing concurrent scrapes (default: true for backward compatibility)").Default("true").Envar("PG_EXPORTER_CONCURRENT_SCRAPE").Bool() logger = promslog.NewNopLogger() ) @@ -133,38 +195,7 @@ func main() { dsn = dsns[0] } - if dsn != "" { - // Get the server connection to share with collectors - server, err := exporter.servers.GetServer(dsn) - if err != nil { - logger.Warn("Failed to get server for collectors", "err", err.Error()) - } else { - // Create instance with shared connection from server - instance, err := collector.NewInstance(dsn) - if err != nil { - logger.Warn("Failed to create instance", "err", err.Error()) - } else { - err = instance.SetupWithConnection(server.db) - if err != nil { - logger.Warn("Failed to setup shared instance", "err", err.Error()) - } else { - // Create collector with shared instance (instancePerCollect defaults to false) - pe, err := collector.NewPostgresCollector( - logger, - excludedDatabases, - instance, - []string{}, - collector.WithTimeout(*scrapeTimeout), - ) - if err != nil { - logger.Warn("Failed to create PostgresCollector", "err", err.Error()) - } else { - prometheus.MustRegister(pe) - } - } - } - } - } + registerPostgresCollector(dsn, exporter, logger, excludedDatabases, *scrapeTimeout, *concurrentScrape) http.Handle(*metricsPath, promhttp.Handler()) diff --git a/collector/collector.go b/collector/collector.go index 4fb0755f6..642880cf2 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -59,7 +59,7 @@ var ( ) type Collector interface { - Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error + Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error } type collectorConfig struct { @@ -93,7 +93,7 @@ type PostgresCollector struct { logger *slog.Logger scrapeTimeout time.Duration - instance *instance + instance *Instance instancePerCollect bool } @@ -116,7 +116,7 @@ func WithInstancePerCollect() Option { } // NewPostgresCollector creates a new PostgresCollector. -func NewPostgresCollector(logger *slog.Logger, excludeDatabases []string, instance *instance, filters []string, options ...Option) (*PostgresCollector, error) { +func NewPostgresCollector(logger *slog.Logger, excludeDatabases []string, instance *Instance, filters []string, options ...Option) (*PostgresCollector, error) { p := &PostgresCollector{ logger: logger, instance: instance, @@ -184,7 +184,7 @@ func (p PostgresCollector) Collect(ch chan<- prometheus.Metric) { ctx = context.Background() } - var inst *instance + var inst *Instance if p.instancePerCollect { // copy the instance so that concurrent scrapes have independent instances @@ -212,7 +212,7 @@ func (p PostgresCollector) Collect(ch chan<- prometheus.Metric) { wg.Wait() } -func execute(ctx context.Context, name string, c Collector, instance *instance, ch chan<- prometheus.Metric, logger *slog.Logger) { +func execute(ctx context.Context, name string, c Collector, instance *Instance, ch chan<- prometheus.Metric, logger *slog.Logger) { begin := time.Now() err := c.Update(ctx, instance, ch) duration := time.Since(begin) diff --git a/collector/instance.go b/collector/instance.go index c582f0f3e..1314f1433 100644 --- a/collector/instance.go +++ b/collector/instance.go @@ -21,14 +21,14 @@ import ( "github.com/blang/semver/v4" ) -type instance struct { +type Instance struct { dsn string db *sql.DB version semver.Version } -func NewInstance(dsn string) (*instance, error) { - i := &instance{ +func NewInstance(dsn string) (*Instance, error) { + i := &Instance{ dsn: dsn, } @@ -44,13 +44,13 @@ func NewInstance(dsn string) (*instance, error) { } // copy returns a copy of the instance. -func (i *instance) copy() *instance { - return &instance{ +func (i *Instance) copy() *Instance { + return &Instance{ dsn: i.dsn, } } -func (i *instance) setup() error { +func (i *Instance) setup() error { db, err := sql.Open("postgres", i.dsn) if err != nil { return err @@ -69,7 +69,7 @@ func (i *instance) setup() error { } // SetupWithConnection sets up the instance with an existing database connection. -func (i *instance) SetupWithConnection(db *sql.DB) error { +func (i *Instance) SetupWithConnection(db *sql.DB) error { i.db = db version, err := queryVersion(i.db) @@ -80,11 +80,11 @@ func (i *instance) SetupWithConnection(db *sql.DB) error { return nil } -func (i *instance) getDB() *sql.DB { +func (i *Instance) getDB() *sql.DB { return i.db } -func (i *instance) Close() error { +func (i *Instance) Close() error { return i.db.Close() } diff --git a/collector/pg_buffercache_summary.go b/collector/pg_buffercache_summary.go index 8b0e0f007..7f855df0c 100644 --- a/collector/pg_buffercache_summary.go +++ b/collector/pg_buffercache_summary.go @@ -91,7 +91,7 @@ var ( // Update implements Collector // It is called by the Prometheus registry when collecting metrics. -func (c BuffercacheSummaryCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { +func (c BuffercacheSummaryCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error { // pg_buffercache_summary is only in v16, and we don't need support for earlier currently. if !instance.version.GE(semver.MustParse("16.0.0")) { return nil diff --git a/collector/pg_database.go b/collector/pg_database.go index 7f98748f4..b84678007 100644 --- a/collector/pg_database.go +++ b/collector/pg_database.go @@ -76,7 +76,7 @@ var ( // each database individually. This is because we can't filter the // list of databases in the query because the list of excluded // databases is dynamic. -func (c PGDatabaseCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { +func (c PGDatabaseCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error { db := instance.getDB() // Query the list of databases rows, err := db.QueryContext(ctx, diff --git a/collector/pg_database_wraparound.go b/collector/pg_database_wraparound.go index d170821b5..e7fe7cc90 100644 --- a/collector/pg_database_wraparound.go +++ b/collector/pg_database_wraparound.go @@ -61,7 +61,7 @@ var ( ` ) -func (c *PGDatabaseWraparoundCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { +func (c *PGDatabaseWraparoundCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error { db := instance.getDB() rows, err := db.QueryContext(ctx, databaseWraparoundQuery) diff --git a/collector/pg_locks.go b/collector/pg_locks.go index add3e6d42..8cdfa5d0b 100644 --- a/collector/pg_locks.go +++ b/collector/pg_locks.go @@ -88,7 +88,7 @@ var ( // Update implements Collector and exposes database locks. // It is called by the Prometheus registry when collecting metrics. -func (c PGLocksCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { +func (c PGLocksCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error { db := instance.getDB() // Query the list of databases rows, err := db.QueryContext(ctx, diff --git a/collector/pg_long_running_transactions.go b/collector/pg_long_running_transactions.go index 072862f4e..eabbb4b83 100644 --- a/collector/pg_long_running_transactions.go +++ b/collector/pg_long_running_transactions.go @@ -61,7 +61,7 @@ AND pid <> pg_backend_pid(); ` ) -func (PGLongRunningTransactionsCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { +func (PGLongRunningTransactionsCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error { db := instance.getDB() rows, err := db.QueryContext(ctx, longRunningTransactionsQuery) diff --git a/collector/pg_postmaster.go b/collector/pg_postmaster.go index b81e4f905..c9e11fd4a 100644 --- a/collector/pg_postmaster.go +++ b/collector/pg_postmaster.go @@ -47,7 +47,7 @@ var ( pgPostmasterQuery = "SELECT extract(epoch from pg_postmaster_start_time) from pg_postmaster_start_time();" ) -func (c *PGPostmasterCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { +func (c *PGPostmasterCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error { db := instance.getDB() row := db.QueryRowContext(ctx, pgPostmasterQuery) diff --git a/collector/pg_process_idle.go b/collector/pg_process_idle.go index 1f9658aa6..742e18135 100644 --- a/collector/pg_process_idle.go +++ b/collector/pg_process_idle.go @@ -44,7 +44,7 @@ var pgProcessIdleSeconds = prometheus.NewDesc( prometheus.Labels{}, ) -func (PGProcessIdleCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { +func (PGProcessIdleCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error { db := instance.getDB() row := db.QueryRowContext(ctx, `WITH diff --git a/collector/pg_replication.go b/collector/pg_replication.go index 7f8b2fbd7..e6d61ace2 100644 --- a/collector/pg_replication.go +++ b/collector/pg_replication.go @@ -74,7 +74,7 @@ var ( GREATEST (0, EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))) as last_replay` ) -func (c *PGReplicationCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { +func (c *PGReplicationCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error { db := instance.getDB() row := db.QueryRowContext(ctx, pgReplicationQuery, diff --git a/collector/pg_replication_slot.go b/collector/pg_replication_slot.go index e6c9773eb..4b20e540b 100644 --- a/collector/pg_replication_slot.go +++ b/collector/pg_replication_slot.go @@ -108,7 +108,7 @@ var ( FROM pg_replication_slots;` ) -func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { +func (PGReplicationSlotCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error { query := pgReplicationSlotQuery abovePG13 := instance.version.GTE(semver.MustParse("13.0.0")) if abovePG13 { diff --git a/collector/pg_roles.go b/collector/pg_roles.go index 626dbb44f..e08856c93 100644 --- a/collector/pg_roles.go +++ b/collector/pg_roles.go @@ -53,7 +53,7 @@ var ( // Update implements Collector and exposes roles connection limits. // It is called by the Prometheus registry when collecting metrics. -func (c PGRolesCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { +func (c PGRolesCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error { db := instance.getDB() // Query the list of databases rows, err := db.QueryContext(ctx, diff --git a/collector/pg_stat_activity_autovacuum.go b/collector/pg_stat_activity_autovacuum.go index f08029d18..36dcea8ee 100644 --- a/collector/pg_stat_activity_autovacuum.go +++ b/collector/pg_stat_activity_autovacuum.go @@ -53,7 +53,7 @@ var ( ` ) -func (PGStatActivityAutovacuumCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { +func (PGStatActivityAutovacuumCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error { db := instance.getDB() rows, err := db.QueryContext(ctx, statActivityAutovacuumQuery) diff --git a/collector/pg_stat_bgwriter.go b/collector/pg_stat_bgwriter.go index 6e3bd09cb..27f2891a1 100644 --- a/collector/pg_stat_bgwriter.go +++ b/collector/pg_stat_bgwriter.go @@ -124,7 +124,7 @@ var ( FROM pg_stat_bgwriter;` ) -func (PGStatBGWriterCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { +func (PGStatBGWriterCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error { if instance.version.GE(semver.MustParse("17.0.0")) { db := instance.getDB() row := db.QueryRowContext(ctx, statBGWriterQueryAfter17) diff --git a/collector/pg_stat_checkpointer.go b/collector/pg_stat_checkpointer.go index 31e9c5d62..e4f2ebdf8 100644 --- a/collector/pg_stat_checkpointer.go +++ b/collector/pg_stat_checkpointer.go @@ -107,7 +107,7 @@ var ( FROM pg_stat_checkpointer;` ) -func (c PGStatCheckpointerCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { +func (c PGStatCheckpointerCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error { db := instance.getDB() before17 := instance.version.LT(semver.MustParse("17.0.0")) diff --git a/collector/pg_stat_database.go b/collector/pg_stat_database.go index b9210740f..24be812e9 100644 --- a/collector/pg_stat_database.go +++ b/collector/pg_stat_database.go @@ -223,7 +223,7 @@ func statDatabaseQuery(columns []string) string { return fmt.Sprintf("SELECT %s FROM pg_stat_database;", strings.Join(columns, ",")) } -func (c *PGStatDatabaseCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { +func (c *PGStatDatabaseCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error { db := instance.getDB() columns := []string{ diff --git a/collector/pg_stat_progress_vacuum.go b/collector/pg_stat_progress_vacuum.go index f8083a49f..fee18fd3d 100644 --- a/collector/pg_stat_progress_vacuum.go +++ b/collector/pg_stat_progress_vacuum.go @@ -114,7 +114,7 @@ var ( pg_database d ON s.datid = d.oid` ) -func (c *PGStatProgressVacuumCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { +func (c *PGStatProgressVacuumCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error { db := instance.getDB() rows, err := db.QueryContext(ctx, statProgressVacuumQuery) diff --git a/collector/pg_stat_statements.go b/collector/pg_stat_statements.go index d9a29ea65..27760d197 100644 --- a/collector/pg_stat_statements.go +++ b/collector/pg_stat_statements.go @@ -173,7 +173,7 @@ const ( LIMIT 100;` ) -func (c PGStatStatementsCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { +func (c PGStatStatementsCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error { var queryTemplate string switch { case instance.version.GE(semver.MustParse("17.0.0")): diff --git a/collector/pg_stat_user_tables.go b/collector/pg_stat_user_tables.go index ad8bcace7..8365de821 100644 --- a/collector/pg_stat_user_tables.go +++ b/collector/pg_stat_user_tables.go @@ -192,7 +192,7 @@ var ( pg_stat_user_tables` ) -func (c *PGStatUserTablesCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { +func (c *PGStatUserTablesCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error { db := instance.getDB() rows, err := db.QueryContext(ctx, statUserTablesQuery) diff --git a/collector/pg_stat_walreceiver.go b/collector/pg_stat_walreceiver.go index ea0db4558..8890954af 100644 --- a/collector/pg_stat_walreceiver.go +++ b/collector/pg_stat_walreceiver.go @@ -119,7 +119,7 @@ receive_start_tli, ` ) -func (c *PGStatWalReceiverCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { +func (c *PGStatWalReceiverCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error { db := instance.getDB() hasFlushedLSNRows, err := db.QueryContext(ctx, pgStatWalColumnQuery) if err != nil { diff --git a/collector/pg_statio_user_indexes.go b/collector/pg_statio_user_indexes.go index c53f52185..058422187 100644 --- a/collector/pg_statio_user_indexes.go +++ b/collector/pg_statio_user_indexes.go @@ -59,7 +59,7 @@ var ( ` ) -func (c *PGStatioUserIndexesCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { +func (c *PGStatioUserIndexesCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error { db := instance.getDB() rows, err := db.QueryContext(ctx, statioUserIndexesQuery) diff --git a/collector/pg_statio_user_tables.go b/collector/pg_statio_user_tables.go index 48f6438f2..390ee881c 100644 --- a/collector/pg_statio_user_tables.go +++ b/collector/pg_statio_user_tables.go @@ -100,7 +100,7 @@ var ( FROM pg_statio_user_tables` ) -func (PGStatIOUserTablesCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { +func (PGStatIOUserTablesCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error { db := instance.getDB() rows, err := db.QueryContext(ctx, statioUserTablesQuery) diff --git a/collector/pg_wal.go b/collector/pg_wal.go index afa8fcef6..c0d13bdb0 100644 --- a/collector/pg_wal.go +++ b/collector/pg_wal.go @@ -60,7 +60,7 @@ var ( WHERE name ~ '^[0-9A-F]{24}$'` ) -func (c PGWALCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { +func (c PGWALCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error { db := instance.getDB() row := db.QueryRowContext(ctx, pgWALQuery, diff --git a/collector/pg_xlog_location.go b/collector/pg_xlog_location.go index 5f091471f..8ff9802c4 100644 --- a/collector/pg_xlog_location.go +++ b/collector/pg_xlog_location.go @@ -51,7 +51,7 @@ var ( ` ) -func (c PGXlogLocationCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { +func (c PGXlogLocationCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error { db := instance.getDB() // xlog was renmaed to WAL in PostgreSQL 10 diff --git a/collector/probe.go b/collector/probe.go index 6b8cc36df..9f1f79514 100644 --- a/collector/probe.go +++ b/collector/probe.go @@ -26,7 +26,7 @@ type ProbeCollector struct { registry *prometheus.Registry collectors map[string]Collector logger *slog.Logger - instance *instance + instance *Instance } func NewProbeCollector(logger *slog.Logger, excludeDatabases []string, registry *prometheus.Registry, dsn config.DSN) (*ProbeCollector, error) { From 2a62d4d29694466e8418e5024010da53bc8c6017 Mon Sep 17 00:00:00 2001 From: Max Englander Date: Sat, 6 Sep 2025 20:44:38 -0400 Subject: [PATCH 3/4] more fix Signed-off-by: Max Englander --- collector/pg_buffercache_summary_test.go | 2 +- collector/pg_database_test.go | 4 ++-- collector/pg_database_wraparound_test.go | 2 +- collector/pg_locks_test.go | 2 +- collector/pg_long_running_transactions_test.go | 2 +- collector/pg_postmaster_test.go | 4 ++-- collector/pg_replication_slot_test.go | 8 ++++---- collector/pg_replication_test.go | 2 +- collector/pg_roles_test.go | 2 +- collector/pg_stat_activity_autovacuum_test.go | 2 +- collector/pg_stat_bgwriter_test.go | 4 ++-- collector/pg_stat_checkpointer_test.go | 4 ++-- collector/pg_stat_database_test.go | 8 ++++---- collector/pg_stat_progress_vacuum_test.go | 4 ++-- collector/pg_stat_statements_test.go | 16 ++++++++-------- collector/pg_stat_user_tables_test.go | 4 ++-- collector/pg_stat_walreceiver_test.go | 4 ++-- collector/pg_statio_user_indexes_test.go | 4 ++-- collector/pg_statio_user_tables_test.go | 4 ++-- collector/pg_wal_test.go | 2 +- collector/pg_xlog_location_test.go | 2 +- 21 files changed, 43 insertions(+), 43 deletions(-) diff --git a/collector/pg_buffercache_summary_test.go b/collector/pg_buffercache_summary_test.go index 86d91751f..ee3de9c75 100644 --- a/collector/pg_buffercache_summary_test.go +++ b/collector/pg_buffercache_summary_test.go @@ -30,7 +30,7 @@ func TestBuffercacheSummaryCollector(t *testing.T) { } defer db.Close() - inst := &instance{db: db, version: semver.MustParse("16.0.0")} + inst := &Instance{db: db, version: semver.MustParse("16.0.0")} columns := []string{ "buffers_used", diff --git a/collector/pg_database_test.go b/collector/pg_database_test.go index fe94166e9..a546c295f 100644 --- a/collector/pg_database_test.go +++ b/collector/pg_database_test.go @@ -29,7 +29,7 @@ func TestPGDatabaseCollector(t *testing.T) { } defer db.Close() - inst := &instance{db: db} + inst := &Instance{db: db} mock.ExpectQuery(sanitizeQuery(pgDatabaseQuery)).WillReturnRows(sqlmock.NewRows([]string{"datname", "datconnlimit"}). AddRow("postgres", 15)) @@ -70,7 +70,7 @@ func TestPGDatabaseCollectorNullMetric(t *testing.T) { } defer db.Close() - inst := &instance{db: db} + inst := &Instance{db: db} mock.ExpectQuery(sanitizeQuery(pgDatabaseQuery)).WillReturnRows(sqlmock.NewRows([]string{"datname", "datconnlimit"}). AddRow("postgres", nil)) diff --git a/collector/pg_database_wraparound_test.go b/collector/pg_database_wraparound_test.go index d0a74c362..dfc3a49eb 100644 --- a/collector/pg_database_wraparound_test.go +++ b/collector/pg_database_wraparound_test.go @@ -28,7 +28,7 @@ func TestPGDatabaseWraparoundCollector(t *testing.T) { t.Fatalf("Error opening a stub db connection: %s", err) } defer db.Close() - inst := &instance{db: db} + inst := &Instance{db: db} columns := []string{ "datname", "age_datfrozenxid", diff --git a/collector/pg_locks_test.go b/collector/pg_locks_test.go index 99597ea2d..37a3881b8 100644 --- a/collector/pg_locks_test.go +++ b/collector/pg_locks_test.go @@ -29,7 +29,7 @@ func TestPGLocksCollector(t *testing.T) { } defer db.Close() - inst := &instance{db: db} + inst := &Instance{db: db} rows := sqlmock.NewRows([]string{"datname", "mode", "count"}). AddRow("test", "exclusivelock", 42) diff --git a/collector/pg_long_running_transactions_test.go b/collector/pg_long_running_transactions_test.go index eedda7c65..181073965 100644 --- a/collector/pg_long_running_transactions_test.go +++ b/collector/pg_long_running_transactions_test.go @@ -28,7 +28,7 @@ func TestPGLongRunningTransactionsCollector(t *testing.T) { t.Fatalf("Error opening a stub db connection: %s", err) } defer db.Close() - inst := &instance{db: db} + inst := &Instance{db: db} columns := []string{ "transactions", "age_in_seconds", diff --git a/collector/pg_postmaster_test.go b/collector/pg_postmaster_test.go index 8405b4225..a125e04a3 100644 --- a/collector/pg_postmaster_test.go +++ b/collector/pg_postmaster_test.go @@ -29,7 +29,7 @@ func TestPgPostmasterCollector(t *testing.T) { } defer db.Close() - inst := &instance{db: db} + inst := &Instance{db: db} mock.ExpectQuery(sanitizeQuery(pgPostmasterQuery)).WillReturnRows(sqlmock.NewRows([]string{"pg_postmaster_start_time"}). AddRow(1685739904)) @@ -65,7 +65,7 @@ func TestPgPostmasterCollectorNullTime(t *testing.T) { } defer db.Close() - inst := &instance{db: db} + inst := &Instance{db: db} mock.ExpectQuery(sanitizeQuery(pgPostmasterQuery)).WillReturnRows(sqlmock.NewRows([]string{"pg_postmaster_start_time"}). AddRow(nil)) diff --git a/collector/pg_replication_slot_test.go b/collector/pg_replication_slot_test.go index 981b5db62..349d383c2 100644 --- a/collector/pg_replication_slot_test.go +++ b/collector/pg_replication_slot_test.go @@ -30,7 +30,7 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) { } defer db.Close() - inst := &instance{db: db, version: semver.MustParse("13.3.7")} + inst := &Instance{db: db, version: semver.MustParse("13.3.7")} columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"} rows := sqlmock.NewRows(columns). @@ -73,7 +73,7 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) { } defer db.Close() - inst := &instance{db: db, version: semver.MustParse("13.3.7")} + inst := &Instance{db: db, version: semver.MustParse("13.3.7")} columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"} rows := sqlmock.NewRows(columns). @@ -116,7 +116,7 @@ func TestPgReplicationSlotCollectorActiveNil(t *testing.T) { } defer db.Close() - inst := &instance{db: db, version: semver.MustParse("13.3.7")} + inst := &Instance{db: db, version: semver.MustParse("13.3.7")} columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"} rows := sqlmock.NewRows(columns). @@ -157,7 +157,7 @@ func TestPgReplicationSlotCollectorTestNilValues(t *testing.T) { } defer db.Close() - inst := &instance{db: db, version: semver.MustParse("13.3.7")} + inst := &Instance{db: db, version: semver.MustParse("13.3.7")} columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"} rows := sqlmock.NewRows(columns). diff --git a/collector/pg_replication_test.go b/collector/pg_replication_test.go index a48e9fd69..f53fa7496 100644 --- a/collector/pg_replication_test.go +++ b/collector/pg_replication_test.go @@ -29,7 +29,7 @@ func TestPgReplicationCollector(t *testing.T) { } defer db.Close() - inst := &instance{db: db} + inst := &Instance{db: db} columns := []string{"lag", "is_replica", "last_replay"} rows := sqlmock.NewRows(columns). diff --git a/collector/pg_roles_test.go b/collector/pg_roles_test.go index 182a120f9..5de94ddd3 100644 --- a/collector/pg_roles_test.go +++ b/collector/pg_roles_test.go @@ -29,7 +29,7 @@ func TestPGRolesCollector(t *testing.T) { } defer db.Close() - inst := &instance{db: db} + inst := &Instance{db: db} mock.ExpectQuery(sanitizeQuery(pgRolesConnectionLimitsQuery)).WillReturnRows(sqlmock.NewRows([]string{"rolname", "rolconnlimit"}). AddRow("postgres", 15)) diff --git a/collector/pg_stat_activity_autovacuum_test.go b/collector/pg_stat_activity_autovacuum_test.go index a6fcdbcad..14c0204f2 100644 --- a/collector/pg_stat_activity_autovacuum_test.go +++ b/collector/pg_stat_activity_autovacuum_test.go @@ -28,7 +28,7 @@ func TestPGStatActivityAutovacuumCollector(t *testing.T) { t.Fatalf("Error opening a stub db connection: %s", err) } defer db.Close() - inst := &instance{db: db} + inst := &Instance{db: db} columns := []string{ "relname", "timestamp_seconds", diff --git a/collector/pg_stat_bgwriter_test.go b/collector/pg_stat_bgwriter_test.go index 6fde2fb6a..05189e49d 100644 --- a/collector/pg_stat_bgwriter_test.go +++ b/collector/pg_stat_bgwriter_test.go @@ -30,7 +30,7 @@ func TestPGStatBGWriterCollector(t *testing.T) { } defer db.Close() - inst := &instance{db: db} + inst := &Instance{db: db} columns := []string{ "checkpoints_timed", @@ -96,7 +96,7 @@ func TestPGStatBGWriterCollectorNullValues(t *testing.T) { } defer db.Close() - inst := &instance{db: db} + inst := &Instance{db: db} columns := []string{ "checkpoints_timed", diff --git a/collector/pg_stat_checkpointer_test.go b/collector/pg_stat_checkpointer_test.go index 9a8dd7f21..f5b451fab 100644 --- a/collector/pg_stat_checkpointer_test.go +++ b/collector/pg_stat_checkpointer_test.go @@ -31,7 +31,7 @@ func TestPGStatCheckpointerCollector(t *testing.T) { } defer db.Close() - inst := &instance{db: db, version: semver.MustParse("17.0.0")} + inst := &Instance{db: db, version: semver.MustParse("17.0.0")} columns := []string{ "num_timed", @@ -93,7 +93,7 @@ func TestPGStatCheckpointerCollectorNullValues(t *testing.T) { } defer db.Close() - inst := &instance{db: db, version: semver.MustParse("17.0.0")} + inst := &Instance{db: db, version: semver.MustParse("17.0.0")} columns := []string{ "num_timed", diff --git a/collector/pg_stat_database_test.go b/collector/pg_stat_database_test.go index e6194ca2e..7475c69e5 100644 --- a/collector/pg_stat_database_test.go +++ b/collector/pg_stat_database_test.go @@ -32,7 +32,7 @@ func TestPGStatDatabaseCollector(t *testing.T) { } defer db.Close() - inst := &instance{db: db, version: semver.MustParse("14.0.0")} + inst := &Instance{db: db, version: semver.MustParse("14.0.0")} columns := []string{ "datid", @@ -143,7 +143,7 @@ func TestPGStatDatabaseCollectorNullValues(t *testing.T) { if err != nil { t.Fatalf("Error parsing time: %s", err) } - inst := &instance{db: db, version: semver.MustParse("14.0.0")} + inst := &Instance{db: db, version: semver.MustParse("14.0.0")} columns := []string{ "datid", @@ -265,7 +265,7 @@ func TestPGStatDatabaseCollectorRowLeakTest(t *testing.T) { } defer db.Close() - inst := &instance{db: db, version: semver.MustParse("14.0.0")} + inst := &Instance{db: db, version: semver.MustParse("14.0.0")} columns := []string{ "datid", @@ -434,7 +434,7 @@ func TestPGStatDatabaseCollectorTestNilStatReset(t *testing.T) { } defer db.Close() - inst := &instance{db: db, version: semver.MustParse("14.0.0")} + inst := &Instance{db: db, version: semver.MustParse("14.0.0")} columns := []string{ "datid", diff --git a/collector/pg_stat_progress_vacuum_test.go b/collector/pg_stat_progress_vacuum_test.go index 80572feb8..9980d9b42 100644 --- a/collector/pg_stat_progress_vacuum_test.go +++ b/collector/pg_stat_progress_vacuum_test.go @@ -29,7 +29,7 @@ func TestPGStatProgressVacuumCollector(t *testing.T) { } defer db.Close() - inst := &instance{db: db} + inst := &Instance{db: db} columns := []string{ "datname", "relname", "phase", "heap_blks_total", "heap_blks_scanned", @@ -85,7 +85,7 @@ func TestPGStatProgressVacuumCollectorNullValues(t *testing.T) { } defer db.Close() - inst := &instance{db: db} + inst := &Instance{db: db} columns := []string{ "datname", "relname", "phase", "heap_blks_total", "heap_blks_scanned", diff --git a/collector/pg_stat_statements_test.go b/collector/pg_stat_statements_test.go index 0497ba380..75ed255ae 100644 --- a/collector/pg_stat_statements_test.go +++ b/collector/pg_stat_statements_test.go @@ -31,7 +31,7 @@ func TestPGStateStatementsCollector(t *testing.T) { } defer db.Close() - inst := &instance{db: db, version: semver.MustParse("12.0.0")} + inst := &Instance{db: db, version: semver.MustParse("12.0.0")} columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} rows := sqlmock.NewRows(columns). @@ -74,7 +74,7 @@ func TestPGStateStatementsCollectorWithStatement(t *testing.T) { } defer db.Close() - inst := &instance{db: db, version: semver.MustParse("12.0.0")} + inst := &Instance{db: db, version: semver.MustParse("12.0.0")} columns := []string{"user", "datname", "queryid", "LEFT(pg_stat_statements.query, 100) as query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} rows := sqlmock.NewRows(columns). @@ -118,7 +118,7 @@ func TestPGStateStatementsCollectorNull(t *testing.T) { } defer db.Close() - inst := &instance{db: db, version: semver.MustParse("13.3.7")} + inst := &Instance{db: db, version: semver.MustParse("13.3.7")} columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} rows := sqlmock.NewRows(columns). @@ -161,7 +161,7 @@ func TestPGStateStatementsCollectorNullWithStatement(t *testing.T) { } defer db.Close() - inst := &instance{db: db, version: semver.MustParse("13.3.7")} + inst := &Instance{db: db, version: semver.MustParse("13.3.7")} columns := []string{"user", "datname", "queryid", "LEFT(pg_stat_statements.query, 200) as query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} rows := sqlmock.NewRows(columns). @@ -205,7 +205,7 @@ func TestPGStateStatementsCollectorNewPG(t *testing.T) { } defer db.Close() - inst := &instance{db: db, version: semver.MustParse("13.3.7")} + inst := &Instance{db: db, version: semver.MustParse("13.3.7")} columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} rows := sqlmock.NewRows(columns). @@ -248,7 +248,7 @@ func TestPGStateStatementsCollectorNewPGWithStatement(t *testing.T) { } defer db.Close() - inst := &instance{db: db, version: semver.MustParse("13.3.7")} + inst := &Instance{db: db, version: semver.MustParse("13.3.7")} columns := []string{"user", "datname", "queryid", "LEFT(pg_stat_statements.query, 300) as query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} rows := sqlmock.NewRows(columns). @@ -292,7 +292,7 @@ func TestPGStateStatementsCollector_PG17(t *testing.T) { } defer db.Close() - inst := &instance{db: db, version: semver.MustParse("17.0.0")} + inst := &Instance{db: db, version: semver.MustParse("17.0.0")} columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} rows := sqlmock.NewRows(columns). @@ -335,7 +335,7 @@ func TestPGStateStatementsCollector_PG17_WithStatement(t *testing.T) { } defer db.Close() - inst := &instance{db: db, version: semver.MustParse("17.0.0")} + inst := &Instance{db: db, version: semver.MustParse("17.0.0")} columns := []string{"user", "datname", "queryid", "LEFT(pg_stat_statements.query, 300) as query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} rows := sqlmock.NewRows(columns). diff --git a/collector/pg_stat_user_tables_test.go b/collector/pg_stat_user_tables_test.go index 4649bdbc5..6c2acb4d7 100644 --- a/collector/pg_stat_user_tables_test.go +++ b/collector/pg_stat_user_tables_test.go @@ -30,7 +30,7 @@ func TestPGStatUserTablesCollector(t *testing.T) { } defer db.Close() - inst := &instance{db: db} + inst := &Instance{db: db} lastVacuumTime, err := time.Parse("2006-01-02Z", "2023-06-02Z") if err != nil { @@ -152,7 +152,7 @@ func TestPGStatUserTablesCollectorNullValues(t *testing.T) { } defer db.Close() - inst := &instance{db: db} + inst := &Instance{db: db} columns := []string{ "datname", diff --git a/collector/pg_stat_walreceiver_test.go b/collector/pg_stat_walreceiver_test.go index c81c9ecae..94d9b9064 100644 --- a/collector/pg_stat_walreceiver_test.go +++ b/collector/pg_stat_walreceiver_test.go @@ -33,7 +33,7 @@ func TestPGStatWalReceiverCollectorWithFlushedLSN(t *testing.T) { } defer db.Close() - inst := &instance{db: db} + inst := &Instance{db: db} infoSchemaColumns := []string{ "column_name", } @@ -116,7 +116,7 @@ func TestPGStatWalReceiverCollectorWithNoFlushedLSN(t *testing.T) { } defer db.Close() - inst := &instance{db: db} + inst := &Instance{db: db} infoSchemaColumns := []string{ "column_name", } diff --git a/collector/pg_statio_user_indexes_test.go b/collector/pg_statio_user_indexes_test.go index 174012162..93cb14538 100644 --- a/collector/pg_statio_user_indexes_test.go +++ b/collector/pg_statio_user_indexes_test.go @@ -28,7 +28,7 @@ func TestPgStatioUserIndexesCollector(t *testing.T) { t.Fatalf("Error opening a stub db connection: %s", err) } defer db.Close() - inst := &instance{db: db} + inst := &Instance{db: db} columns := []string{ "schemaname", "relname", @@ -71,7 +71,7 @@ func TestPgStatioUserIndexesCollectorNull(t *testing.T) { t.Fatalf("Error opening a stub db connection: %s", err) } defer db.Close() - inst := &instance{db: db} + inst := &Instance{db: db} columns := []string{ "schemaname", "relname", diff --git a/collector/pg_statio_user_tables_test.go b/collector/pg_statio_user_tables_test.go index c7304a38c..3ca1b3f59 100644 --- a/collector/pg_statio_user_tables_test.go +++ b/collector/pg_statio_user_tables_test.go @@ -29,7 +29,7 @@ func TestPGStatIOUserTablesCollector(t *testing.T) { } defer db.Close() - inst := &instance{db: db} + inst := &Instance{db: db} columns := []string{ "datname", @@ -96,7 +96,7 @@ func TestPGStatIOUserTablesCollectorNullValues(t *testing.T) { } defer db.Close() - inst := &instance{db: db} + inst := &Instance{db: db} columns := []string{ "datname", diff --git a/collector/pg_wal_test.go b/collector/pg_wal_test.go index 745105a13..d99c7390b 100644 --- a/collector/pg_wal_test.go +++ b/collector/pg_wal_test.go @@ -29,7 +29,7 @@ func TestPgWALCollector(t *testing.T) { } defer db.Close() - inst := &instance{db: db} + inst := &Instance{db: db} columns := []string{"segments", "size"} rows := sqlmock.NewRows(columns). diff --git a/collector/pg_xlog_location_test.go b/collector/pg_xlog_location_test.go index 561a7df94..7818db7ce 100644 --- a/collector/pg_xlog_location_test.go +++ b/collector/pg_xlog_location_test.go @@ -28,7 +28,7 @@ func TestPGXlogLocationCollector(t *testing.T) { t.Fatalf("Error opening a stub db connection: %s", err) } defer db.Close() - inst := &instance{db: db} + inst := &Instance{db: db} columns := []string{ "bytes", } From 4ff7049ae2f38f1a6592227c48b3d4df7ed47f90 Mon Sep 17 00:00:00 2001 From: Max Englander Date: Sat, 6 Sep 2025 22:41:29 -0400 Subject: [PATCH 4/4] connection resilience Signed-off-by: Max Englander --- cmd/postgres_exporter/main.go | 56 ++++++++++++++++------------------- collector/collector.go | 45 ++++++++-------------------- collector/instance.go | 24 ++++++++++++++- 3 files changed, 61 insertions(+), 64 deletions(-) diff --git a/cmd/postgres_exporter/main.go b/cmd/postgres_exporter/main.go index 8f2e43df2..2bd58d9d7 100644 --- a/cmd/postgres_exporter/main.go +++ b/cmd/postgres_exporter/main.go @@ -39,51 +39,45 @@ func registerPostgresCollector(dsn string, exporter *Exporter, logger *slog.Logg return } - var instance *collector.Instance - var opts []collector.Option + var factory collector.InstanceFactory if concurrentScrape { // Original behavior: dedicated instance for collector, creates new connection per scrape - inst, err := collector.NewInstance(dsn) + template, err := collector.NewInstance(dsn) if err != nil { - logger.Warn("Failed to create instance", "err", err.Error()) + logger.Warn("Failed to create template instance", "err", err.Error()) return } - instance = inst - // Add option to create new instance per collect - opts = append(opts, collector.WithInstancePerCollect()) + factory = collector.InstanceFactoryFromTemplate(template) } else { - // New optimized behavior: share connection from server - server, err := exporter.servers.GetServer(dsn) - if err != nil { - logger.Warn("Failed to get server for collectors", "err", err.Error()) - return - } - - inst, err := collector.NewInstance(dsn) - if err != nil { - logger.Warn("Failed to create instance", "err", err.Error()) - return + // New optimized behavior: share connection from server with resilience + factory = func() (*collector.Instance, error) { + server, err := exporter.servers.GetServer(dsn) + if err != nil { + return nil, err + } + + inst, err := collector.NewInstance(dsn) + if err != nil { + return nil, err + } + + err = inst.SetupWithConnection(server.db) + if err != nil { + return nil, err + } + + return inst, nil } - - err = inst.SetupWithConnection(server.db) - if err != nil { - logger.Warn("Failed to setup shared instance", "err", err.Error()) - return - } - instance = inst } - // Add timeout option - opts = append(opts, collector.WithTimeout(scrapeTimeout)) - - // Create collector + // Create collector with factory pe, err := collector.NewPostgresCollector( logger, excludedDatabases, - instance, + factory, []string{}, - opts..., + collector.WithTimeout(scrapeTimeout), ) if err != nil { logger.Warn("Failed to create PostgresCollector", "err", err.Error()) diff --git a/collector/collector.go b/collector/collector.go index 642880cf2..57c101202 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -89,12 +89,10 @@ func registerCollector(name string, isDefaultEnabled bool, createFunc func(colle // PostgresCollector implements the prometheus.Collector interface. type PostgresCollector struct { - Collectors map[string]Collector - logger *slog.Logger - scrapeTimeout time.Duration - - instance *Instance - instancePerCollect bool + Collectors map[string]Collector + logger *slog.Logger + scrapeTimeout time.Duration + instanceFactory InstanceFactory } type Option func(*PostgresCollector) error @@ -107,19 +105,11 @@ func WithTimeout(timeout time.Duration) Option { } } -// WithInstancePerCollect configures whether to create a new instance per Collect call. -func WithInstancePerCollect() Option { - return func(p *PostgresCollector) error { - p.instancePerCollect = true - return nil - } -} - // NewPostgresCollector creates a new PostgresCollector. -func NewPostgresCollector(logger *slog.Logger, excludeDatabases []string, instance *Instance, filters []string, options ...Option) (*PostgresCollector, error) { +func NewPostgresCollector(logger *slog.Logger, excludeDatabases []string, factory InstanceFactory, filters []string, options ...Option) (*PostgresCollector, error) { p := &PostgresCollector{ - logger: logger, - instance: instance, + logger: logger, + instanceFactory: factory, } // Apply options to customize the collector for _, o := range options { @@ -184,22 +174,13 @@ func (p PostgresCollector) Collect(ch chan<- prometheus.Metric) { ctx = context.Background() } - var inst *Instance - - if p.instancePerCollect { - // copy the instance so that concurrent scrapes have independent instances - inst = p.instance.copy() - // Set up the database connection for the collector. - err := inst.setup() - if err != nil { - p.logger.Error("Error opening connection to database", "err", err) - return - } - defer inst.Close() - } else { - // Use the shared instance directly - inst = p.instance + // Use the factory to get an instance + inst, err := p.instanceFactory() + if err != nil { + p.logger.Error("Error creating instance", "err", err) + return } + defer inst.Close() // Always safe - closeDB flag determines if connection is actually closed wg := sync.WaitGroup{} wg.Add(len(p.Collectors)) diff --git a/collector/instance.go b/collector/instance.go index 1314f1433..b4f965461 100644 --- a/collector/instance.go +++ b/collector/instance.go @@ -25,6 +25,7 @@ type Instance struct { dsn string db *sql.DB version semver.Version + closeDB bool // whether we should close the connection on Close() } func NewInstance(dsn string) (*Instance, error) { @@ -58,6 +59,7 @@ func (i *Instance) setup() error { db.SetMaxOpenConns(1) db.SetMaxIdleConns(1) i.db = db + i.closeDB = true // we created this connection, so we should close it version, err := queryVersion(i.db) if err != nil { @@ -71,6 +73,7 @@ func (i *Instance) setup() error { // SetupWithConnection sets up the instance with an existing database connection. func (i *Instance) SetupWithConnection(db *sql.DB) error { i.db = db + i.closeDB = false // we're borrowing this connection, don't close it version, err := queryVersion(i.db) if err != nil { @@ -85,7 +88,10 @@ func (i *Instance) getDB() *sql.DB { } func (i *Instance) Close() error { - return i.db.Close() + if i.closeDB { + return i.db.Close() + } + return nil } // Regex used to get the "short-version" from the postgres version field. @@ -116,3 +122,19 @@ func queryVersion(db *sql.DB) (semver.Version, error) { } return semver.Version{}, fmt.Errorf("could not parse version from %q", version) } + +// InstanceFactory creates instances for collectors to use +type InstanceFactory func() (*Instance, error) + +// InstanceFactoryFromTemplate creates a factory that copies from a template instance +// and creates a new database connection for each call +func InstanceFactoryFromTemplate(template *Instance) InstanceFactory { + return func() (*Instance, error) { + inst := template.copy() + err := inst.setup() // Creates new connection, sets closeDB=true + if err != nil { + return nil, err + } + return inst, nil + } +}