10 changes: 9 additions & 1 deletion receiver/prometheusreceiver/internal/appendable.go
@@ -15,6 +15,11 @@ import (
"go.opentelemetry.io/collector/receiver/receiverhelper"
)

// TargetSamplesReporter defines the interface for reporting target samples
type TargetSamplesReporter interface {
ReportTargetSamples(job, instance string, sampleCount int)
}

// appendable translates Prometheus scraping diffs into OpenTelemetry format.
type appendable struct {
sink consumer.Metrics
@@ -24,6 +29,7 @@ type appendable struct {
trimSuffixes bool
startTimeMetricRegex *regexp.Regexp
externalLabels labels.Labels
targetSamplesReporter TargetSamplesReporter

settings receiver.Settings
obsrecv *receiverhelper.ObsReport
@@ -40,6 +46,7 @@ func NewAppendable(
enableNativeHistograms bool,
externalLabels labels.Labels,
trimSuffixes bool,
targetSamplesReporter TargetSamplesReporter,
) (storage.Appendable, error) {
var metricAdjuster MetricsAdjuster
if !useStartTimeMetric {
Expand All @@ -63,9 +70,10 @@ func NewAppendable(
externalLabels: externalLabels,
obsrecv: obsrecv,
trimSuffixes: trimSuffixes,
targetSamplesReporter: targetSamplesReporter,
}, nil
}

func (o *appendable) Appender(ctx context.Context) storage.Appender {
return newTransaction(ctx, o.metricAdjuster, o.sink, o.externalLabels, o.settings, o.obsrecv, o.trimSuffixes, o.enableNativeHistograms)
return newTransaction(ctx, o.metricAdjuster, o.sink, o.externalLabels, o.settings, o.obsrecv, o.trimSuffixes, o.enableNativeHistograms, o.targetSamplesReporter)
}
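
Note: any type that satisfies the new TargetSamplesReporter interface can be wired into NewAppendable. A minimal sketch of an in-memory implementation (hypothetical, not part of this change; e.g. for unit tests in the internal package) might look like:

```go
package internal

import "sync"

// fakeSamplesReporter is a hypothetical in-memory TargetSamplesReporter,
// useful for asserting what the transaction reports in tests.
type fakeSamplesReporter struct {
	mu      sync.Mutex
	reports map[string]int // keyed by "job/instance"
}

// ReportTargetSamples records the reported sample count per job/instance pair.
func (f *fakeSamplesReporter) ReportTargetSamples(job, instance string, sampleCount int) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.reports == nil {
		f.reports = make(map[string]int)
	}
	f.reports[job+"/"+instance] += sampleCount
}
```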
58 changes: 58 additions & 0 deletions receiver/prometheusreceiver/internal/transaction.go
@@ -67,6 +67,11 @@ type transaction struct {
buildInfo component.BuildInfo
metricAdjuster MetricsAdjuster
obsrecv *receiverhelper.ObsReport
targetSamplesReporter TargetSamplesReporter
// originalJobName stores the original job name from scrape target, before any relabeling
originalJobName string
// originalAddress stores the original address from scrape target, before any relabeling
originalAddress string
// Used as buffer to calculate series ref hash.
bufBytes []byte
}
@@ -88,6 +93,7 @@ func newTransaction(
obsrecv *receiverhelper.ObsReport,
trimSuffixes bool,
enableNativeHistograms bool,
targetSamplesReporter TargetSamplesReporter,
) *transaction {
return &transaction{
ctx: ctx,
Expand All @@ -101,6 +107,7 @@ func newTransaction(
logger: settings.Logger,
buildInfo: settings.BuildInfo,
obsrecv: obsrecv,
targetSamplesReporter: targetSamplesReporter,
bufBytes: make([]byte, 0, 1024),
scopeAttributes: make(map[resourceKey]map[scopeID]pcommon.Map),
nodeResources: map[resourceKey]pcommon.Resource{},
@@ -493,6 +500,31 @@ func (t *transaction) initTransaction(lbs labels.Labels) (*resourceKey, error) {
return nil, errors.New("unable to find MetricMetadataStore in context")
}

// Get the original job name from the target labels (before metric relabeling)
// This ensures we report the correct job name to the target allocator
if t.originalJobName == "" {
// Try to get job name from target labels first
// t.originalJobName = target.GetValue(model.JobLabel)

// Debug: log target info to understand what's available
builder := labels.NewBuilder(labels.EmptyLabels())
targetLabels := target.Labels(builder)
discoveredLabels := target.DiscoveredLabels(labels.NewBuilder(labels.EmptyLabels()))

t.logger.Info("initTransaction: target info",
zap.String("job_from_target_labels", t.originalJobName),
zap.String("instance_from_target", target.GetValue(model.InstanceLabel)),
zap.String("target_labels", targetLabels.String()),
zap.String("discovered_labels", discoveredLabels.String()))

t.originalJobName = discoveredLabels.Get(model.JobLabel)
t.originalAddress = discoveredLabels.Get(model.AddressLabel)
t.logger.Info("initTransaction:",
zap.String("original_job_name", t.originalJobName),
zap.String("original_address", t.originalAddress))

}

rKey, err := t.getJobAndInstance(lbs)
if err != nil {
return nil, err
@@ -563,6 +595,12 @@ func (t *transaction) Commit() error {

err = t.sink.ConsumeMetrics(ctx, md)
t.obsrecv.EndMetricsOp(ctx, dataformat, numPoints, err)

// Report target samples to target allocator if configured and successful
if err == nil && t.targetSamplesReporter != nil {
t.reportTargetSamples(numPoints)
}

return err
}

@@ -619,3 +657,23 @@ func (t *transaction) addScopeInfo(key resourceKey, ls labels.Labels) {
func getSeriesRef(bytes []byte, ls labels.Labels, mtype pmetric.MetricType) (uint64, []byte) {
return ls.HashWithoutLabels(bytes, getSortedNotUsefulLabels(mtype)...)
}

// reportTargetSamples reports the sample count for each target to the target allocator
func (t *transaction) reportTargetSamples(totalSamples int) {
// Report samples for each resource (job/instance combination)
// Use the original job name from the target (before metric relabeling) to ensure
// it matches the job name used by the target allocator
for rKey := range t.families {
jobName := t.originalJobName
if jobName == "" {
// Fallback to the job from resource key if original job name is not available
jobName = rKey.job
}
instance := t.originalAddress
if instance == "" {
// Fallback to the instance from resource key if original address is not available
instance = rKey.instance
}
t.targetSamplesReporter.ReportTargetSamples(jobName, instance, totalSamples)
}
}
1 change: 1 addition & 0 deletions receiver/prometheusreceiver/metrics_receiver.go
@@ -179,6 +179,7 @@ func (r *pReceiver) initPrometheusComponents(ctx context.Context, logger *slog.L
enableNativeHistogramsGate.IsEnabled(),
r.cfg.PrometheusConfig.GlobalConfig.ExternalLabels,
r.cfg.TrimMetricSuffixes,
r.targetAllocatorManager,
)
if err != nil {
return err
3 changes: 3 additions & 0 deletions receiver/prometheusreceiver/targetallocator/config.go
@@ -25,6 +25,9 @@ type Config struct {
CollectorID string `mapstructure:"collector_id"`
HTTPSDConfig *PromHTTPSDConfig `mapstructure:"http_sd_config"`
HTTPScrapeConfig *PromHTTPClientConfig `mapstructure:"http_scrape_config"`
ReportTargetSamples bool `mapstructure:"report_target_samples"`
// IgnoreHash can be used to disable the hash check and always update the scrape configuration.
IgnoreHash bool `mapstructure:"ignore_hash"`
}

// PromHTTPSDConfig is a redeclaration of promHTTP.SDConfig because we need custom unmarshaling
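
Note: the two new options sit under the prometheus receiver's target_allocator block. A hedged YAML sketch (the report_target_samples and ignore_hash keys come from the mapstructure tags above; endpoint and collector_id values are illustrative):

```yaml
receivers:
  prometheus:
    target_allocator:
      endpoint: http://target-allocator:8080   # illustrative endpoint
      collector_id: ${POD_NAME}                 # illustrative collector id
      report_target_samples: true               # new: POST per-target sample counts after each commit
      ignore_hash: true                         # new: skip the hash check and always re-apply scrape configs
    config:
      scrape_configs: []
```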
66 changes: 65 additions & 1 deletion receiver/prometheusreceiver/targetallocator/manager.go
Expand Up @@ -6,6 +6,7 @@ package targetallocator // import "github.com/open-telemetry/opentelemetry-colle
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"hash/fnv"
@@ -31,6 +32,15 @@ import (
"go.uber.org/zap"
)

// TargetSamplesRequest represents the data sent to target allocator's /target_samples endpoint
type TargetSamplesRequest struct {
Job string `json:"job"`
Instance string `json:"instance"`
CollectorID string `json:"collector_id"`
SampleCount int `json:"sample_count"`
Timestamp int64 `json:"timestamp"`
}

type Manager struct {
settings receiver.Settings
shutdown chan struct{}
@@ -40,6 +50,7 @@ type Manager struct {
scrapeManager *scrape.Manager
discoveryManager *discovery.Manager
enableNativeHistograms bool
httpClient *http.Client

// configUpdateCount tracks how many times the config has changed, for
// testing.
@@ -75,6 +86,7 @@ func (m *Manager) Start(ctx context.Context, host component.Host, sm *scrape.Man
m.settings.Logger.Error("Failed to create http client", zap.Error(err))
return err
}
m.httpClient = httpClient
m.settings.Logger.Info("Starting target allocator discovery")

operation := func() (uint64, error) {
@@ -132,7 +144,7 @@ func (m *Manager) sync(compareHash uint64, httpClient *http.Client) (uint64, err
m.settings.Logger.Error("Failed to hash job list", zap.Error(err))
return 0, err
}
if hash == compareHash {
if !m.cfg.IgnoreHash && hash == compareHash {
// no update needed
return hash, nil
}
@@ -144,6 +156,8 @@
m.promCfg.ScrapeConfigs = initialConfig

for jobName, scrapeConfig := range scrapeConfigsResponse {
m.settings.Logger.Debug("Syncing target allocator job", zap.String("job", jobName), zap.String("collector_id", m.cfg.CollectorID))

var httpSD promHTTP.SDConfig
if m.cfg.HTTPSDConfig == nil {
httpSD = promHTTP.SDConfig{
@@ -286,3 +300,53 @@ func getScrapeConfigHash(jobToScrapeConfig map[string]*promconfig.ScrapeConfig)
yamlEncoder.Close()
return hash.Sum64(), err
}

// ReportTargetSamples sends target sample count to the target allocator
func (m *Manager) ReportTargetSamples(job, instance string, sampleCount int) {
if m.cfg == nil || !m.cfg.ReportTargetSamples || m.httpClient == nil {
return
}

m.settings.Logger.Info("Reporting target samples to target allocator",
zap.String("job", job),
zap.String("instance", instance),
zap.String("collector_id", m.cfg.CollectorID),
zap.Int("sample_count", sampleCount))

// Send the request asynchronously to avoid blocking the scrape process
go func() {
request := TargetSamplesRequest{
Job: job,
Instance: instance,
CollectorID: m.cfg.CollectorID,
SampleCount: sampleCount,
Timestamp: time.Now().Unix(),
}

jsonData, err := json.Marshal(request)
if err != nil {
m.settings.Logger.Warn("Failed to marshal target samples request", zap.Error(err))
return
}

targetSamplesURL := m.cfg.Endpoint + "/target_samples"
resp, err := m.httpClient.Post(targetSamplesURL, "application/json", bytes.NewBuffer(jsonData))
if err != nil {
m.settings.Logger.Warn("Failed to send target samples to target allocator",
zap.Error(err),
zap.String("job", job),
zap.String("instance", instance),
zap.Int("sample_count", sampleCount))
return
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted {
m.settings.Logger.Warn("Target allocator returned non-success status for target samples",
zap.Int("status_code", resp.StatusCode),
zap.String("job", job),
zap.String("instance", instance),
zap.Int("sample_count", sampleCount))
}
}()
}
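
Note: the diff only defines the client side; the target allocator is expected to expose a /target_samples endpoint that accepts the JSON body built from TargetSamplesRequest. The allocator's actual API is not part of this change; a hypothetical server-side handler mirroring those fields might look like:

```go
package main

import (
	"encoding/json"
	"log"
	"net/http"
)

// targetSamplesRequest mirrors the fields the collector sends in TargetSamplesRequest.
type targetSamplesRequest struct {
	Job         string `json:"job"`
	Instance    string `json:"instance"`
	CollectorID string `json:"collector_id"`
	SampleCount int    `json:"sample_count"`
	Timestamp   int64  `json:"timestamp"`
}

func main() {
	http.HandleFunc("/target_samples", func(w http.ResponseWriter, r *http.Request) {
		var req targetSamplesRequest
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		// A real allocator would record this per-target load, e.g. to rebalance targets
		// across collectors; here we only log the reported values.
		log.Printf("collector=%s job=%s instance=%s samples=%d ts=%d",
			req.CollectorID, req.Job, req.Instance, req.SampleCount, req.Timestamp)
		w.WriteHeader(http.StatusAccepted)
	})
	// Port is illustrative only.
	log.Fatal(http.ListenAndServe(":8080", nil))
}
```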