From 0b6dcea1a5c10cd7dc48849519c8041454eca292 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Fri, 12 Sep 2025 11:02:43 -0700 Subject: [PATCH] Fix stats engine race condition This commit addresses multiple issues in the DockerStatsEngine: Race Condition Fixes: - Add context cancellation checks in taskContainerMetricsUnsafe to prevent metrics collection on containers that are in the middle of being cleaned up. - Implement removeContainerFromAllTasksUnsafe method to handle orphaned container cleanup. This prevents container leaks when the Docker Task Engine has already cleaned up the container but the stats engine still has it tracked. This fixes the condition where the ECS agent starts logging messages like this continuously for a particular container: ``` ecs_agent_logs/ecs-agent.log:1496:level=error time=2025-09-12T11:18:54Z msg="Error collecting cloudwatch metrics for container" container="111222333444555" error="need at least 1 non-NaN data points in queue to calculate CW stats set" ecs_agent_logs/ecs-agent.log:1500:level=error time=2025-09-12T11:19:14Z msg="Error collecting cloudwatch metrics for container" container="111222333444555" error="need at least 1 non-NaN data points in queue to calculate CW stats set" ecs_agent_logs/ecs-agent.log:1507:level=error time=2025-09-12T11:19:34Z msg="Error collecting cloudwatch metrics for container" container="111222333444555" error="need at least 1 non-NaN data points in queue to calculate CW stats set" ``` Unrelated but also rename engine_linux.go to engine_unix.go so that this package can compile on macOS more easily. --- agent/stats/engine.go | 109 ++++++- agent/stats/engine_test.go | 294 ++++++++++++++++++ .../stats/{engine_linux.go => engine_unix.go} | 4 +- 3 files changed, 392 insertions(+), 15 deletions(-) rename agent/stats/{engine_linux.go => engine_unix.go} (97%) diff --git a/agent/stats/engine.go b/agent/stats/engine.go index e09d3120ca8..47a7f177b9a 100644 --- a/agent/stats/engine.go +++ b/agent/stats/engine.go @@ -770,10 +770,17 @@ func (engine *DockerStatsEngine) removeContainer(dockerID string) { engine.lock.Lock() defer engine.lock.Unlock() - // Make sure that this container belongs to a task. + // Try to resolve the container to get the task ARN for proper cleanup task, err := engine.resolver.ResolveTask(dockerID) if err != nil { - seelog.Debugf("Could not map container to task, ignoring, err: %v, id: %s", err, dockerID) + // If we can't resolve the container to a task (e.g., because the Docker Task Engine + // has already cleaned it up), we need to search through all tasks to find and remove + // this container to prevent it from being stuck in the stats engine forever. + logger.Debug("Could not map container to task, searching all tasks for cleanup", logger.Fields{ + field.Container: dockerID, + field.Error: err, + }) + engine.removeContainerFromAllTasksUnsafe(dockerID) return } @@ -794,6 +801,48 @@ func (engine *DockerStatsEngine) removeContainer(dockerID string) { engine.doRemoveContainerUnsafe(container, task.Arn) } +// removeContainerFromAllTasksUnsafe searches through all tasks to find and remove a container +// when we can't resolve the container to a specific task. This prevents container leaks +// when the Docker Task Engine has already cleaned up the container but the stats engine +// still has it tracked. +func (engine *DockerStatsEngine) removeContainerFromAllTasksUnsafe(dockerID string) { + var foundContainer *StatsContainer + var foundTaskArn string + + // Search through all tasks to find the container + for taskArn, containerMap := range engine.tasksToContainers { + if container, exists := containerMap[dockerID]; exists { + foundContainer = container + foundTaskArn = taskArn + break + } + } + + if foundContainer != nil { + logger.Info("Found orphaned container in stats engine, removing", logger.Fields{ + field.Container: dockerID, + field.TaskARN: foundTaskArn, + }) + engine.doRemoveContainerUnsafe(foundContainer, foundTaskArn) + } else { + // Also check health check containers + for taskArn, containerMap := range engine.tasksToHealthCheckContainers { + if container, exists := containerMap[dockerID]; exists { + logger.Info("Found orphaned health check container in stats engine, removing", logger.Fields{ + field.Container: dockerID, + field.TaskARN: taskArn, + }) + delete(engine.tasksToHealthCheckContainers[taskArn], dockerID) + if len(engine.tasksToHealthCheckContainers[taskArn]) == 0 { + delete(engine.tasksToHealthCheckContainers, taskArn) + } + container.StopStatsCollection() + break + } + } + } +} + // newDockerContainerMetadataResolver returns a new instance of DockerContainerMetadataResolver. func newDockerContainerMetadataResolver(taskEngine ecsengine.TaskEngine) (*DockerContainerMetadataResolver, error) { dockerTaskEngine, ok := taskEngine.(*ecsengine.DockerTaskEngine) @@ -821,13 +870,29 @@ func (engine *DockerStatsEngine) taskContainerMetricsUnsafe(taskArn string) ([]* var containerMetrics []*ecstcs.ContainerMetric for _, container := range containerMap { dockerID := container.containerMetadata.DockerID - // Check if the container is terminal. If it is, make sure that it is + // Validate container exists in current task map + if _, exists := containerMap[dockerID]; !exists { + continue + } // cleaned up properly. We might sometimes miss events from docker task // engine and this helps in reconciling the state. The tcs client's // GetInstanceMetrics probe is used as the trigger for this. if engine.stopTrackingContainerUnsafe(container, taskArn) { continue } + + // Check if container is being cleaned up by verifying if context is cancelled + select { + case <-container.ctx.Done(): + // Container is being cleaned up, skip metrics collection + logger.Debug("Container context cancelled, skipping metrics collection", logger.Fields{ + field.Container: dockerID, + }) + continue + default: + // Container context is still active, proceed with metrics collection + } + // age is used to determine if we should or should not expect missing metrics. // this is because recently-started containers would normally not have their metrics // queue filled yet. @@ -842,22 +907,40 @@ func (engine *DockerStatsEngine) taskContainerMetricsUnsafe(taskArn string) ([]* if age < gracePeriod { continue } - logger.Error("Error collecting cloudwatch metrics for container", logger.Fields{ - field.Container: dockerID, - field.Error: err, - }) - continue + // Check again if container is being cleaned up before logging error + select { + case <-container.ctx.Done(): + logger.Debug("Container context cancelled during metrics collection, skipping error log", logger.Fields{ + field.Container: dockerID, + }) + continue + default: + logger.Error("Error collecting cloudwatch metrics for container", logger.Fields{ + field.Container: dockerID, + field.Error: err, + }) + continue + } } memoryStatsSet, err := container.statsQueue.GetMemoryStatsSet() if err != nil { if age < gracePeriod { continue } - logger.Error("Error collecting cloudwatch metrics for container", logger.Fields{ - field.Container: dockerID, - field.Error: err, - }) - continue + // Check again if container is being cleaned up before logging error + select { + case <-container.ctx.Done(): + logger.Debug("Container context cancelled during metrics collection, skipping error log", logger.Fields{ + field.Container: dockerID, + }) + continue + default: + logger.Error("Error collecting cloudwatch metrics for container", logger.Fields{ + field.Container: dockerID, + field.Error: err, + }) + continue + } } containerMetric := &ecstcs.ContainerMetric{ diff --git a/agent/stats/engine_test.go b/agent/stats/engine_test.go index 96f1a687d95..0f0f0e5094d 100644 --- a/agent/stats/engine_test.go +++ b/agent/stats/engine_test.go @@ -910,3 +910,297 @@ func TestFetchEBSVolumeMetrics(t *testing.T) { } } + +// TestStatsEngineRaceConditionFix tests that containers with cancelled contexts +// don't have their metrics collected, preventing race condition errors. +func TestStatsEngineRaceConditionFix(t *testing.T) { + ctrl := gomock.NewController(t) + defer func() { + if ctrl != nil { + ctrl.Finish() + } + }() + + resolver := mock_resolver.NewMockContainerMetadataResolver(ctrl) + mockDockerClient := mock_dockerapi.NewMockDockerClient(ctrl) + + t1 := &apitask.Task{Arn: "t1", Family: "f1", NetworkMode: "bridge"} + + resolver.EXPECT().ResolveTask("c1").AnyTimes().Return(t1, nil) + resolver.EXPECT().ResolveTaskByARN("t1").AnyTimes().Return(t1, nil) + resolver.EXPECT().ResolveContainer(gomock.Any()).AnyTimes().Return(&apicontainer.DockerContainer{ + Container: &apicontainer.Container{ + Name: "testContainer", + NetworkModeUnsafe: "bridge", + }, + }, nil) + + mockStatsChannel := make(chan *types.StatsJSON) + defer close(mockStatsChannel) + mockDockerClient.EXPECT().Stats(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockStatsChannel, nil).AnyTimes() + + engine := NewDockerStatsEngine(&cfg, nil, eventStream("TestStatsEngineRaceConditionFix"), nil, nil, nil) + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + engine.ctx = ctx + engine.resolver = resolver + engine.client = mockDockerClient + engine.cluster = defaultCluster + engine.containerInstanceArn = defaultContainerInstance + engine.csiClient = csiclient.NewDummyCSIClient() + defer engine.removeAll() + + // Add container and populate with stats + engine.addAndStartStatsContainer("c1") + containers, exists := engine.tasksToContainers["t1"] + require.True(t, exists, "Task t1 should exist in tasksToContainers") + require.Len(t, containers, 1) + + statsContainer := containers["c1"] + require.NotNil(t, statsContainer) + + // Add some stats to the container + for _, fakeContainerStats := range createFakeContainerStats() { + statsContainer.statsQueue.add(fakeContainerStats) + } + + // Verify metrics can be collected normally + containerMetrics, err := engine.taskContainerMetricsUnsafe("t1") + require.NoError(t, err) + require.Len(t, containerMetrics, 1) + + // Cancel the container's context to simulate cleanup + statsContainer.StopStatsCollection() + + // Wait a bit for context cancellation to propagate + time.Sleep(100 * time.Millisecond) + + // Now try to collect metrics - should skip the cancelled container + // The implementation skips cancelled containers silently, so we should get + // empty metrics but no error since the container is still tracked + containerMetrics, err = engine.taskContainerMetricsUnsafe("t1") + // The actual implementation skips cancelled containers and returns empty metrics + require.NoError(t, err) + require.Empty(t, containerMetrics) +} + +// TestStatsEngineContainerLeakFix tests that containers that can't be resolved +// to tasks are still properly removed from all tracking maps. +func TestStatsEngineContainerLeakFix(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + resolver := mock_resolver.NewMockContainerMetadataResolver(ctrl) + mockDockerClient := mock_dockerapi.NewMockDockerClient(ctrl) + + t1 := &apitask.Task{Arn: "t1", Family: "f1", NetworkMode: "bridge"} + t2 := &apitask.Task{Arn: "t2", Family: "f2", NetworkMode: "bridge"} + + // Set up expectations for adding containers - allow multiple calls since stats collection may call ResolveTask + resolver.EXPECT().ResolveTask("c1").Return(t1, nil).AnyTimes() + resolver.EXPECT().ResolveTask("c2").Return(t2, nil).AnyTimes() + resolver.EXPECT().ResolveTaskByARN("t1").AnyTimes().Return(t1, nil) + resolver.EXPECT().ResolveTaskByARN("t2").AnyTimes().Return(t2, nil) + resolver.EXPECT().ResolveContainer(gomock.Any()).AnyTimes().Return(&apicontainer.DockerContainer{ + Container: &apicontainer.Container{ + Name: "testContainer", + NetworkModeUnsafe: "bridge", + }, + }, nil) + + mockStatsChannel := make(chan *types.StatsJSON) + defer close(mockStatsChannel) + mockDockerClient.EXPECT().Stats(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockStatsChannel, nil).AnyTimes() + + engine := NewDockerStatsEngine(&cfg, nil, eventStream("TestStatsEngineContainerLeakFix"), nil, nil, nil) + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + engine.ctx = ctx + engine.resolver = resolver + engine.client = mockDockerClient + engine.cluster = defaultCluster + engine.containerInstanceArn = defaultContainerInstance + engine.csiClient = csiclient.NewDummyCSIClient() + defer engine.removeAll() + + // Add containers to different tasks + engine.addAndStartStatsContainer("c1") + engine.addAndStartStatsContainer("c2") + + // Verify both containers are tracked + require.Len(t, engine.tasksToContainers, 2) + containers1, exists1 := engine.tasksToContainers["t1"] + require.True(t, exists1) + require.Len(t, containers1, 1) + _, exists := containers1["c1"] + require.True(t, exists) + + containers2, exists2 := engine.tasksToContainers["t2"] + require.True(t, exists2) + require.Len(t, containers2, 1) + _, exists = containers2["c2"] + require.True(t, exists) + + // Remove c1 - should work normally since it can be resolved + engine.removeContainer("c1") + containers1, _ = engine.tasksToContainers["t1"] + _, exists = containers1["c1"] + require.False(t, exists, "Container c1 should be removed") + + // Remove c2 - this simulates the container leak scenario where + // the container can't be resolved to a task (Docker Task Engine cleaned it up) + // but it should still be removed from all tracking maps + engine.removeContainer("c2") + + // Verify c2 is removed from all maps even though resolution failed + containers2, exists2 = engine.tasksToContainers["t2"] + if exists2 { + _, exists = containers2["c2"] + require.False(t, exists, "Container c2 should be removed even when resolution fails") + } + + // Verify the container is not tracked anywhere + found := false + for taskArn, containers := range engine.tasksToContainers { + if _, exists := containers["c2"]; exists { + found = true + t.Errorf("Container c2 still found in task %s after removal", taskArn) + } + } + require.False(t, found, "Container c2 should not be found in any task") +} + +// TestStatsEngineOrphanedContainerCleanup tests the removeContainerFromAllTasksUnsafe +// method that handles containers that can't be resolved to their original task. +func TestStatsEngineOrphanedContainerCleanup(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + resolver := mock_resolver.NewMockContainerMetadataResolver(ctrl) + mockDockerClient := mock_dockerapi.NewMockDockerClient(ctrl) + + t1 := &apitask.Task{Arn: "t1", Family: "f1", NetworkMode: "bridge"} + t2 := &apitask.Task{Arn: "t2", Family: "f2", NetworkMode: "bridge"} + + resolver.EXPECT().ResolveTask("c1").AnyTimes().Return(t1, nil) + resolver.EXPECT().ResolveTask("c2").AnyTimes().Return(t2, nil) + resolver.EXPECT().ResolveTask("orphaned").AnyTimes().Return(t1, nil) + resolver.EXPECT().ResolveTaskByARN("t1").AnyTimes().Return(t1, nil) + resolver.EXPECT().ResolveTaskByARN("t2").AnyTimes().Return(t2, nil) + resolver.EXPECT().ResolveContainer(gomock.Any()).AnyTimes().Return(&apicontainer.DockerContainer{ + Container: &apicontainer.Container{ + Name: "testContainer", + NetworkModeUnsafe: "bridge", + }, + }, nil) + + mockStatsChannel := make(chan *types.StatsJSON) + defer close(mockStatsChannel) + mockDockerClient.EXPECT().Stats(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockStatsChannel, nil).AnyTimes() + + engine := NewDockerStatsEngine(&cfg, nil, eventStream("TestStatsEngineOrphanedContainerCleanup"), nil, nil, nil) + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + engine.ctx = ctx + engine.resolver = resolver + engine.client = mockDockerClient + engine.cluster = defaultCluster + engine.containerInstanceArn = defaultContainerInstance + engine.csiClient = csiclient.NewDummyCSIClient() + defer engine.removeAll() + + // Add containers to different tasks + engine.addAndStartStatsContainer("c1") + engine.addAndStartStatsContainer("c2") + engine.addAndStartStatsContainer("orphaned") + + // Verify all containers are tracked + require.Len(t, engine.tasksToContainers, 2) + containers1, _ := engine.tasksToContainers["t1"] + require.Len(t, containers1, 2) // c1 and orphaned + containers2, _ := engine.tasksToContainers["t2"] + require.Len(t, containers2, 1) // c2 + + // Test the removeContainerFromAllTasksUnsafe method directly + engine.lock.Lock() + engine.removeContainerFromAllTasksUnsafe("orphaned") + engine.lock.Unlock() + + // Verify orphaned container is removed from t1 + containers1, _ = engine.tasksToContainers["t1"] + _, exists := containers1["orphaned"] + require.False(t, exists, "Orphaned container should be removed from t1") + + // Verify other containers are still present + _, exists = containers1["c1"] + require.True(t, exists, "Container c1 should still be present") + containers2, _ = engine.tasksToContainers["t2"] + _, exists = containers2["c2"] + require.True(t, exists, "Container c2 should still be present") +} + +// TestStatsEngineHealthCheckContainerCleanup tests that health check containers +// are also properly cleaned up when they become orphaned. +func TestStatsEngineHealthCheckContainerCleanup(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + resolver := mock_resolver.NewMockContainerMetadataResolver(ctrl) + mockDockerClient := mock_dockerapi.NewMockDockerClient(ctrl) + + t1 := &apitask.Task{Arn: "t1", Family: "f1", NetworkMode: "bridge"} + + resolver.EXPECT().ResolveTask("health_container").AnyTimes().Return(t1, nil) + resolver.EXPECT().ResolveTaskByARN("t1").AnyTimes().Return(t1, nil) + resolver.EXPECT().ResolveContainer("health_container").AnyTimes().Return(&apicontainer.DockerContainer{ + Container: &apicontainer.Container{ + Name: "healthContainer", + NetworkModeUnsafe: "bridge", + HealthCheckType: "docker", + }, + }, nil) + + mockStatsChannel := make(chan *types.StatsJSON) + defer close(mockStatsChannel) + mockDockerClient.EXPECT().Stats(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockStatsChannel, nil).AnyTimes() + + engine := NewDockerStatsEngine(&cfg, nil, eventStream("TestStatsEngineHealthCheckContainerCleanup"), nil, nil, nil) + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + engine.ctx = ctx + engine.resolver = resolver + engine.client = mockDockerClient + engine.cluster = defaultCluster + engine.containerInstanceArn = defaultContainerInstance + engine.csiClient = csiclient.NewDummyCSIClient() + defer engine.removeAll() + + // Add health check container + engine.addAndStartStatsContainer("health_container") + + // Verify container is tracked in both regular and health check maps + require.Len(t, engine.tasksToContainers, 1) + require.Len(t, engine.tasksToHealthCheckContainers, 1) + + containers, _ := engine.tasksToContainers["t1"] + _, exists := containers["health_container"] + require.True(t, exists, "Health container should be in regular containers map") + + healthContainers, _ := engine.tasksToHealthCheckContainers["t1"] + _, exists = healthContainers["health_container"] + require.True(t, exists, "Health container should be in health check containers map") + + // Test cleanup of orphaned health check container + engine.lock.Lock() + engine.removeContainerFromAllTasksUnsafe("health_container") + engine.lock.Unlock() + + // Verify container is removed from both maps + containers, _ = engine.tasksToContainers["t1"] + _, exists = containers["health_container"] + require.False(t, exists, "Health container should be removed from regular containers map") + + healthContainers, _ = engine.tasksToHealthCheckContainers["t1"] + _, exists = healthContainers["health_container"] + require.False(t, exists, "Health container should be removed from health check containers map") +} diff --git a/agent/stats/engine_linux.go b/agent/stats/engine_unix.go similarity index 97% rename from agent/stats/engine_linux.go rename to agent/stats/engine_unix.go index 098d661a4be..4010b917373 100644 --- a/agent/stats/engine_linux.go +++ b/agent/stats/engine_unix.go @@ -1,5 +1,5 @@ -//go:build linux -// +build linux +//go:build !windows +// +build !windows // Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. //