Ingest internal telemetry from the OTel Collector when it is running #9928
@@ -19,9 +19,10 @@
    "strings"
    "time"
    "unicode"

    "github.com/elastic/elastic-agent/pkg/component"
    "github.com/elastic/elastic-agent/pkg/utils"
    "go.opentelemetry.io/collector/confmap"

    koanfmaps "github.com/knadh/koanf/maps"

@@ -52,6 +53,7 @@
    idKey = "id"
    agentKey = "agent"
    monitoringKey = "monitoring"
    serviceKey = "service"
    useOutputKey = "use_output"
    monitoringMetricsPeriodKey = "metrics_period"
    failureThresholdKey = "failure_threshold"

@@ -60,6 +62,7 @@
    agentName = "elastic-agent"
    metricBeatName = "metricbeat"
    fileBeatName = "filebeat"
    collectorName = "collector"

    monitoringMetricsUnitID = "metrics-monitoring"
    monitoringFilesUnitsID = "filestream-monitoring"

@@ -86,6 +89,7 @@
type BeatsMonitor struct {
    enabled bool // feature flag disabling whole v1 monitoring story
    config *monitoringConfig
    otelConfig *confmap.Conf
    operatingSystem string
    agentInfo info.Agent
}

@@ -106,12 +110,13 @@
}

// New creates a new BeatsMonitor instance.
func New(enabled bool, operatingSystem string, cfg *monitoringCfg.MonitoringConfig, agentInfo info.Agent) *BeatsMonitor {
func New(enabled bool, operatingSystem string, cfg *monitoringCfg.MonitoringConfig, otelCfg *confmap.Conf, agentInfo info.Agent) *BeatsMonitor {
    return &BeatsMonitor{
        enabled: enabled,
        config: &monitoringConfig{
            C: cfg,
        },
        otelConfig: otelCfg,
        operatingSystem: operatingSystem,
        agentInfo: agentInfo,
    }

@@ -138,6 +143,7 @@
    }

    b.config = &newConfig
    b.otelConfig = rawConfig.OTel
    return nil
}

@@ -349,6 +355,16 @@
    return nil
}

// Returns true if any component in the list uses the otel runtime.
func usingOtelRuntime(componentInfos []componentInfo) bool {
    for _, ci := range componentInfos {
        if ci.RuntimeManager == component.OtelRuntimeManager {
            return true
        }
    }
    return false
}

// Cleanup removes files that were created for monitoring.
func (b *BeatsMonitor) Cleanup(unit string) error {
    if !b.Enabled() {

@@ -435,6 +451,16 @@
            RuntimeManager: component.RuntimeManager(b.config.C.RuntimeManager),
        })
    }
    // If any other component uses the Otel runtime, also add a component to monitor
    // its telemetry.
    if b.config.C.MonitorMetrics && usingOtelRuntime(componentInfos) {
        componentInfos = append(componentInfos,
            componentInfo{
                ID: fmt.Sprintf("prometheus/%s", monitoringMetricsUnitID),
                BinaryName: metricBeatName,
                RuntimeManager: component.RuntimeManager(b.config.C.RuntimeManager),
            })
    }
    // sort the components to ensure a consistent order of inputs in the configuration
    slices.SortFunc(componentInfos, func(a, b componentInfo) int {
        return strings.Compare(a.ID, b.ID)

@@ -486,6 +512,47 @@
    return defaultMonitoringNamespace
}

func (b *BeatsMonitor) getCollectorTelemetryEndpoint() string {
    type metricsReaderConfig struct {
        Pull struct {
            Exporter struct {
                Prometheus struct {
                    Host string `mapstructure:"host"`
                    Port int `mapstructure:"port"`
                } `mapstructure:"prometheus"`
            } `mapstructure:"exporter"`
        } `mapstructure:"pull"`
    }
    var otel struct {
        Service struct {
            Telemetry struct {
                Metrics struct {
                    Readers []metricsReaderConfig `mapstructure:"readers"`
                } `mapstructure:"metrics"`
            } `mapstructure:"telemetry"`
        } `mapstructure:"service"`
    }
    if err := b.otelConfig.Unmarshal(&otel, confmap.WithIgnoreUnused()); err == nil {
        for _, reader := range otel.Service.Telemetry.Metrics.Readers {
            p := reader.Pull.Exporter.Prometheus
            if p.Host != "" || p.Port != 0 {
                host := "localhost"
                port := 8888

Review comment: Is there any way to get this to use a named pipe/unix socket instead? If we can't do that, the port will need to be configurable. If we could just default this to a random free port initially it would be easiest, with the option to specify a fixed port for cases where users need to work around local firewall rules.

Review comment: We might be able to get support for unix sockets by using gRPC instead; see #9963 for an example enabling internal telemetry for the EDOT collector.

                if p.Host != "" {
                    host = p.Host
                }
                if p.Port != 0 {
                    port = p.Port
                }
                return host + ":" + strconv.Itoa(port)
            }
        }
    }
    // If there is no explicit configuration, the collector publishes its telemetry on port 8888.
    return "localhost:8888"
}
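
For reference, here is a minimal sketch (not part of the change) of the configuration shape this function looks for, built the same way the collector would load it from YAML. The host/port values, the `package main` layout, and the use of gopkg.in/yaml.v3 are illustrative assumptions:

// Sketch only: shows the service::telemetry shape that
// getCollectorTelemetryEndpoint unmarshals, using a hard-coded YAML
// document in place of the real agent-provided collector config.
package main

import (
    "fmt"

    "go.opentelemetry.io/collector/confmap"
    "gopkg.in/yaml.v3"
)

const collectorYAML = `
service:
  telemetry:
    metrics:
      readers:
        - pull:
            exporter:
              prometheus:
                host: 127.0.0.1
                port: 9999
`

func main() {
    var raw map[string]any
    if err := yaml.Unmarshal([]byte(collectorYAML), &raw); err != nil {
        panic(err)
    }
    conf := confmap.NewFromStringMap(raw)

    // getCollectorTelemetryEndpoint would resolve this config to "127.0.0.1:9999";
    // with no readers configured it falls back to the collector default "localhost:8888".
    fmt.Println(conf.IsSet("service::telemetry::metrics::readers")) // true
}

Per the logic above, a reader that sets neither host nor port is treated as unset and the default endpoint is kept.
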

// injectMetricsInput injects monitoring config for agent monitoring to the `cfg` object.
func (b *BeatsMonitor) injectMetricsInput(
    cfg map[string]interface{},

@@ -529,6 +596,20 @@
        },
    }

    if usingOtelRuntime(componentInfos) {
        prometheusStream := b.getPrometheusStream(failureThreshold, metricsCollectionIntervalString)
        inputs = append(inputs, map[string]interface{}{
            idKey: fmt.Sprintf("%s-collector", monitoringMetricsUnitID),
            "name": fmt.Sprintf("%s-collector", monitoringMetricsUnitID),
            "type": "prometheus/metrics",

Review comment: Using the Metricbeat prometheus module I think is faster, but this means we'll be using ECS instead of SemConv and the OTLP schema for data from the collector, which is our eventual end goal. Did you look at using the upstream prometheus receiver to do this instead? The data format will be different (OTLP) and you'll need to use processors to set the target data stream.

Review comment: The more I think about this, the more I think we only have two ways to do this. Given the timeline for this I think we have to go with approach one here and try to get the data to look like it does now for the queue and outputs.

Review comment: We need to prototype option 1 ASAP, because if we can transform the internal telemetry pieces we need to keep the elastic_agent dashboard mostly working as-is with the existing ECS data, we can take the time we need to polish presenting the underlying OTLP data directly to users.

            useOutputKey: monitoringOutput,
            "data_stream": map[string]interface{}{
                "namespace": monitoringNamespace,
            },
            "streams": []any{prometheusStream},
        })
    }

    // Make sure we don't set anything until the configuration is stable if the otel manager isn't enabled
    if b.config.C.RuntimeManager != monitoringCfg.DefaultRuntimeManager {
        for _, input := range inputs {

@@ -730,6 +811,40 @@
    return httpStreams
}

// getPrometheusStream returns the stream definition for prometheus/metrics input.
// Note: The return type must be []any due to protobuf serialization quirks.
func (b *BeatsMonitor) getPrometheusStream(
    failureThreshold *uint,
    metricsCollectionIntervalString string,
) any {
    prometheusHost := b.getCollectorTelemetryEndpoint()

    // want metricset "collector"
    monitoringNamespace := b.monitoringNamespace()
    indexName := fmt.Sprintf("metrics-elastic_agent.collector-%s", monitoringNamespace)
    dataset := "elastic_agent.collector"

Review comment: You need #8970 (this issue specifies an OTLP-formatted data stream as is, which would require the prometheus receiver) for this to be writable by Fleet-managed agents, I think. Right now I would expect the agent API key from Fleet to not have permissions to create or index to this data stream.

    otelStream := map[string]any{
        idKey: fmt.Sprintf("%s-otel", monitoringMetricsUnitID),
        "data_stream": map[string]interface{}{
            "type": "metrics",
            "dataset": dataset,
            "namespace": monitoringNamespace,
        },
        "metricsets": []interface{}{"collector"},
        "metrics_path": "/metrics",
        "hosts": []interface{}{prometheusHost},
        "namespace": monitoringNamespace,
        "period": metricsCollectionIntervalString,
        "index": indexName,
        "processors": processorsForCollectorPrometheusStream(monitoringNamespace, dataset, b.agentInfo),
    }
    if failureThreshold != nil {
        otelStream[failureThresholdKey] = *failureThreshold
    }
    return otelStream
}

// getBeatsStreams returns stream definitions for beats inputs.
// Note: The return type must be []any due to protobuf serialization quirks.
func (b *BeatsMonitor) getBeatsStreams(

@@ -945,6 +1060,18 @@
    }
}

// processorsForCollectorPrometheusStream returns the processors used for the OTel
// collector prometheus metrics
func processorsForCollectorPrometheusStream(namespace, dataset string, agentInfo info.Agent) []any {
    return []interface{}{
        addDataStreamFieldsProcessor(dataset, namespace),
        addEventFieldsProcessor(dataset),
        addElasticAgentFieldsProcessor(agentName, agentInfo),
        addAgentFieldsProcessor(agentInfo.AgentID()),
        addComponentFieldsProcessor(agentName, agentName),
    }
}

// addElasticAgentFieldsProcessor returns a processor definition that adds agent information in an `elastic_agent` field.
func addElasticAgentFieldsProcessor(binaryName string, agentInfo info.Agent) map[string]any {
    return map[string]any{

Author comment: This was the least verbose way I could find to extract the prometheus endpoint configuration (the mapstructure annotations don't seem to accept dotted field names), but I'd be happy to learn if there's a better one.
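
One possible way to trim the nesting, sketched under the assumption that confmap.Conf.Sub resolves "::"-delimited paths: unmarshal only the service::telemetry::metrics subtree, so only the reader shape needs a struct. The helper name and package layout below are illustrative, not a tested replacement:

// Sketch of an alternative to the nested anonymous structs in
// getCollectorTelemetryEndpoint. Assumes confmap.Conf.Sub accepts a
// "::"-delimited path; not verified against this code base.
package monitoring

import "go.opentelemetry.io/collector/confmap"

type metricsReaderConfig struct {
    Pull struct {
        Exporter struct {
            Prometheus struct {
                Host string `mapstructure:"host"`
                Port int    `mapstructure:"port"`
            } `mapstructure:"prometheus"`
        } `mapstructure:"exporter"`
    } `mapstructure:"pull"`
}

// collectorTelemetryReaders (hypothetical helper) returns the configured
// metrics readers, or an empty slice if none are set.
func collectorTelemetryReaders(conf *confmap.Conf) ([]metricsReaderConfig, error) {
    // Only the metrics subtree is modelled; the service/telemetry wrappers
    // are resolved by the delimited key instead of extra struct nesting.
    sub, err := conf.Sub("service::telemetry::metrics")
    if err != nil {
        return nil, err
    }
    var metrics struct {
        Readers []metricsReaderConfig `mapstructure:"readers"`
    }
    if err := sub.Unmarshal(&metrics, confmap.WithIgnoreUnused()); err != nil {
        return nil, err
    }
    return metrics.Readers, nil
}
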