Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 57 additions & 12 deletions cmd/postgres_exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ package main

import (
"fmt"
"log/slog"
"net/http"
"os"
"strings"
"time"

"github.com/alecthomas/kingpin/v2"
"github.com/prometheus-community/postgres_exporter/collector"
Expand All @@ -32,6 +34,59 @@ import (
"github.com/prometheus/exporter-toolkit/web/kingpinflag"
)

func registerPostgresCollector(dsn string, exporter *Exporter, logger *slog.Logger, excludedDatabases []string, scrapeTimeout time.Duration, concurrentScrape bool) {
if dsn == "" {
return
}

var factory collector.InstanceFactory

if concurrentScrape {
// Original behavior: dedicated instance for collector, creates new connection per scrape
template, err := collector.NewInstance(dsn)
if err != nil {
logger.Warn("Failed to create template instance", "err", err.Error())
return
}
factory = collector.InstanceFactoryFromTemplate(template)
} else {
// New optimized behavior: share connection from server with resilience
factory = func() (*collector.Instance, error) {
server, err := exporter.servers.GetServer(dsn)
if err != nil {
return nil, err
}

inst, err := collector.NewInstance(dsn)
if err != nil {
return nil, err
}

err = inst.SetupWithConnection(server.db)
if err != nil {
return nil, err
}

return inst, nil
}
}

// Create collector with factory
pe, err := collector.NewPostgresCollector(
logger,
excludedDatabases,
factory,
[]string{},
collector.WithTimeout(scrapeTimeout),
)
if err != nil {
logger.Warn("Failed to create PostgresCollector", "err", err.Error())
return
}

prometheus.MustRegister(pe)
}

var (
c = config.Handler{
Config: &config.Config{},
Expand All @@ -50,6 +105,7 @@ var (
includeDatabases = kingpin.Flag("include-databases", "A list of databases to include when autoDiscoverDatabases is enabled (DEPRECATED)").Default("").Envar("PG_EXPORTER_INCLUDE_DATABASES").String()
metricPrefix = kingpin.Flag("metric-prefix", "A metric prefix can be used to have non-default (not \"pg\") prefixes for each of the metrics").Default("pg").Envar("PG_EXPORTER_METRIC_PREFIX").String()
scrapeTimeout = kingpin.Flag("scrape-timeout", "Maximum time for a scrape to complete before timing out (0 = no timeout)").Default("0").Envar("PG_EXPORTER_SCRAPE_TIMEOUT").Duration()
concurrentScrape = kingpin.Flag("concurrent-scrape", "Use dedicated instance for collector allowing concurrent scrapes (default: true for backward compatibility)").Default("true").Envar("PG_EXPORTER_CONCURRENT_SCRAPE").Bool()
logger = promslog.NewNopLogger()
)

Expand Down Expand Up @@ -133,18 +189,7 @@ func main() {
dsn = dsns[0]
}

pe, err := collector.NewPostgresCollector(
logger,
excludedDatabases,
dsn,
[]string{},
collector.WithTimeout(*scrapeTimeout),
)
if err != nil {
logger.Warn("Failed to create PostgresCollector", "err", err.Error())
} else {
prometheus.MustRegister(pe)
}
registerPostgresCollector(dsn, exporter, logger, excludedDatabases, *scrapeTimeout, *concurrentScrape)

http.Handle(*metricsPath, promhttp.Handler())

Expand Down
43 changes: 13 additions & 30 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ var (
)

type Collector interface {
Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error
Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error
}

type collectorConfig struct {
Expand Down Expand Up @@ -89,11 +89,10 @@ func registerCollector(name string, isDefaultEnabled bool, createFunc func(colle

// PostgresCollector implements the prometheus.Collector interface.
type PostgresCollector struct {
Collectors map[string]Collector
logger *slog.Logger
scrapeTimeout time.Duration

instance *instance
Collectors map[string]Collector
logger *slog.Logger
scrapeTimeout time.Duration
instanceFactory InstanceFactory
}

type Option func(*PostgresCollector) error
Expand All @@ -107,9 +106,10 @@ func WithTimeout(timeout time.Duration) Option {
}

// NewPostgresCollector creates a new PostgresCollector.
func NewPostgresCollector(logger *slog.Logger, excludeDatabases []string, dsn string, filters []string, options ...Option) (*PostgresCollector, error) {
func NewPostgresCollector(logger *slog.Logger, excludeDatabases []string, factory InstanceFactory, filters []string, options ...Option) (*PostgresCollector, error) {
p := &PostgresCollector{
logger: logger,
logger: logger,
instanceFactory: factory,
}
// Apply options to customize the collector
for _, o := range options {
Expand Down Expand Up @@ -154,16 +154,6 @@ func NewPostgresCollector(logger *slog.Logger, excludeDatabases []string, dsn st

p.Collectors = collectors

if dsn == "" {
return nil, errors.New("empty dsn")
}

instance, err := newInstance(dsn)
if err != nil {
return nil, err
}
p.instance = instance

return p, nil
}

Expand All @@ -184,16 +174,13 @@ func (p PostgresCollector) Collect(ch chan<- prometheus.Metric) {
ctx = context.Background()
}

// copy the instance so that concurrent scrapes have independent instances
inst := p.instance.copy()

// Set up the database connection for the collector.
err := inst.setup()
defer inst.Close()
// Use the factory to get an instance
inst, err := p.instanceFactory()
if err != nil {
p.logger.Error("Error opening connection to database", "err", err)
p.logger.Error("Error creating instance", "err", err)
return
}
defer inst.Close() // Always safe - closeDB flag determines if connection is actually closed

wg := sync.WaitGroup{}
wg.Add(len(p.Collectors))
Expand All @@ -206,11 +193,7 @@ func (p PostgresCollector) Collect(ch chan<- prometheus.Metric) {
wg.Wait()
}

func (p *PostgresCollector) Close() error {
return p.instance.Close()
}

func execute(ctx context.Context, name string, c Collector, instance *instance, ch chan<- prometheus.Metric, logger *slog.Logger) {
func execute(ctx context.Context, name string, c Collector, instance *Instance, ch chan<- prometheus.Metric, logger *slog.Logger) {
begin := time.Now()
err := c.Update(ctx, instance, ch)
duration := time.Since(begin)
Expand Down
52 changes: 43 additions & 9 deletions collector/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ import (
"github.com/blang/semver/v4"
)

type instance struct {
type Instance struct {
dsn string
db *sql.DB
version semver.Version
closeDB bool // whether we should close the connection on Close()
}

func newInstance(dsn string) (*instance, error) {
i := &instance{
func NewInstance(dsn string) (*Instance, error) {
i := &Instance{
dsn: dsn,
}

Expand All @@ -44,20 +45,21 @@ func newInstance(dsn string) (*instance, error) {
}

// copy returns a copy of the instance.
func (i *instance) copy() *instance {
return &instance{
func (i *Instance) copy() *Instance {
return &Instance{
dsn: i.dsn,
}
}

func (i *instance) setup() error {
func (i *Instance) setup() error {
db, err := sql.Open("postgres", i.dsn)
if err != nil {
return err
}
db.SetMaxOpenConns(1)
db.SetMaxIdleConns(1)
i.db = db
i.closeDB = true // we created this connection, so we should close it

version, err := queryVersion(i.db)
if err != nil {
Expand All @@ -68,12 +70,28 @@ func (i *instance) setup() error {
return nil
}

func (i *instance) getDB() *sql.DB {
// SetupWithConnection sets up the instance with an existing database connection.
func (i *Instance) SetupWithConnection(db *sql.DB) error {
i.db = db
i.closeDB = false // we're borrowing this connection, don't close it

version, err := queryVersion(i.db)
if err != nil {
return fmt.Errorf("error querying postgresql version: %w", err)
}
i.version = version
return nil
}

func (i *Instance) getDB() *sql.DB {
return i.db
}

func (i *instance) Close() error {
return i.db.Close()
func (i *Instance) Close() error {
if i.closeDB {
return i.db.Close()
}
return nil
}

// Regex used to get the "short-version" from the postgres version field.
Expand Down Expand Up @@ -104,3 +122,19 @@ func queryVersion(db *sql.DB) (semver.Version, error) {
}
return semver.Version{}, fmt.Errorf("could not parse version from %q", version)
}

// InstanceFactory creates instances for collectors to use
type InstanceFactory func() (*Instance, error)

// InstanceFactoryFromTemplate creates a factory that copies from a template instance
// and creates a new database connection for each call
func InstanceFactoryFromTemplate(template *Instance) InstanceFactory {
return func() (*Instance, error) {
inst := template.copy()
err := inst.setup() // Creates new connection, sets closeDB=true
if err != nil {
return nil, err
}
return inst, nil
}
}
2 changes: 1 addition & 1 deletion collector/pg_buffercache_summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ var (

// Update implements Collector
// It is called by the Prometheus registry when collecting metrics.
func (c BuffercacheSummaryCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
func (c BuffercacheSummaryCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error {
// pg_buffercache_summary is only in v16, and we don't need support for earlier currently.
if !instance.version.GE(semver.MustParse("16.0.0")) {
return nil
Expand Down
2 changes: 1 addition & 1 deletion collector/pg_buffercache_summary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestBuffercacheSummaryCollector(t *testing.T) {
}
defer db.Close()

inst := &instance{db: db, version: semver.MustParse("16.0.0")}
inst := &Instance{db: db, version: semver.MustParse("16.0.0")}

columns := []string{
"buffers_used",
Expand Down
2 changes: 1 addition & 1 deletion collector/pg_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ var (
// each database individually. This is because we can't filter the
// list of databases in the query because the list of excluded
// databases is dynamic.
func (c PGDatabaseCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
func (c PGDatabaseCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
// Query the list of databases
rows, err := db.QueryContext(ctx,
Expand Down
4 changes: 2 additions & 2 deletions collector/pg_database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestPGDatabaseCollector(t *testing.T) {
}
defer db.Close()

inst := &instance{db: db}
inst := &Instance{db: db}

mock.ExpectQuery(sanitizeQuery(pgDatabaseQuery)).WillReturnRows(sqlmock.NewRows([]string{"datname", "datconnlimit"}).
AddRow("postgres", 15))
Expand Down Expand Up @@ -70,7 +70,7 @@ func TestPGDatabaseCollectorNullMetric(t *testing.T) {
}
defer db.Close()

inst := &instance{db: db}
inst := &Instance{db: db}

mock.ExpectQuery(sanitizeQuery(pgDatabaseQuery)).WillReturnRows(sqlmock.NewRows([]string{"datname", "datconnlimit"}).
AddRow("postgres", nil))
Expand Down
2 changes: 1 addition & 1 deletion collector/pg_database_wraparound.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var (
`
)

func (c *PGDatabaseWraparoundCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
func (c *PGDatabaseWraparoundCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
rows, err := db.QueryContext(ctx,
databaseWraparoundQuery)
Expand Down
2 changes: 1 addition & 1 deletion collector/pg_database_wraparound_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestPGDatabaseWraparoundCollector(t *testing.T) {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
inst := &Instance{db: db}
columns := []string{
"datname",
"age_datfrozenxid",
Expand Down
2 changes: 1 addition & 1 deletion collector/pg_locks.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ var (

// Update implements Collector and exposes database locks.
// It is called by the Prometheus registry when collecting metrics.
func (c PGLocksCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
func (c PGLocksCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
// Query the list of databases
rows, err := db.QueryContext(ctx,
Expand Down
2 changes: 1 addition & 1 deletion collector/pg_locks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestPGLocksCollector(t *testing.T) {
}
defer db.Close()

inst := &instance{db: db}
inst := &Instance{db: db}

rows := sqlmock.NewRows([]string{"datname", "mode", "count"}).
AddRow("test", "exclusivelock", 42)
Expand Down
2 changes: 1 addition & 1 deletion collector/pg_long_running_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ AND pid <> pg_backend_pid();
`
)

func (PGLongRunningTransactionsCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
func (PGLongRunningTransactionsCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
rows, err := db.QueryContext(ctx,
longRunningTransactionsQuery)
Expand Down
2 changes: 1 addition & 1 deletion collector/pg_long_running_transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestPGLongRunningTransactionsCollector(t *testing.T) {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()
inst := &instance{db: db}
inst := &Instance{db: db}
columns := []string{
"transactions",
"age_in_seconds",
Expand Down
2 changes: 1 addition & 1 deletion collector/pg_postmaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var (
pgPostmasterQuery = "SELECT extract(epoch from pg_postmaster_start_time) from pg_postmaster_start_time();"
)

func (c *PGPostmasterCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
func (c *PGPostmasterCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
row := db.QueryRowContext(ctx,
pgPostmasterQuery)
Expand Down
Loading