diff --git a/changelog/fragments/1757710842-Add-agent_policy_id-and-policy_revision_idx-to-checkin-requests.yaml b/changelog/fragments/1757710842-Add-agent_policy_id-and-policy_revision_idx-to-checkin-requests.yaml new file mode 100644 index 00000000000..bee580ca5b6 --- /dev/null +++ b/changelog/fragments/1757710842-Add-agent_policy_id-and-policy_revision_idx-to-checkin-requests.yaml @@ -0,0 +1,35 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: feature + +# Change summary; a 80ish characters long description of the change. +summary: Add agent_policy_id and policy_revision_idx to checkin requests + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +description: | + Add agent_policy_id and policy_revision_idx attributes to checkin requests. + These attributes are used to inform fleet-server of the policy id and revision that the agent is currently running. + Agents that use these policies no longer need to send acks for POLICY_CHANGE actions. + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: elastic-agent + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +#pr: https://github.com/owner/repo/1234 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: https://github.com/elastic/elastic-agent/issues/6446 diff --git a/internal/pkg/agent/application/actions/handlers/handler_action_policy_change.go b/internal/pkg/agent/application/actions/handlers/handler_action_policy_change.go index 634f2020e55..5d89511e758 100644 --- a/internal/pkg/agent/application/actions/handlers/handler_action_policy_change.go +++ b/internal/pkg/agent/application/actions/handlers/handler_action_policy_change.go @@ -473,7 +473,6 @@ type policyChange struct { cfg *config.Config action fleetapi.Action acker acker.Acker - commit bool ackWatcher chan struct{} } @@ -482,9 +481,9 @@ func newPolicyChange( config *config.Config, action fleetapi.Action, acker acker.Acker, - commit bool) *policyChange { + makeCh bool) *policyChange { var ackWatcher chan struct{} - if commit { + if makeCh { // we don't need it otherwise ackWatcher = make(chan struct{}) } @@ -493,7 +492,6 @@ func newPolicyChange( cfg: config, action: action, acker: acker, - commit: true, ackWatcher: ackWatcher, } } @@ -502,22 +500,21 @@ func (l *policyChange) Config() *config.Config { return l.cfg } +// Ack sends an ack for the associated action if the results are expected. +// An ack will not be sent for a POLICY_CHANGE action, but will be when this method is used by UNENROLL actions. func (l *policyChange) Ack() error { - if l.action == nil { + if l.action == nil || l.ackWatcher == nil { return nil } err := l.acker.Ack(l.ctx, l.action) if err != nil { return err } - if l.commit { - err := l.acker.Commit(l.ctx) - if l.ackWatcher != nil && err == nil { - close(l.ackWatcher) - } - return err + err = l.acker.Commit(l.ctx) + if err == nil { + close(l.ackWatcher) } - return nil + return err } // WaitAck waits for policy change to be acked. @@ -525,7 +522,7 @@ func (l *policyChange) Ack() error { // Caller is responsible to use any reasonable deadline otherwise // function call can be endlessly blocking. func (l *policyChange) WaitAck(ctx context.Context) { - if !l.commit || l.ackWatcher == nil { + if l.ackWatcher == nil { return } diff --git a/internal/pkg/agent/application/actions/handlers/handler_action_policy_change_test.go b/internal/pkg/agent/application/actions/handlers/handler_action_policy_change_test.go index 9fdf7184e89..74c3dddfdc0 100644 --- a/internal/pkg/agent/application/actions/handlers/handler_action_policy_change_test.go +++ b/internal/pkg/agent/application/actions/handlers/handler_action_policy_change_test.go @@ -105,7 +105,7 @@ func TestPolicyAcked(t *testing.T) { agentInfo := &info.AgentInfo{} nullStore := &storage.NullStore{} - t.Run("Config change should ACK", func(t *testing.T) { + t.Run("Config change shouldn't ACK", func(t *testing.T) { ch := make(chan coordinator.ConfigChange, 1) tacker := &testAcker{} @@ -129,8 +129,7 @@ func TestPolicyAcked(t *testing.T) { require.NoError(t, change.Ack()) actions := tacker.Items() - assert.EqualValues(t, 1, len(actions)) - assert.Equal(t, actionID, actions[0]) + assert.Empty(t, actions) }) } diff --git a/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go b/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go index 2fd60a79ae6..905ce4186db 100644 --- a/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go +++ b/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go @@ -71,6 +71,7 @@ type stateStore interface { AckToken() string SetAckToken(ackToken string) Save() error + Action() fleetapi.Action } type FleetGateway struct { @@ -356,15 +357,21 @@ func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, // Fix loglevel with the current log level used by coordinator ecsMeta.Elastic.Agent.LogLevel = state.LogLevel.String() + action := f.stateStore.Action() + agentPolicyID := getPolicyID(action) + policyRevisionIDX := getPolicyRevisionIDX(action) + // checkin cmd := fleetapi.NewCheckinCmd(f.agentInfo, f.client) req := &fleetapi.CheckinRequest{ - AckToken: ackToken, - Metadata: ecsMeta, - Status: agentStateToString(state.State), - Message: state.Message, - Components: components, - UpgradeDetails: state.UpgradeDetails, + AckToken: ackToken, + Metadata: ecsMeta, + Status: agentStateToString(state.State), + Message: state.Message, + Components: components, + UpgradeDetails: state.UpgradeDetails, + AgentPolicyID: agentPolicyID, + PolicyRevisionIDX: policyRevisionIDX, } resp, took, err := cmd.Execute(ctx, req) @@ -447,3 +454,43 @@ func RequestBackoff(done <-chan struct{}) backoff.Backoff { defaultFleetBackoffSettings.Max, ) } + +// getPolicyID will check that the passed action is a POLICY_CHANGE action and return the policy_id attribute of the policy as a string. +func getPolicyID(action fleetapi.Action) string { + policyChange, ok := action.(*fleetapi.ActionPolicyChange) + if !ok { + return "" + } + v, ok := policyChange.Data.Policy["policy_id"] + if !ok { + return "" + } + vv, ok := v.(string) + if !ok { + return "" + } + return vv +} + +// getPolicyRevisionIDX will check that the passed action is a POLICY_CHANGE action and return the policy_revision_idx attribute of the policy as an int64. +// The function will attempt to convert the attribute to int64 if int or float64 is used in order to prevent issues from serialization. +func getPolicyRevisionIDX(action fleetapi.Action) int64 { + policyChange, ok := action.(*fleetapi.ActionPolicyChange) + if !ok { + return 0 + } + v, ok := policyChange.Data.Policy["policy_revision_idx"] + if !ok { + return 0 + } + switch vv := v.(type) { + case int64: + return vv + case int: + return int64(vv) + case float64: + return int64(vv) + default: + return 0 + } +} diff --git a/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go b/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go index 217ac1ca457..dfcd40a93b7 100644 --- a/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go +++ b/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go @@ -377,6 +377,74 @@ func TestFleetGateway(t *testing.T) { default: } }) + + t.Run("sends agent_policy_id and policy_revision_idx", func(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + scheduler := scheduler.NewStepper() + client := newTestingClient() + + log, _ := loggertest.New("fleet_gateway") + + stateStore := newStateStore(t, log) + stateStore.SetAction(&fleetapi.ActionPolicyChange{ + ActionID: "test-action-id", + ActionType: fleetapi.ActionTypePolicyChange, + Data: fleetapi.ActionPolicyChangeData{ + Policy: map[string]interface{}{ + "policy_id": "test-policy-id", + "policy_revision_idx": 1, + }, + }, + }) + err := stateStore.Save() + require.NoError(t, err) + + gateway, err := newFleetGatewayWithScheduler( + log, + settings, + agentInfo, + client, + scheduler, + noop.New(), + emptyStateFetcher, + stateStore, + ) + require.NoError(t, err) + + waitFn := ackSeq( + client.Answer(func(headers http.Header, body io.Reader) (*http.Response, error) { + data, err := io.ReadAll(body) + require.NoError(t, err) + + var checkinRequest fleetapi.CheckinRequest + err = json.Unmarshal(data, &checkinRequest) + require.NoError(t, err) + + require.Equal(t, "test-policy-id", checkinRequest.AgentPolicyID) + require.Equal(t, int64(1), checkinRequest.PolicyRevisionIDX) + + resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`) + return resp, nil + }), + ) + + errCh := runFleetGateway(ctx, gateway) + + // Synchronize scheduler and acking of calls from the worker go routine. + scheduler.Next() + waitFn() + + cancel() + err = <-errCh + require.NoError(t, err) + select { + case actions := <-gateway.Actions(): + t.Errorf("Expected no actions, got %v", actions) + default: + } + }) } func TestRetriesOnFailures(t *testing.T) { diff --git a/internal/pkg/fleetapi/checkin_cmd.go b/internal/pkg/fleetapi/checkin_cmd.go index 16ce9afe671..fb204b6ad3a 100644 --- a/internal/pkg/fleetapi/checkin_cmd.go +++ b/internal/pkg/fleetapi/checkin_cmd.go @@ -41,12 +41,14 @@ type CheckinComponent struct { // CheckinRequest consists of multiple events reported to fleet ui. type CheckinRequest struct { - Status string `json:"status"` - AckToken string `json:"ack_token,omitempty"` - Metadata *info.ECSMeta `json:"local_metadata,omitempty"` - Message string `json:"message"` // V2 Agent message - Components []CheckinComponent `json:"components"` // V2 Agent components - UpgradeDetails *details.Details `json:"upgrade_details,omitempty"` + Status string `json:"status"` + AckToken string `json:"ack_token,omitempty"` + Metadata *info.ECSMeta `json:"local_metadata,omitempty"` + Message string `json:"message"` // V2 Agent message + Components []CheckinComponent `json:"components"` // V2 Agent components + UpgradeDetails *details.Details `json:"upgrade_details,omitempty"` + AgentPolicyID string `json:"agent_policy_id,omitempty"` + PolicyRevisionIDX int64 `json:"policy_revision_idx,omitempty"` } // SerializableEvent is a representation of the event to be send to the Fleet Server API via the checkin