Skip to content

Commit 4ff7049

Browse files
committed
connection resilience
Signed-off-by: Max Englander <[email protected]>
1 parent 2a62d4d commit 4ff7049

File tree

3 files changed

+61
-64
lines changed

3 files changed

+61
-64
lines changed

cmd/postgres_exporter/main.go

Lines changed: 25 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -39,51 +39,45 @@ func registerPostgresCollector(dsn string, exporter *Exporter, logger *slog.Logg
3939
return
4040
}
4141

42-
var instance *collector.Instance
43-
var opts []collector.Option
42+
var factory collector.InstanceFactory
4443

4544
if concurrentScrape {
4645
// Original behavior: dedicated instance for collector, creates new connection per scrape
47-
inst, err := collector.NewInstance(dsn)
46+
template, err := collector.NewInstance(dsn)
4847
if err != nil {
49-
logger.Warn("Failed to create instance", "err", err.Error())
48+
logger.Warn("Failed to create template instance", "err", err.Error())
5049
return
5150
}
52-
instance = inst
53-
// Add option to create new instance per collect
54-
opts = append(opts, collector.WithInstancePerCollect())
51+
factory = collector.InstanceFactoryFromTemplate(template)
5552
} else {
56-
// New optimized behavior: share connection from server
57-
server, err := exporter.servers.GetServer(dsn)
58-
if err != nil {
59-
logger.Warn("Failed to get server for collectors", "err", err.Error())
60-
return
61-
}
62-
63-
inst, err := collector.NewInstance(dsn)
64-
if err != nil {
65-
logger.Warn("Failed to create instance", "err", err.Error())
66-
return
53+
// New optimized behavior: share the server's connection. The factory looks the server up again on every call instead of once at registration time.
54+
factory = func() (*collector.Instance, error) {
55+
server, err := exporter.servers.GetServer(dsn)
56+
if err != nil {
57+
return nil, err
58+
}
59+
60+
inst, err := collector.NewInstance(dsn)
61+
if err != nil {
62+
return nil, err
63+
}
64+
65+
err = inst.SetupWithConnection(server.db)
66+
if err != nil {
67+
return nil, err
68+
}
69+
70+
return inst, nil
6771
}
68-
69-
err = inst.SetupWithConnection(server.db)
70-
if err != nil {
71-
logger.Warn("Failed to setup shared instance", "err", err.Error())
72-
return
73-
}
74-
instance = inst
7572
}
7673

77-
// Add timeout option
78-
opts = append(opts, collector.WithTimeout(scrapeTimeout))
79-
80-
// Create collector
74+
// Create collector with factory
8175
pe, err := collector.NewPostgresCollector(
8276
logger,
8377
excludedDatabases,
84-
instance,
78+
factory,
8579
[]string{},
86-
opts...,
80+
collector.WithTimeout(scrapeTimeout),
8781
)
8882
if err != nil {
8983
logger.Warn("Failed to create PostgresCollector", "err", err.Error())

collector/collector.go

Lines changed: 13 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -89,12 +89,10 @@ func registerCollector(name string, isDefaultEnabled bool, createFunc func(colle
8989

9090
// PostgresCollector implements the prometheus.Collector interface.
9191
type PostgresCollector struct {
92-
Collectors map[string]Collector
93-
logger *slog.Logger
94-
scrapeTimeout time.Duration
95-
96-
instance *Instance
97-
instancePerCollect bool
92+
Collectors map[string]Collector
93+
logger *slog.Logger
94+
scrapeTimeout time.Duration
95+
instanceFactory InstanceFactory
9896
}
9997

10098
type Option func(*PostgresCollector) error
@@ -107,19 +105,11 @@ func WithTimeout(timeout time.Duration) Option {
107105
}
108106
}
109107

110-
// WithInstancePerCollect configures whether to create a new instance per Collect call.
111-
func WithInstancePerCollect() Option {
112-
return func(p *PostgresCollector) error {
113-
p.instancePerCollect = true
114-
return nil
115-
}
116-
}
117-
118108
// NewPostgresCollector creates a new PostgresCollector.
119-
func NewPostgresCollector(logger *slog.Logger, excludeDatabases []string, instance *Instance, filters []string, options ...Option) (*PostgresCollector, error) {
109+
func NewPostgresCollector(logger *slog.Logger, excludeDatabases []string, factory InstanceFactory, filters []string, options ...Option) (*PostgresCollector, error) {
120110
p := &PostgresCollector{
121-
logger: logger,
122-
instance: instance,
111+
logger: logger,
112+
instanceFactory: factory,
123113
}
124114
// Apply options to customize the collector
125115
for _, o := range options {
@@ -184,22 +174,13 @@ func (p PostgresCollector) Collect(ch chan<- prometheus.Metric) {
184174
ctx = context.Background()
185175
}
186176

187-
var inst *Instance
188-
189-
if p.instancePerCollect {
190-
// copy the instance so that concurrent scrapes have independent instances
191-
inst = p.instance.copy()
192-
// Set up the database connection for the collector.
193-
err := inst.setup()
194-
if err != nil {
195-
p.logger.Error("Error opening connection to database", "err", err)
196-
return
197-
}
198-
defer inst.Close()
199-
} else {
200-
// Use the shared instance directly
201-
inst = p.instance
177+
// Use the factory to get an instance
178+
inst, err := p.instanceFactory()
179+
if err != nil {
180+
p.logger.Error("Error creating instance", "err", err)
181+
return
202182
}
183+
defer inst.Close() // Always safe - closeDB flag determines if connection is actually closed
203184

204185
wg := sync.WaitGroup{}
205186
wg.Add(len(p.Collectors))

collector/instance.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ type Instance struct {
2525
dsn string
2626
db *sql.DB
2727
version semver.Version
28+
closeDB bool // whether we should close the connection on Close()
2829
}
2930

3031
func NewInstance(dsn string) (*Instance, error) {
@@ -58,6 +59,7 @@ func (i *Instance) setup() error {
5859
db.SetMaxOpenConns(1)
5960
db.SetMaxIdleConns(1)
6061
i.db = db
62+
i.closeDB = true // we created this connection, so we should close it
6163

6264
version, err := queryVersion(i.db)
6365
if err != nil {
@@ -71,6 +73,7 @@ func (i *Instance) setup() error {
7173
// SetupWithConnection sets up the instance with an existing database connection.
7274
func (i *Instance) SetupWithConnection(db *sql.DB) error {
7375
i.db = db
76+
i.closeDB = false // we're borrowing this connection, don't close it
7477

7578
version, err := queryVersion(i.db)
7679
if err != nil {
@@ -85,7 +88,10 @@ func (i *Instance) getDB() *sql.DB {
8588
}
8689

8790
func (i *Instance) Close() error {
88-
return i.db.Close()
91+
if i.closeDB {
92+
return i.db.Close()
93+
}
94+
return nil
8995
}
9096

9197
// Regex used to get the "short-version" from the postgres version field.
@@ -116,3 +122,19 @@ func queryVersion(db *sql.DB) (semver.Version, error) {
116122
}
117123
return semver.Version{}, fmt.Errorf("could not parse version from %q", version)
118124
}
125+
126+
// InstanceFactory returns the Instance a collector uses for a single Collect call; the caller closes it when the call finishes.
127+
type InstanceFactory func() (*Instance, error)
128+
129+
// InstanceFactoryFromTemplate creates a factory that copies from a template instance
130+
// and creates a new database connection for each call
131+
func InstanceFactoryFromTemplate(template *Instance) InstanceFactory {
132+
return func() (*Instance, error) {
133+
inst := template.copy()
134+
err := inst.setup() // Creates new connection, sets closeDB=true
135+
if err != nil {
136+
return nil, err
137+
}
138+
return inst, nil
139+
}
140+
}

0 commit comments

Comments
 (0)