From 8f61046b327347a4295964143dcd271bba469d53 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 10 Sep 2025 16:58:08 -0400 Subject: [PATCH 1/3] Add otel telemetry to agent monitoring data --- .../application/monitoring/v1_monitor.go | 76 +++++++++++++++++++ internal/pkg/otel/translate/otelconfig.go | 2 +- 2 files changed, 77 insertions(+), 1 deletion(-) diff --git a/internal/pkg/agent/application/monitoring/v1_monitor.go b/internal/pkg/agent/application/monitoring/v1_monitor.go index 4aaf2df68bb..a70af1574c7 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor.go @@ -60,6 +60,7 @@ const ( agentName = "elastic-agent" metricBeatName = "metricbeat" fileBeatName = "filebeat" + collectorName = "collector" monitoringMetricsUnitID = "metrics-monitoring" monitoringFilesUnitsID = "filestream-monitoring" @@ -349,6 +350,16 @@ func (b *BeatsMonitor) Prepare(unit string) error { return nil } +// Returns true if any component in the list uses the otel runtime. +func usingOtelRuntime(componentInfos []componentInfo) bool { + for _, ci := range componentInfos { + if ci.RuntimeManager == component.OtelRuntimeManager { + return true + } + } + return false +} + // Cleanup removes files that were created for monitoring. func (b *BeatsMonitor) Cleanup(unit string) error { if !b.Enabled() { @@ -435,6 +446,16 @@ func (b *BeatsMonitor) getComponentInfos(components []component.Component, compo RuntimeManager: component.RuntimeManager(b.config.C.RuntimeManager), }) } + // If any other component uses the Otel runtime, also add a component to monitor + // its telemetry. + if b.config.C.MonitorMetrics && usingOtelRuntime(componentInfos) { + componentInfos = append(componentInfos, + componentInfo{ + ID: fmt.Sprintf("prometheus/%s", monitoringMetricsUnitID), + BinaryName: metricBeatName, + RuntimeManager: component.RuntimeManager(b.config.C.RuntimeManager), + }) + } // sort the components to ensure a consistent order of inputs in the configuration slices.SortFunc(componentInfos, func(a, b componentInfo) int { return strings.Compare(a.ID, b.ID) @@ -529,6 +550,20 @@ func (b *BeatsMonitor) injectMetricsInput( }, } + if usingOtelRuntime(componentInfos) { + prometheusStream := b.getPrometheusStream(failureThreshold, metricsCollectionIntervalString) + inputs = append(inputs, map[string]interface{}{ + idKey: fmt.Sprintf("%s-collector", monitoringMetricsUnitID), + "name": fmt.Sprintf("%s-collector", monitoringMetricsUnitID), + "type": "prometheus/metrics", + useOutputKey: monitoringOutput, + "data_stream": map[string]interface{}{ + "namespace": monitoringNamespace, + }, + "streams": []any{prometheusStream}, + }) + } + // Make sure we don't set anything until the configuration is stable if the otel manager isn't enabled if b.config.C.RuntimeManager != monitoringCfg.DefaultRuntimeManager { for _, input := range inputs { @@ -730,6 +765,38 @@ func (b *BeatsMonitor) getHttpStreams( return httpStreams } +// getPrometheusStream returns the stream definition for prometheus/metrics input. +// Note: The return type must be []any due to protobuf serialization quirks. +func (b *BeatsMonitor) getPrometheusStream( + failureThreshold *uint, + metricsCollectionIntervalString string, +) any { + // want metricset "collector" + monitoringNamespace := b.monitoringNamespace() + indexName := fmt.Sprintf("metrics-elastic_agent.collector-%s", monitoringNamespace) + dataset := "elastic_agent.collector" + + otelStream := map[string]any{ + idKey: fmt.Sprintf("%s-otel", monitoringMetricsUnitID), + "data_stream": map[string]interface{}{ + "type": "metrics", + "dataset": dataset, + "namespace": monitoringNamespace, + }, + "metricsets": []interface{}{"collector"}, + "metrics_path": "/metrics", + "hosts": []interface{}{"localhost:8888"}, + "namespace": monitoringNamespace, + "period": metricsCollectionIntervalString, + "index": indexName, + "processors": processorsForCollectorPrometheusStream(monitoringNamespace, dataset, b.agentInfo), + } + if failureThreshold != nil { + otelStream[failureThresholdKey] = *failureThreshold + } + return otelStream +} + // getBeatsStreams returns stream definitions for beats inputs. // Note: The return type must be []any due to protobuf serialization quirks. func (b *BeatsMonitor) getBeatsStreams( @@ -945,6 +1012,15 @@ func processorsForAgentHttpStream(namespace, dataset string, agentInfo info.Agen } } +// processorsForCollectorPrometheusStream returns the processors used for the OTel +// collector prometheus metrics +func processorsForCollectorPrometheusStream(namespace, dataset string, agentInfo info.Agent) []any { + return []interface{}{ + addDataStreamFieldsProcessor(dataset, namespace), + addAgentFieldsProcessor(agentInfo.AgentID()), + } +} + // addElasticAgentFieldsProcessor returns a processor definition that adds agent information in an `elastic_agent` field. func addElasticAgentFieldsProcessor(binaryName string, agentInfo info.Agent) map[string]any { return map[string]any{ diff --git a/internal/pkg/otel/translate/otelconfig.go b/internal/pkg/otel/translate/otelconfig.go index d9ceec48bc0..3a05cec4ba4 100644 --- a/internal/pkg/otel/translate/otelconfig.go +++ b/internal/pkg/otel/translate/otelconfig.go @@ -43,7 +43,7 @@ type exporterConfigTranslationFunc func(*config.C) (map[string]any, error) var ( OtelSupportedOutputTypes = []string{"elasticsearch"} - OtelSupportedInputTypes = []string{"filestream", "http/metrics", "beat/metrics", "system/metrics"} + OtelSupportedInputTypes = []string{"filestream", "http/metrics", "beat/metrics", "system/metrics", "prometheus/metrics"} configTranslationFuncForExporter = map[otelcomponent.Type]exporterConfigTranslationFunc{ otelcomponent.MustNewType("elasticsearch"): translateEsOutputToExporter, } From 3cb7ad00d579b22371796b8da765cfce7efef171 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 10 Sep 2025 17:01:53 -0400 Subject: [PATCH 2/3] add remaining standard processors --- internal/pkg/agent/application/monitoring/v1_monitor.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/pkg/agent/application/monitoring/v1_monitor.go b/internal/pkg/agent/application/monitoring/v1_monitor.go index a70af1574c7..a0fb492e177 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor.go @@ -1017,7 +1017,10 @@ func processorsForAgentHttpStream(namespace, dataset string, agentInfo info.Agen func processorsForCollectorPrometheusStream(namespace, dataset string, agentInfo info.Agent) []any { return []interface{}{ addDataStreamFieldsProcessor(dataset, namespace), + addEventFieldsProcessor(dataset), + addElasticAgentFieldsProcessor(agentName, agentInfo), addAgentFieldsProcessor(agentInfo.AgentID()), + addComponentFieldsProcessor(agentName, agentName), } } From a05982ec73ff1ba90132498a643bcf2783b0cb31 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Fri, 12 Sep 2025 16:27:05 -0400 Subject: [PATCH 3/3] Handle custom telemetry endpoint configurations --- internal/pkg/agent/application/application.go | 2 +- .../application/coordinator/coordinator.go | 2 +- .../application/monitoring/v1_monitor.go | 52 ++++++++++++++++++- .../application/monitoring/v1_monitor_test.go | 2 +- internal/pkg/agent/cmd/inspect.go | 33 ++++++------ 5 files changed, 70 insertions(+), 21 deletions(-) diff --git a/internal/pkg/agent/application/application.go b/internal/pkg/agent/application/application.go index dcaa2ddc570..6e337f9215f 100644 --- a/internal/pkg/agent/application/application.go +++ b/internal/pkg/agent/application/application.go @@ -124,7 +124,7 @@ func New( if err != nil { return nil, nil, nil, fmt.Errorf("failed to create upgrader: %w", err) } - monitor := monitoring.New(isMonitoringSupported, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, agentInfo) + monitor := monitoring.New(isMonitoringSupported, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, rawConfig.OTel, agentInfo) runtime, err := runtime.NewManager( log, diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index a21d8e7cb9f..626aec6a293 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -102,7 +102,7 @@ type MonitorManager interface { // Enabled when configured to collect metrics/logs. Enabled() bool - // Reload reloads the configuration for the upgrade manager. + // Reload reloads the configuration for the monitoring manager. Reload(rawConfig *config.Config) error // MonitoringConfig injects monitoring configuration into resolved ast tree. diff --git a/internal/pkg/agent/application/monitoring/v1_monitor.go b/internal/pkg/agent/application/monitoring/v1_monitor.go index a0fb492e177..da016552c9b 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor.go @@ -22,6 +22,7 @@ import ( "github.com/elastic/elastic-agent/pkg/component" "github.com/elastic/elastic-agent/pkg/utils" + "go.opentelemetry.io/collector/confmap" koanfmaps "github.com/knadh/koanf/maps" @@ -52,6 +53,7 @@ const ( idKey = "id" agentKey = "agent" monitoringKey = "monitoring" + serviceKey = "service" useOutputKey = "use_output" monitoringMetricsPeriodKey = "metrics_period" failureThresholdKey = "failure_threshold" @@ -87,6 +89,7 @@ var ( type BeatsMonitor struct { enabled bool // feature flag disabling whole v1 monitoring story config *monitoringConfig + otelConfig *confmap.Conf operatingSystem string agentInfo info.Agent } @@ -107,12 +110,13 @@ type monitoringConfig struct { } // New creates a new BeatsMonitor instance. -func New(enabled bool, operatingSystem string, cfg *monitoringCfg.MonitoringConfig, agentInfo info.Agent) *BeatsMonitor { +func New(enabled bool, operatingSystem string, cfg *monitoringCfg.MonitoringConfig, otelCfg *confmap.Conf, agentInfo info.Agent) *BeatsMonitor { return &BeatsMonitor{ enabled: enabled, config: &monitoringConfig{ C: cfg, }, + otelConfig: otelCfg, operatingSystem: operatingSystem, agentInfo: agentInfo, } @@ -139,6 +143,7 @@ func (b *BeatsMonitor) Reload(rawConfig *config.Config) error { } b.config = &newConfig + b.otelConfig = rawConfig.OTel return nil } @@ -507,6 +512,47 @@ func (b *BeatsMonitor) monitoringNamespace() string { return defaultMonitoringNamespace } +func (b *BeatsMonitor) getCollectorTelemetryEndpoint() string { + type metricsReaderConfig struct { + Pull struct { + Exporter struct { + Prometheus struct { + Host string `mapstructure:"host"` + Port int `mapstructure:"port"` + } `mapstructure:"prometheus"` + } `mapstructure:"exporter"` + } `mapstructure:"pull"` + } + var otel struct { + Service struct { + Telemetry struct { + Metrics struct { + Readers []metricsReaderConfig `mapstructure:"readers"` + } `mapstructure:"metrics"` + } `mapstructure:"telemetry"` + } `mapstructure:"service"` + } + if err := b.otelConfig.Unmarshal(&otel, confmap.WithIgnoreUnused()); err == nil { + for _, reader := range otel.Service.Telemetry.Metrics.Readers { + p := reader.Pull.Exporter.Prometheus + if p.Host != "" || p.Port != 0 { + host := "localhost" + port := 8888 + if p.Host != "" { + host = p.Host + } + if p.Port != 0 { + port = p.Port + } + return host + ":" + strconv.Itoa(port) + } + + } + } + // If there is no explicit configuration, the collector publishes its telemetry on port 8888. + return "localhost:8888" +} + // injectMetricsInput injects monitoring config for agent monitoring to the `cfg` object. func (b *BeatsMonitor) injectMetricsInput( cfg map[string]interface{}, @@ -771,6 +817,8 @@ func (b *BeatsMonitor) getPrometheusStream( failureThreshold *uint, metricsCollectionIntervalString string, ) any { + prometheusHost := b.getCollectorTelemetryEndpoint() + // want metricset "collector" monitoringNamespace := b.monitoringNamespace() indexName := fmt.Sprintf("metrics-elastic_agent.collector-%s", monitoringNamespace) @@ -785,7 +833,7 @@ func (b *BeatsMonitor) getPrometheusStream( }, "metricsets": []interface{}{"collector"}, "metrics_path": "/metrics", - "hosts": []interface{}{"localhost:8888"}, + "hosts": []interface{}{prometheusHost}, "namespace": monitoringNamespace, "period": metricsCollectionIntervalString, "index": indexName, diff --git a/internal/pkg/agent/application/monitoring/v1_monitor_test.go b/internal/pkg/agent/application/monitoring/v1_monitor_test.go index 8ab48f0d9e3..677b9b57022 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor_test.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor_test.go @@ -1066,7 +1066,7 @@ func TestMonitorReload(t *testing.T) { monitorcfg.MonitorLogs = false monitorcfg.MonitorMetrics = false - beatsMonitor := New(true, "", monitorcfg, nil) + beatsMonitor := New(true, "", monitorcfg, nil, nil) assert.Equal(t, beatsMonitor.config.C.MonitorLogs, false) assert.Equal(t, beatsMonitor.config.C.MonitorLogs, false) diff --git a/internal/pkg/agent/cmd/inspect.go b/internal/pkg/agent/cmd/inspect.go index b71096626f1..b0ffcbf15b4 100644 --- a/internal/pkg/agent/cmd/inspect.go +++ b/internal/pkg/agent/cmd/inspect.go @@ -12,6 +12,7 @@ import ( "time" "github.com/spf13/cobra" + "go.opentelemetry.io/collector/confmap" "gopkg.in/yaml.v2" "github.com/elastic/elastic-agent-libs/logp" @@ -157,7 +158,7 @@ func inspectConfig(ctx context.Context, cfgPath string, opts inspectConfigOpts, return nil } - cfg, lvl, err := getConfigWithVariables(ctx, l, cfgPath, opts.variablesWait, !isAdmin) + cfg, otel, lvl, err := getConfigWithVariables(ctx, l, cfgPath, opts.variablesWait, !isAdmin) if err != nil { return fmt.Errorf("error fetching config with variables: %w", err) } @@ -179,7 +180,7 @@ func inspectConfig(ctx context.Context, cfgPath string, opts inspectConfigOpts, return fmt.Errorf("failed to detect inputs and outputs: %w", err) } - monitorFn, err := getMonitoringFn(ctx, cfg) + monitorFn, err := getMonitoringFn(ctx, cfg, otel) if err != nil { return fmt.Errorf("failed to get monitoring: %w", err) } @@ -369,12 +370,12 @@ func getComponentsFromPolicy(ctx context.Context, l *logger.Logger, cfgPath stri return nil, fmt.Errorf("error checking for root/Administrator privileges: %w", err) } - m, lvl, err := getConfigWithVariables(ctx, l, cfgPath, variablesWait, !isAdmin) + m, otel, lvl, err := getConfigWithVariables(ctx, l, cfgPath, variablesWait, !isAdmin) if err != nil { return nil, err } - monitorFn, err := getMonitoringFn(ctx, m) + monitorFn, err := getMonitoringFn(ctx, m, otel) if err != nil { return nil, fmt.Errorf("failed to get monitoring: %w", err) } @@ -393,7 +394,7 @@ func getComponentsFromPolicy(ctx context.Context, l *logger.Logger, cfgPath stri return comps, nil } -func getMonitoringFn(ctx context.Context, cfg map[string]interface{}) (component.GenerateMonitoringCfgFn, error) { +func getMonitoringFn(ctx context.Context, cfg map[string]interface{}, otelCfg *confmap.Conf) (component.GenerateMonitoringCfgFn, error) { config, err := config.NewConfigFrom(cfg) if err != nil { return nil, err @@ -409,33 +410,33 @@ func getMonitoringFn(ctx context.Context, cfg map[string]interface{}) (component return nil, fmt.Errorf("could not load agent info: %w", err) } - monitor := monitoring.New(agentCfg.Settings.V1MonitoringEnabled, agentCfg.Settings.DownloadConfig.OS(), agentCfg.Settings.MonitoringConfig, agentInfo) + monitor := monitoring.New(agentCfg.Settings.V1MonitoringEnabled, agentCfg.Settings.DownloadConfig.OS(), agentCfg.Settings.MonitoringConfig, otelCfg, agentInfo) return monitor.MonitoringConfig, nil } -func getConfigWithVariables(ctx context.Context, l *logger.Logger, cfgPath string, timeout time.Duration, unprivileged bool) (map[string]interface{}, logp.Level, error) { +func getConfigWithVariables(ctx context.Context, l *logger.Logger, cfgPath string, timeout time.Duration, unprivileged bool) (map[string]interface{}, *confmap.Conf, logp.Level, error) { cfg, err := operations.LoadFullAgentConfig(ctx, l, cfgPath, true, unprivileged) if err != nil { - return nil, logp.InfoLevel, err + return nil, nil, logp.InfoLevel, err } lvl, err := getLogLevel(cfg, cfgPath) if err != nil { - return nil, logp.InfoLevel, err + return nil, nil, logp.InfoLevel, err } m, err := cfg.ToMapStr() if err != nil { - return nil, lvl, err + return nil, nil, lvl, err } ast, err := transpiler.NewAST(m) if err != nil { - return nil, lvl, fmt.Errorf("could not create the AST from the configuration: %w", err) + return nil, nil, lvl, fmt.Errorf("could not create the AST from the configuration: %w", err) } // Wait for the variables based on the timeout. vars, err := vars.WaitForVariables(ctx, l, cfg, timeout) if err != nil { - return nil, lvl, fmt.Errorf("failed to gather variables: %w", err) + return nil, nil, lvl, fmt.Errorf("failed to gather variables: %w", err) } // Render the inputs using the discovered inputs. @@ -443,18 +444,18 @@ func getConfigWithVariables(ctx context.Context, l *logger.Logger, cfgPath strin if ok { renderedInputs, err := transpiler.RenderInputs(inputs, vars) if err != nil { - return nil, lvl, fmt.Errorf("rendering inputs failed: %w", err) + return nil, nil, lvl, fmt.Errorf("rendering inputs failed: %w", err) } err = transpiler.Insert(ast, renderedInputs, "inputs") if err != nil { - return nil, lvl, fmt.Errorf("inserting rendered inputs failed: %w", err) + return nil, nil, lvl, fmt.Errorf("inserting rendered inputs failed: %w", err) } } m, err = ast.Map() if err != nil { - return nil, lvl, fmt.Errorf("failed to convert ast to map[string]interface{}: %w", err) + return nil, nil, lvl, fmt.Errorf("failed to convert ast to map[string]interface{}: %w", err) } - return m, lvl, nil + return m, cfg.OTel, lvl, nil } func getLogLevel(rawCfg *config.Config, cfgPath string) (logp.Level, error) {