Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; a 80ish characters long description of the change.
summary: Add agent_policy_id and policy_revision_idx to checkin requests

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
description: |
Add agent_policy_id and policy_revision_idx attributes to checkin requests.
These attributes are used to inform fleet-server of the policy id and revision that the agent is currently running.
Agents that use these policies no longer need to send acks for POLICY_CHANGE actions.

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: elastic-agent

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
#pr: https://github.com/owner/repo/1234

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/6446
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,6 @@ type policyChange struct {
cfg *config.Config
action fleetapi.Action
acker acker.Acker
commit bool
ackWatcher chan struct{}
}

Expand All @@ -482,9 +481,9 @@ func newPolicyChange(
config *config.Config,
action fleetapi.Action,
acker acker.Acker,
commit bool) *policyChange {
makeCh bool) *policyChange {
var ackWatcher chan struct{}
if commit {
if makeCh {
// we don't need it otherwise
ackWatcher = make(chan struct{})
}
Expand All @@ -493,7 +492,6 @@ func newPolicyChange(
cfg: config,
action: action,
acker: acker,
commit: true,
ackWatcher: ackWatcher,
}
}
Expand All @@ -502,30 +500,29 @@ func (l *policyChange) Config() *config.Config {
return l.cfg
}

// Ack sends an ack for the associated action if the results are expected.
// An ack will not be sent for a POLICY_CHANGE action, but will be when this method is used by UNENROLL actions.
func (l *policyChange) Ack() error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be less confusing to make this a nop, and provide something else for unenroll actions to use

if l.action == nil {
if l.action == nil || l.ackWatcher == nil {
return nil
}
err := l.acker.Ack(l.ctx, l.action)
if err != nil {
return err
}
if l.commit {
err := l.acker.Commit(l.ctx)
if l.ackWatcher != nil && err == nil {
close(l.ackWatcher)
}
return err
err = l.acker.Commit(l.ctx)
if err == nil {
close(l.ackWatcher)
}
return nil
return err
}

// WaitAck waits for policy change to be acked.
// Policy change ack is awaitable only in case commit flag was set.
// Caller is responsible to use any reasonable deadline otherwise
// function call can be endlessly blocking.
func (l *policyChange) WaitAck(ctx context.Context) {
if !l.commit || l.ackWatcher == nil {
if l.ackWatcher == nil {
return
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestPolicyAcked(t *testing.T) {
agentInfo := &info.AgentInfo{}
nullStore := &storage.NullStore{}

t.Run("Config change should ACK", func(t *testing.T) {
t.Run("Config change shouldn't ACK", func(t *testing.T) {
ch := make(chan coordinator.ConfigChange, 1)
tacker := &testAcker{}

Expand All @@ -129,8 +129,7 @@ func TestPolicyAcked(t *testing.T) {
require.NoError(t, change.Ack())

actions := tacker.Items()
assert.EqualValues(t, 1, len(actions))
assert.Equal(t, actionID, actions[0])
assert.Empty(t, actions)
})
}

Expand Down
59 changes: 53 additions & 6 deletions internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type stateStore interface {
AckToken() string
SetAckToken(ackToken string)
Save() error
Action() fleetapi.Action
}

type FleetGateway struct {
Expand Down Expand Up @@ -356,15 +357,21 @@ func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
// Fix loglevel with the current log level used by coordinator
ecsMeta.Elastic.Agent.LogLevel = state.LogLevel.String()

action := f.stateStore.Action()
agentPolicyID := getPolicyID(action)
policyRevisionIDX := getPolicyRevisionIDX(action)

// checkin
cmd := fleetapi.NewCheckinCmd(f.agentInfo, f.client)
req := &fleetapi.CheckinRequest{
AckToken: ackToken,
Metadata: ecsMeta,
Status: agentStateToString(state.State),
Message: state.Message,
Components: components,
UpgradeDetails: state.UpgradeDetails,
AckToken: ackToken,
Metadata: ecsMeta,
Status: agentStateToString(state.State),
Message: state.Message,
Components: components,
UpgradeDetails: state.UpgradeDetails,
AgentPolicyID: agentPolicyID,
PolicyRevisionIDX: policyRevisionIDX,
}

resp, took, err := cmd.Execute(ctx, req)
Expand Down Expand Up @@ -447,3 +454,43 @@ func RequestBackoff(done <-chan struct{}) backoff.Backoff {
defaultFleetBackoffSettings.Max,
)
}

// getPolicyID will check that the passed action is a POLICY_CHANGE action and return the policy_id attribute of the policy as a string.
func getPolicyID(action fleetapi.Action) string {
policyChange, ok := action.(*fleetapi.ActionPolicyChange)
if !ok {
return ""
}
v, ok := policyChange.Data.Policy["policy_id"]
if !ok {
return ""
}
vv, ok := v.(string)
if !ok {
return ""
}
return vv
}

// getPolicyRevisionIDX will check that the passed action is a POLICY_CHANGE action and return the policy_revision_idx attribute of the policy as an int64.
// The function will attempt to convert the attribute to int64 if int or float64 is used in order to prevent issues from serialization.
func getPolicyRevisionIDX(action fleetapi.Action) int64 {
policyChange, ok := action.(*fleetapi.ActionPolicyChange)
if !ok {
return 0
}
v, ok := policyChange.Data.Policy["policy_revision_idx"]
if !ok {
return 0
}
switch vv := v.(type) {
case int64:
return vv
case int:
return int64(vv)
case float64:
return int64(vv)
default:
return 0
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,74 @@ func TestFleetGateway(t *testing.T) {
default:
}
})

t.Run("sends agent_policy_id and policy_revision_idx", func(t *testing.T) {
ctx, cancel := context.WithCancel(t.Context())
defer cancel()

scheduler := scheduler.NewStepper()
client := newTestingClient()

log, _ := loggertest.New("fleet_gateway")

stateStore := newStateStore(t, log)
stateStore.SetAction(&fleetapi.ActionPolicyChange{
ActionID: "test-action-id",
ActionType: fleetapi.ActionTypePolicyChange,
Data: fleetapi.ActionPolicyChangeData{
Policy: map[string]interface{}{
"policy_id": "test-policy-id",
"policy_revision_idx": 1,
},
},
})
err := stateStore.Save()
require.NoError(t, err)

gateway, err := newFleetGatewayWithScheduler(
log,
settings,
agentInfo,
client,
scheduler,
noop.New(),
emptyStateFetcher,
stateStore,
)
require.NoError(t, err)

waitFn := ackSeq(
client.Answer(func(headers http.Header, body io.Reader) (*http.Response, error) {
data, err := io.ReadAll(body)
require.NoError(t, err)

var checkinRequest fleetapi.CheckinRequest
err = json.Unmarshal(data, &checkinRequest)
require.NoError(t, err)

require.Equal(t, "test-policy-id", checkinRequest.AgentPolicyID)
require.Equal(t, int64(1), checkinRequest.PolicyRevisionIDX)

resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`)
return resp, nil
}),
)

errCh := runFleetGateway(ctx, gateway)

// Synchronize scheduler and acking of calls from the worker go routine.
scheduler.Next()
waitFn()

cancel()
err = <-errCh
require.NoError(t, err)
select {
case actions := <-gateway.Actions():
t.Errorf("Expected no actions, got %v", actions)
default:
}
})
}

func TestRetriesOnFailures(t *testing.T) {
Expand Down
14 changes: 8 additions & 6 deletions internal/pkg/fleetapi/checkin_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ type CheckinComponent struct {

// CheckinRequest consists of multiple events reported to fleet ui.
type CheckinRequest struct {
Status string `json:"status"`
AckToken string `json:"ack_token,omitempty"`
Metadata *info.ECSMeta `json:"local_metadata,omitempty"`
Message string `json:"message"` // V2 Agent message
Components []CheckinComponent `json:"components"` // V2 Agent components
UpgradeDetails *details.Details `json:"upgrade_details,omitempty"`
Status string `json:"status"`
AckToken string `json:"ack_token,omitempty"`
Metadata *info.ECSMeta `json:"local_metadata,omitempty"`
Message string `json:"message"` // V2 Agent message
Components []CheckinComponent `json:"components"` // V2 Agent components
UpgradeDetails *details.Details `json:"upgrade_details,omitempty"`
AgentPolicyID string `json:"agent_policy_id,omitempty"`
PolicyRevisionIDX int64 `json:"policy_revision_idx,omitempty"`
}

// SerializableEvent is a representation of the event to be send to the Fleet Server API via the checkin
Expand Down
Loading