2 changes: 1 addition & 1 deletion internal/pkg/agent/application/application.go
@@ -124,7 +124,7 @@ func New(
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create upgrader: %w", err)
}
- monitor := monitoring.New(isMonitoringSupported, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, agentInfo)
+ monitor := monitoring.New(isMonitoringSupported, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, rawConfig.OTel, agentInfo)

runtime, err := runtime.NewManager(
log,
@@ -102,7 +102,7 @@ type MonitorManager interface {
// Enabled when configured to collect metrics/logs.
Enabled() bool

- // Reload reloads the configuration for the upgrade manager.
+ // Reload reloads the configuration for the monitoring manager.
Reload(rawConfig *config.Config) error

// MonitoringConfig injects monitoring configuration into resolved ast tree.
Expand Down
129 changes: 128 additions & 1 deletion internal/pkg/agent/application/monitoring/v1_monitor.go
@@ -19,9 +19,10 @@
"strings"
"time"
"unicode"

Check failure on line 22 in internal/pkg/agent/application/monitoring/v1_monitor.go (GitHub Actions / lint (ubuntu-latest)): File is not properly formatted (goimports)
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/utils"
"go.opentelemetry.io/collector/confmap"

koanfmaps "github.com/knadh/koanf/maps"

@@ -52,6 +53,7 @@
idKey = "id"
agentKey = "agent"
monitoringKey = "monitoring"
serviceKey = "service"
useOutputKey = "use_output"
monitoringMetricsPeriodKey = "metrics_period"
failureThresholdKey = "failure_threshold"
@@ -60,6 +62,7 @@
agentName = "elastic-agent"
metricBeatName = "metricbeat"
fileBeatName = "filebeat"
collectorName = "collector"

monitoringMetricsUnitID = "metrics-monitoring"
monitoringFilesUnitsID = "filestream-monitoring"
@@ -86,6 +89,7 @@
type BeatsMonitor struct {
enabled bool // feature flag disabling whole v1 monitoring story
config *monitoringConfig
otelConfig *confmap.Conf
operatingSystem string
agentInfo info.Agent
}
@@ -106,12 +110,13 @@
}

// New creates a new BeatsMonitor instance.
- func New(enabled bool, operatingSystem string, cfg *monitoringCfg.MonitoringConfig, agentInfo info.Agent) *BeatsMonitor {
+ func New(enabled bool, operatingSystem string, cfg *monitoringCfg.MonitoringConfig, otelCfg *confmap.Conf, agentInfo info.Agent) *BeatsMonitor {
return &BeatsMonitor{
enabled: enabled,
config: &monitoringConfig{
C: cfg,
},
otelConfig: otelCfg,
operatingSystem: operatingSystem,
agentInfo: agentInfo,
}
@@ -138,6 +143,7 @@
}

b.config = &newConfig
b.otelConfig = rawConfig.OTel
return nil
}

@@ -349,6 +355,16 @@
return nil
}

// usingOtelRuntime reports whether any component in the list uses the OTel runtime.
func usingOtelRuntime(componentInfos []componentInfo) bool {
for _, ci := range componentInfos {
if ci.RuntimeManager == component.OtelRuntimeManager {
return true
}
}
return false
}

// Cleanup removes files that were created for monitoring.
func (b *BeatsMonitor) Cleanup(unit string) error {
if !b.Enabled() {
@@ -435,6 +451,16 @@
RuntimeManager: component.RuntimeManager(b.config.C.RuntimeManager),
})
}
// If any other component uses the Otel runtime, also add a component to monitor
// its telemetry.
if b.config.C.MonitorMetrics && usingOtelRuntime(componentInfos) {
componentInfos = append(componentInfos,
componentInfo{
ID: fmt.Sprintf("prometheus/%s", monitoringMetricsUnitID),
BinaryName: metricBeatName,
RuntimeManager: component.RuntimeManager(b.config.C.RuntimeManager),
})
}
// sort the components to ensure a consistent order of inputs in the configuration
slices.SortFunc(componentInfos, func(a, b componentInfo) int {
return strings.Compare(a.ID, b.ID)
@@ -486,6 +512,47 @@
return defaultMonitoringNamespace
}

func (b *BeatsMonitor) getCollectorTelemetryEndpoint() string {
// Without an OTel section there is nothing to inspect, and Unmarshal on a
// nil *confmap.Conf would panic; fall back to the collector's default endpoint.
if b.otelConfig == nil {
return "localhost:8888"
}
type metricsReaderConfig struct {
Comment (Contributor Author):

This was the least verbose way I could find to extract the prometheus endpoint configuration (the mapstructure annotations don't seem to accept dotted field names), but I'd be happy to learn if there's a better one.

Pull struct {
Exporter struct {
Prometheus struct {
Host string `mapstructure:"host"`
Port int `mapstructure:"port"`
} `mapstructure:"prometheus"`
} `mapstructure:"exporter"`
} `mapstructure:"pull"`
}
var otel struct {
Service struct {
Telemetry struct {
Metrics struct {
Readers []metricsReaderConfig `mapstructure:"readers"`
} `mapstructure:"metrics"`
} `mapstructure:"telemetry"`
} `mapstructure:"service"`
}
if err := b.otelConfig.Unmarshal(&otel, confmap.WithIgnoreUnused()); err == nil {
for _, reader := range otel.Service.Telemetry.Metrics.Readers {
p := reader.Pull.Exporter.Prometheus
if p.Host != "" || p.Port != 0 {
host := "localhost"
port := 8888
Comment (Member):

Is there any way to get this to use a named pipe/unix socket instead?

If we can't do that, the port will need to be configurable, and to keep the --develop option working (allowing two agents on the same host) it has to be a random free port. https://github.com/pkoutsovasilis/elastic-agent/blob/c99794996e768f038b485939b1cd02aafd5db724/internal/pkg/agent/configuration/grpc.go#L41-L46

If we could just default this to a random free port initially it would be easiest, with the option to specify a fixed port for cases where users need to work around local firewall rules.

Comment (Member):

We might be able to get support for unix sockets by using gRPC instead; see #9963 for an example that enables internal telemetry for the EDOT collector (elastic-agent otel mode).
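
[Editor's note] For illustration, a minimal sketch of the random-free-port technique; this is an assumption about what the linked grpc.go helper does, not the actual code, and it needs import "net":

// randomFreePort reserves a free TCP port by letting the OS pick one.
// Hypothetical helper, shown only to illustrate the approach.
func randomFreePort() (int, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0") // port 0 asks the OS for any free port
	if err != nil {
		return 0, err
	}
	defer l.Close() // the port becomes reusable once the listener is closed
	return l.Addr().(*net.TCPAddr).Port, nil
}

Note there is an inherent race between closing the listener and the collector binding the port, which is one argument for passing an already-open listener (or a unix socket) down instead.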

if p.Host != "" {
host = p.Host
}
if p.Port != 0 {
port = p.Port
}
return host + ":" + strconv.Itoa(port)
}

}
}
// If there is no explicit configuration, the collector publishes its telemetry on port 8888.
return "localhost:8888"
}
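
[Editor's note] To make the shape the author's comment describes concrete, here is a minimal sketch of the input this function reads, i.e. the equivalent of service.telemetry.metrics.readers[].pull.exporter.prometheus in the collector's YAML; the host and port values are hypothetical and not part of the PR:

// Hypothetical input for getCollectorTelemetryEndpoint; with this config
// the function above would return "127.0.0.1:9999".
otelCfg := confmap.NewFromStringMap(map[string]any{
	"service": map[string]any{
		"telemetry": map[string]any{
			"metrics": map[string]any{
				"readers": []any{
					map[string]any{
						"pull": map[string]any{
							"exporter": map[string]any{
								"prometheus": map[string]any{
									"host": "127.0.0.1",
									"port": 9999,
								},
							},
						},
					},
				},
			},
		},
	},
})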

// injectMetricsInput injects monitoring config for agent monitoring to the `cfg` object.
func (b *BeatsMonitor) injectMetricsInput(
cfg map[string]interface{},
Expand Down Expand Up @@ -529,6 +596,20 @@
},
}

if usingOtelRuntime(componentInfos) {
prometheusStream := b.getPrometheusStream(failureThreshold, metricsCollectionIntervalString)
inputs = append(inputs, map[string]interface{}{
idKey: fmt.Sprintf("%s-collector", monitoringMetricsUnitID),
"name": fmt.Sprintf("%s-collector", monitoringMetricsUnitID),
"type": "prometheus/metrics",
Comment (Member):

Using the Metricbeat prometheus module is probably faster, but it means we'll be using ECS for data from the collector instead of SemConv and the OTLP schema, which are our eventual end goal.

Did you look at using the upstream prometheus receiver to do this instead? The data format will be different (OTLP) and you'll need to use processors to set the target data stream.

Comment (Member):

The more I think about this, the more I think we only have two ways to do this:

  1. We use either the prometheus/metrics module or the prometheus receiver, but we process the data so the field names are the same as they are today, without introducing any new data streams. This collects the data from a different place while keeping it compatible.
  2. We use only the prometheus receiver and ship the data in OTLP format to a new data stream. This is our end goal, but it is a breaking change to monitoring so it can't go in a patch release.

Given the timeline for this I think we have to go with approach one here and try to get the data to look like it does now for the queue and outputs.

Comment (Member):

We need to prototype option 1 ASAP: if we can transform the internal telemetry pieces we need and keep the elastic_agent dashboard mostly working as-is with the existing ECS data, we can take the time we need to polish presenting the underlying OTLP data directly to users.
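
[Editor's note] For comparison, a hedged sketch of the receiver route discussed above, expressed as the Go map an agent-side translation might hand to the collector; the job name and interval are assumptions, and the target matches the default endpoint noted earlier:

// Hypothetical prometheus receiver configuration scraping the collector's
// own telemetry endpoint; every value here is illustrative, not prescriptive.
receiverCfg := map[string]any{
	"prometheus": map[string]any{
		"config": map[string]any{
			"scrape_configs": []any{
				map[string]any{
					"job_name":        "elastic-agent-collector",
					"scrape_interval": "60s",
					"static_configs": []any{
						map[string]any{"targets": []any{"localhost:8888"}},
					},
				},
			},
		},
	},
}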

useOutputKey: monitoringOutput,
"data_stream": map[string]interface{}{
"namespace": monitoringNamespace,
},
"streams": []any{prometheusStream},
})
}

// Make sure we don't set anything until the configuration is stable if the otel manager isn't enabled
if b.config.C.RuntimeManager != monitoringCfg.DefaultRuntimeManager {
for _, input := range inputs {
@@ -730,6 +811,40 @@
return httpStreams
}

// getPrometheusStream returns the stream definition for prometheus/metrics input.
// Note: The return type must be any due to protobuf serialization quirks.
func (b *BeatsMonitor) getPrometheusStream(
failureThreshold *uint,
metricsCollectionIntervalString string,
) any {
prometheusHost := b.getCollectorTelemetryEndpoint()

// Use the "collector" metricset.
monitoringNamespace := b.monitoringNamespace()
indexName := fmt.Sprintf("metrics-elastic_agent.collector-%s", monitoringNamespace)
dataset := "elastic_agent.collector"
Comment (Member):

You need #8970 (that issue specifies an OTLP-formatted data stream as-is, which would require the prometheus receiver) for this to be writable by Fleet-managed agents, I think. Right now I would expect the agent API key from Fleet to not have permissions to create or index into this data stream.


otelStream := map[string]any{
idKey: fmt.Sprintf("%s-otel", monitoringMetricsUnitID),
"data_stream": map[string]interface{}{
"type": "metrics",
"dataset": dataset,
"namespace": monitoringNamespace,
},
"metricsets": []interface{}{"collector"},
"metrics_path": "/metrics",
"hosts": []interface{}{prometheusHost},
"namespace": monitoringNamespace,
"period": metricsCollectionIntervalString,
"index": indexName,
"processors": processorsForCollectorPrometheusStream(monitoringNamespace, dataset, b.agentInfo),
}
if failureThreshold != nil {
otelStream[failureThresholdKey] = *failureThreshold
}
return otelStream
}

// getBeatsStreams returns stream definitions for beats inputs.
// Note: The return type must be []any due to protobuf serialization quirks.
func (b *BeatsMonitor) getBeatsStreams(
@@ -945,6 +1060,18 @@
}
}

// processorsForCollectorPrometheusStream returns the processors used for the OTel
// collector prometheus metrics.
func processorsForCollectorPrometheusStream(namespace, dataset string, agentInfo info.Agent) []any {
return []interface{}{
addDataStreamFieldsProcessor(dataset, namespace),
addEventFieldsProcessor(dataset),
addElasticAgentFieldsProcessor(agentName, agentInfo),
addAgentFieldsProcessor(agentInfo.AgentID()),
addComponentFieldsProcessor(agentName, agentName),
}
}

// addElasticAgentFieldsProcessor returns a processor definition that adds agent information in an `elastic_agent` field.
func addElasticAgentFieldsProcessor(binaryName string, agentInfo info.Agent) map[string]any {
return map[string]any{
@@ -1066,7 +1066,7 @@ func TestMonitorReload(t *testing.T) {
monitorcfg.MonitorLogs = false
monitorcfg.MonitorMetrics = false

- beatsMonitor := New(true, "", monitorcfg, nil)
+ beatsMonitor := New(true, "", monitorcfg, nil, nil)
assert.Equal(t, beatsMonitor.config.C.MonitorLogs, false)
assert.Equal(t, beatsMonitor.config.C.MonitorMetrics, false)

33 changes: 17 additions & 16 deletions internal/pkg/agent/cmd/inspect.go
@@ -12,6 +12,7 @@ import (
"time"

"github.com/spf13/cobra"
"go.opentelemetry.io/collector/confmap"
"gopkg.in/yaml.v2"

"github.com/elastic/elastic-agent-libs/logp"
@@ -157,7 +158,7 @@ func inspectConfig(ctx context.Context, cfgPath string, opts inspectConfigOpts,
return nil
}

- cfg, lvl, err := getConfigWithVariables(ctx, l, cfgPath, opts.variablesWait, !isAdmin)
+ cfg, otel, lvl, err := getConfigWithVariables(ctx, l, cfgPath, opts.variablesWait, !isAdmin)
if err != nil {
return fmt.Errorf("error fetching config with variables: %w", err)
}
@@ -179,7 +180,7 @@
return fmt.Errorf("failed to detect inputs and outputs: %w", err)
}

- monitorFn, err := getMonitoringFn(ctx, cfg)
+ monitorFn, err := getMonitoringFn(ctx, cfg, otel)
if err != nil {
return fmt.Errorf("failed to get monitoring: %w", err)
}
@@ -369,12 +370,12 @@ func getComponentsFromPolicy(ctx context.Context, l *logger.Logger, cfgPath stri
return nil, fmt.Errorf("error checking for root/Administrator privileges: %w", err)
}

- m, lvl, err := getConfigWithVariables(ctx, l, cfgPath, variablesWait, !isAdmin)
+ m, otel, lvl, err := getConfigWithVariables(ctx, l, cfgPath, variablesWait, !isAdmin)
if err != nil {
return nil, err
}

- monitorFn, err := getMonitoringFn(ctx, m)
+ monitorFn, err := getMonitoringFn(ctx, m, otel)
if err != nil {
return nil, fmt.Errorf("failed to get monitoring: %w", err)
}
@@ -393,7 +394,7 @@
return comps, nil
}

- func getMonitoringFn(ctx context.Context, cfg map[string]interface{}) (component.GenerateMonitoringCfgFn, error) {
+ func getMonitoringFn(ctx context.Context, cfg map[string]interface{}, otelCfg *confmap.Conf) (component.GenerateMonitoringCfgFn, error) {
config, err := config.NewConfigFrom(cfg)
if err != nil {
return nil, err
@@ -409,52 +410,52 @@ func getMonitoringFn(ctx context.Context, cfg map[string]interface{}) (component
return nil, fmt.Errorf("could not load agent info: %w", err)
}

- monitor := monitoring.New(agentCfg.Settings.V1MonitoringEnabled, agentCfg.Settings.DownloadConfig.OS(), agentCfg.Settings.MonitoringConfig, agentInfo)
+ monitor := monitoring.New(agentCfg.Settings.V1MonitoringEnabled, agentCfg.Settings.DownloadConfig.OS(), agentCfg.Settings.MonitoringConfig, otelCfg, agentInfo)
return monitor.MonitoringConfig, nil
}

- func getConfigWithVariables(ctx context.Context, l *logger.Logger, cfgPath string, timeout time.Duration, unprivileged bool) (map[string]interface{}, logp.Level, error) {
+ func getConfigWithVariables(ctx context.Context, l *logger.Logger, cfgPath string, timeout time.Duration, unprivileged bool) (map[string]interface{}, *confmap.Conf, logp.Level, error) {

cfg, err := operations.LoadFullAgentConfig(ctx, l, cfgPath, true, unprivileged)
if err != nil {
- return nil, logp.InfoLevel, err
+ return nil, nil, logp.InfoLevel, err
}
lvl, err := getLogLevel(cfg, cfgPath)
if err != nil {
- return nil, logp.InfoLevel, err
+ return nil, nil, logp.InfoLevel, err
}
m, err := cfg.ToMapStr()
if err != nil {
- return nil, lvl, err
+ return nil, nil, lvl, err
}
ast, err := transpiler.NewAST(m)
if err != nil {
return nil, lvl, fmt.Errorf("could not create the AST from the configuration: %w", err)
return nil, nil, lvl, fmt.Errorf("could not create the AST from the configuration: %w", err)
}

// Wait for the variables based on the timeout.
vars, err := vars.WaitForVariables(ctx, l, cfg, timeout)
if err != nil {
return nil, lvl, fmt.Errorf("failed to gather variables: %w", err)
return nil, nil, lvl, fmt.Errorf("failed to gather variables: %w", err)
}

// Render the inputs using the discovered inputs.
inputs, ok := transpiler.Lookup(ast, "inputs")
if ok {
renderedInputs, err := transpiler.RenderInputs(inputs, vars)
if err != nil {
return nil, lvl, fmt.Errorf("rendering inputs failed: %w", err)
return nil, nil, lvl, fmt.Errorf("rendering inputs failed: %w", err)
}
err = transpiler.Insert(ast, renderedInputs, "inputs")
if err != nil {
return nil, lvl, fmt.Errorf("inserting rendered inputs failed: %w", err)
return nil, nil, lvl, fmt.Errorf("inserting rendered inputs failed: %w", err)
}
}
m, err = ast.Map()
if err != nil {
return nil, lvl, fmt.Errorf("failed to convert ast to map[string]interface{}: %w", err)
return nil, nil, lvl, fmt.Errorf("failed to convert ast to map[string]interface{}: %w", err)
}
- return m, lvl, nil
+ return m, cfg.OTel, lvl, nil
}

func getLogLevel(rawCfg *config.Config, cfgPath string) (logp.Level, error) {
2 changes: 1 addition & 1 deletion internal/pkg/otel/translate/otelconfig.go
@@ -43,7 +43,7 @@ type exporterConfigTranslationFunc func(*config.C) (map[string]any, error)

var (
OtelSupportedOutputTypes = []string{"elasticsearch"}
- OtelSupportedInputTypes = []string{"filestream", "http/metrics", "beat/metrics", "system/metrics"}
+ OtelSupportedInputTypes = []string{"filestream", "http/metrics", "beat/metrics", "system/metrics", "prometheus/metrics"}
configTranslationFuncForExporter = map[otelcomponent.Type]exporterConfigTranslationFunc{
otelcomponent.MustNewType("elasticsearch"): translateEsOutputToExporter,
}