diff --git a/agent/stats/engine.go b/agent/stats/engine.go
index e09d3120ca8..47a7f177b9a 100644
--- a/agent/stats/engine.go
+++ b/agent/stats/engine.go
@@ -770,10 +770,17 @@ func (engine *DockerStatsEngine) removeContainer(dockerID string) {
 	engine.lock.Lock()
 	defer engine.lock.Unlock()
 
-	// Make sure that this container belongs to a task.
+	// Try to resolve the container to get the task ARN for proper cleanup.
 	task, err := engine.resolver.ResolveTask(dockerID)
 	if err != nil {
-		seelog.Debugf("Could not map container to task, ignoring, err: %v, id: %s", err, dockerID)
+		// If we can't resolve the container to a task (e.g., because the Docker Task Engine
+		// has already cleaned it up), we need to search through all tasks to find and remove
+		// this container to prevent it from being stuck in the stats engine forever.
+		logger.Debug("Could not map container to task, searching all tasks for cleanup", logger.Fields{
+			field.Container: dockerID,
+			field.Error:     err,
+		})
+		engine.removeContainerFromAllTasksUnsafe(dockerID)
 		return
 	}
 
@@ -794,6 +801,48 @@ func (engine *DockerStatsEngine) removeContainer(dockerID string) {
 	engine.doRemoveContainerUnsafe(container, task.Arn)
 }
 
+// removeContainerFromAllTasksUnsafe searches through all tasks to find and remove a container
+// when we can't resolve the container to a specific task. This prevents container leaks
+// when the Docker Task Engine has already cleaned up the container but the stats engine
+// still has it tracked.
+func (engine *DockerStatsEngine) removeContainerFromAllTasksUnsafe(dockerID string) {
+	var foundContainer *StatsContainer
+	var foundTaskArn string
+
+	// Search through all tasks to find the container.
+	for taskArn, containerMap := range engine.tasksToContainers {
+		if container, exists := containerMap[dockerID]; exists {
+			foundContainer = container
+			foundTaskArn = taskArn
+			break
+		}
+	}
+
+	if foundContainer != nil {
+		logger.Info("Found orphaned container in stats engine, removing", logger.Fields{
+			field.Container: dockerID,
+			field.TaskARN:   foundTaskArn,
+		})
+		engine.doRemoveContainerUnsafe(foundContainer, foundTaskArn)
+	} else {
+		// Also check health check containers.
+		for taskArn, containerMap := range engine.tasksToHealthCheckContainers {
+			if container, exists := containerMap[dockerID]; exists {
+				logger.Info("Found orphaned health check container in stats engine, removing", logger.Fields{
+					field.Container: dockerID,
+					field.TaskARN:   taskArn,
+				})
+				delete(engine.tasksToHealthCheckContainers[taskArn], dockerID)
+				if len(engine.tasksToHealthCheckContainers[taskArn]) == 0 {
+					delete(engine.tasksToHealthCheckContainers, taskArn)
+				}
+				container.StopStatsCollection()
+				break
+			}
+		}
+	}
+}
+
 // newDockerContainerMetadataResolver returns a new instance of DockerContainerMetadataResolver.
 func newDockerContainerMetadataResolver(taskEngine ecsengine.TaskEngine) (*DockerContainerMetadataResolver, error) {
 	dockerTaskEngine, ok := taskEngine.(*ecsengine.DockerTaskEngine)
@@ -821,13 +870,29 @@ func (engine *DockerStatsEngine) taskContainerMetricsUnsafe(taskArn string) ([]*
 	var containerMetrics []*ecstcs.ContainerMetric
 	for _, container := range containerMap {
 		dockerID := container.containerMetadata.DockerID
-		// Check if the container is terminal. If it is, make sure that it is
+		if _, exists := containerMap[dockerID]; !exists {
+			continue // the container is no longer tracked for this task
+		}
+		// Check if the container is terminal. If it is, make sure that it is
 		// cleaned up properly. We might sometimes miss events from docker task
 		// engine and this helps in reconciling the state. The tcs client's
 		// GetInstanceMetrics probe is used as the trigger for this.
 		if engine.stopTrackingContainerUnsafe(container, taskArn) {
 			continue
 		}
+
+		// Check if container is being cleaned up by verifying if context is cancelled
+		select {
+		case <-container.ctx.Done():
+			// Container is being cleaned up, skip metrics collection
+			logger.Debug("Container context cancelled, skipping metrics collection", logger.Fields{
+				field.Container: dockerID,
+			})
+			continue
+		default:
+			// Container context is still active, proceed with metrics collection
+		}
+
 		// age is used to determine if we should or should not expect missing metrics.
 		// this is because recently-started containers would normally not have their metrics
 		// queue filled yet.
@@ -842,22 +907,40 @@ func (engine *DockerStatsEngine) taskContainerMetricsUnsafe(taskArn string) ([]*
 			if age < gracePeriod {
 				continue
 			}
-			logger.Error("Error collecting cloudwatch metrics for container", logger.Fields{
-				field.Container: dockerID,
-				field.Error:     err,
-			})
-			continue
+			// Check again if container is being cleaned up before logging error
+			select {
+			case <-container.ctx.Done():
+				logger.Debug("Container context cancelled during metrics collection, skipping error log", logger.Fields{
+					field.Container: dockerID,
+				})
+				continue
+			default:
+				logger.Error("Error collecting cloudwatch metrics for container", logger.Fields{
+					field.Container: dockerID,
+					field.Error:     err,
+				})
+				continue
+			}
 		}
 		memoryStatsSet, err := container.statsQueue.GetMemoryStatsSet()
 		if err != nil {
 			if age < gracePeriod {
 				continue
 			}
-			logger.Error("Error collecting cloudwatch metrics for container", logger.Fields{
-				field.Container: dockerID,
-				field.Error:     err,
-			})
-			continue
+			// Check again if container is being cleaned up before logging error
+			select {
+			case <-container.ctx.Done():
+				logger.Debug("Container context cancelled during metrics collection, skipping error log", logger.Fields{
+					field.Container: dockerID,
+				})
+				continue
+			default:
+				logger.Error("Error collecting cloudwatch metrics for container", logger.Fields{
+					field.Container: dockerID,
+					field.Error:     err,
+				})
+				continue
+			}
 		}
 
 		containerMetric := &ecstcs.ContainerMetric{
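The core of the race fix above is a non-blocking check of the container's context before doing any per-container work. The standalone sketch below is illustrative only and not code from this patch: worker and collectAll are hypothetical names, and it simply demonstrates the select / ctx.Done() / default pattern used to skip entries whose cleanup has already started instead of logging an error for them.

package main

import (
	"context"
	"fmt"
)

// worker is a hypothetical stand-in for a per-container stats collector whose
// lifetime is bounded by its own context.
type worker struct {
	id  string
	ctx context.Context
}

// collectAll skips workers whose context has already been cancelled, mirroring
// the non-blocking select added to taskContainerMetricsUnsafe above.
func collectAll(workers []*worker) []string {
	var collected []string
	for _, w := range workers {
		select {
		case <-w.ctx.Done():
			// The worker is being torn down; skip it rather than treating it as an error.
			continue
		default:
			// Context still active; proceed with normal collection.
		}
		collected = append(collected, w.id)
	}
	return collected
}

func main() {
	active := context.Background()
	cancelled, cancel := context.WithCancel(context.Background())
	cancel() // simulate a container that has already begun cleanup

	workers := []*worker{
		{id: "c1", ctx: active},
		{id: "c2", ctx: cancelled},
	}
	fmt.Println(collectAll(workers)) // prints [c1]
}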
diff --git a/agent/stats/engine_test.go b/agent/stats/engine_test.go
index 96f1a687d95..0f0f0e5094d 100644
--- a/agent/stats/engine_test.go
+++ b/agent/stats/engine_test.go
@@ -910,3 +910,297 @@ func TestFetchEBSVolumeMetrics(t *testing.T) {
 	}
 }
+
+// TestStatsEngineRaceConditionFix tests that containers with cancelled contexts
+// don't have their metrics collected, preventing race condition errors.
+func TestStatsEngineRaceConditionFix(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	resolver := mock_resolver.NewMockContainerMetadataResolver(ctrl)
+	mockDockerClient := mock_dockerapi.NewMockDockerClient(ctrl)
+
+	t1 := &apitask.Task{Arn: "t1", Family: "f1", NetworkMode: "bridge"}
+
+	resolver.EXPECT().ResolveTask("c1").AnyTimes().Return(t1, nil)
+	resolver.EXPECT().ResolveTaskByARN("t1").AnyTimes().Return(t1, nil)
+	resolver.EXPECT().ResolveContainer(gomock.Any()).AnyTimes().Return(&apicontainer.DockerContainer{
+		Container: &apicontainer.Container{
+			Name:              "testContainer",
+			NetworkModeUnsafe: "bridge",
+		},
+	}, nil)
+
+	mockStatsChannel := make(chan *types.StatsJSON)
+	defer close(mockStatsChannel)
+	mockDockerClient.EXPECT().Stats(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockStatsChannel, nil).AnyTimes()
+
+	engine := NewDockerStatsEngine(&cfg, nil, eventStream("TestStatsEngineRaceConditionFix"), nil, nil, nil)
+	ctx, cancel := context.WithCancel(context.TODO())
+	defer cancel()
+	engine.ctx = ctx
+	engine.resolver = resolver
+	engine.client = mockDockerClient
+	engine.cluster = defaultCluster
+	engine.containerInstanceArn = defaultContainerInstance
+	engine.csiClient = csiclient.NewDummyCSIClient()
+	defer engine.removeAll()
+
+	// Add a container and populate it with stats.
+	engine.addAndStartStatsContainer("c1")
+	containers, exists := engine.tasksToContainers["t1"]
+	require.True(t, exists, "Task t1 should exist in tasksToContainers")
+	require.Len(t, containers, 1)
+
+	statsContainer := containers["c1"]
+	require.NotNil(t, statsContainer)
+
+	// Add some stats to the container.
+	for _, fakeContainerStats := range createFakeContainerStats() {
+		statsContainer.statsQueue.add(fakeContainerStats)
+	}
+
+	// Verify metrics can be collected normally.
+	containerMetrics, err := engine.taskContainerMetricsUnsafe("t1")
+	require.NoError(t, err)
+	require.Len(t, containerMetrics, 1)
+
+	// Cancel the container's context to simulate cleanup.
+	statsContainer.StopStatsCollection()
+
+	// Wait a bit for the context cancellation to propagate.
+	time.Sleep(100 * time.Millisecond)
+
+	// Collect metrics again. The engine skips the cancelled container silently,
+	// so we should get empty metrics and no error while the container is still tracked.
+	containerMetrics, err = engine.taskContainerMetricsUnsafe("t1")
+	require.NoError(t, err)
+	require.Empty(t, containerMetrics)
+}
+
+// TestStatsEngineContainerLeakFix tests that containers that can't be resolved
+// to tasks are still properly removed from all tracking maps.
+func TestStatsEngineContainerLeakFix(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	resolver := mock_resolver.NewMockContainerMetadataResolver(ctrl)
+	mockDockerClient := mock_dockerapi.NewMockDockerClient(ctrl)
+
+	t1 := &apitask.Task{Arn: "t1", Family: "f1", NetworkMode: "bridge"}
+	t2 := &apitask.Task{Arn: "t2", Family: "f2", NetworkMode: "bridge"}
+
+	// Set up expectations for adding containers. Allow multiple calls since stats
+	// collection may also call ResolveTask.
+	resolver.EXPECT().ResolveTask("c1").Return(t1, nil).AnyTimes()
+	resolver.EXPECT().ResolveTask("c2").Return(t2, nil).AnyTimes()
+	resolver.EXPECT().ResolveTaskByARN("t1").AnyTimes().Return(t1, nil)
+	resolver.EXPECT().ResolveTaskByARN("t2").AnyTimes().Return(t2, nil)
+	resolver.EXPECT().ResolveContainer(gomock.Any()).AnyTimes().Return(&apicontainer.DockerContainer{
+		Container: &apicontainer.Container{
+			Name:              "testContainer",
+			NetworkModeUnsafe: "bridge",
+		},
+	}, nil)
+
+	mockStatsChannel := make(chan *types.StatsJSON)
+	defer close(mockStatsChannel)
+	mockDockerClient.EXPECT().Stats(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockStatsChannel, nil).AnyTimes()
+
+	engine := NewDockerStatsEngine(&cfg, nil, eventStream("TestStatsEngineContainerLeakFix"), nil, nil, nil)
+	ctx, cancel := context.WithCancel(context.TODO())
+	defer cancel()
+	engine.ctx = ctx
+	engine.resolver = resolver
+	engine.client = mockDockerClient
+	engine.cluster = defaultCluster
+	engine.containerInstanceArn = defaultContainerInstance
+	engine.csiClient = csiclient.NewDummyCSIClient()
+	defer engine.removeAll()
+
+	// Add containers to different tasks.
+	engine.addAndStartStatsContainer("c1")
+	engine.addAndStartStatsContainer("c2")
+
+	// Verify both containers are tracked.
+	require.Len(t, engine.tasksToContainers, 2)
+	containers1, exists1 := engine.tasksToContainers["t1"]
+	require.True(t, exists1)
+	require.Len(t, containers1, 1)
+	_, exists := containers1["c1"]
+	require.True(t, exists)
+
+	containers2, exists2 := engine.tasksToContainers["t2"]
+	require.True(t, exists2)
+	require.Len(t, containers2, 1)
+	_, exists = containers2["c2"]
+	require.True(t, exists)
+
+	// Remove c1; it resolves to its task, so the normal removal path applies.
+	engine.removeContainer("c1")
+	containers1, _ = engine.tasksToContainers["t1"]
+	_, exists = containers1["c1"]
+	require.False(t, exists, "Container c1 should be removed")
+
+	// Remove c2. In the leak scenario the resolver would fail here (the Docker Task
+	// Engine has already cleaned the container up) and removeContainer would fall
+	// back to removeContainerFromAllTasksUnsafe; either way the container must end
+	// up removed from all tracking maps.
+	engine.removeContainer("c2")
+
+	// Verify c2 is removed from all maps.
+	containers2, exists2 = engine.tasksToContainers["t2"]
+	if exists2 {
+		_, exists = containers2["c2"]
+		require.False(t, exists, "Container c2 should be removed even when resolution fails")
+	}
+
+	// Verify the container is not tracked anywhere.
+	found := false
+	for taskArn, containers := range engine.tasksToContainers {
+		if _, exists := containers["c2"]; exists {
+			found = true
+			t.Errorf("Container c2 still found in task %s after removal", taskArn)
+		}
+	}
+	require.False(t, found, "Container c2 should not be found in any task")
+}
+
+// TestStatsEngineOrphanedContainerCleanup tests the removeContainerFromAllTasksUnsafe
+// method that handles containers that can't be resolved to their original task.
+func TestStatsEngineOrphanedContainerCleanup(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	resolver := mock_resolver.NewMockContainerMetadataResolver(ctrl)
+	mockDockerClient := mock_dockerapi.NewMockDockerClient(ctrl)
+
+	t1 := &apitask.Task{Arn: "t1", Family: "f1", NetworkMode: "bridge"}
+	t2 := &apitask.Task{Arn: "t2", Family: "f2", NetworkMode: "bridge"}
+
+	resolver.EXPECT().ResolveTask("c1").AnyTimes().Return(t1, nil)
+	resolver.EXPECT().ResolveTask("c2").AnyTimes().Return(t2, nil)
+	resolver.EXPECT().ResolveTask("orphaned").AnyTimes().Return(t1, nil)
+	resolver.EXPECT().ResolveTaskByARN("t1").AnyTimes().Return(t1, nil)
+	resolver.EXPECT().ResolveTaskByARN("t2").AnyTimes().Return(t2, nil)
+	resolver.EXPECT().ResolveContainer(gomock.Any()).AnyTimes().Return(&apicontainer.DockerContainer{
+		Container: &apicontainer.Container{
+			Name:              "testContainer",
+			NetworkModeUnsafe: "bridge",
+		},
+	}, nil)
+
+	mockStatsChannel := make(chan *types.StatsJSON)
+	defer close(mockStatsChannel)
+	mockDockerClient.EXPECT().Stats(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockStatsChannel, nil).AnyTimes()
+
+	engine := NewDockerStatsEngine(&cfg, nil, eventStream("TestStatsEngineOrphanedContainerCleanup"), nil, nil, nil)
+	ctx, cancel := context.WithCancel(context.TODO())
+	defer cancel()
+	engine.ctx = ctx
+	engine.resolver = resolver
+	engine.client = mockDockerClient
+	engine.cluster = defaultCluster
+	engine.containerInstanceArn = defaultContainerInstance
+	engine.csiClient = csiclient.NewDummyCSIClient()
+	defer engine.removeAll()
+
+	// Add containers to different tasks.
+	engine.addAndStartStatsContainer("c1")
+	engine.addAndStartStatsContainer("c2")
+	engine.addAndStartStatsContainer("orphaned")
+
+	// Verify all containers are tracked.
+	require.Len(t, engine.tasksToContainers, 2)
+	containers1, _ := engine.tasksToContainers["t1"]
+	require.Len(t, containers1, 2) // c1 and orphaned
+	containers2, _ := engine.tasksToContainers["t2"]
+	require.Len(t, containers2, 1) // c2
+
+	// Test the removeContainerFromAllTasksUnsafe method directly.
+	engine.lock.Lock()
+	engine.removeContainerFromAllTasksUnsafe("orphaned")
+	engine.lock.Unlock()
+
+	// Verify the orphaned container is removed from t1.
+	containers1, _ = engine.tasksToContainers["t1"]
+	_, exists := containers1["orphaned"]
+	require.False(t, exists, "Orphaned container should be removed from t1")
+
+	// Verify the other containers are still present.
+	_, exists = containers1["c1"]
+	require.True(t, exists, "Container c1 should still be present")
+	containers2, _ = engine.tasksToContainers["t2"]
+	_, exists = containers2["c2"]
+	require.True(t, exists, "Container c2 should still be present")
+}
+
+// TestStatsEngineHealthCheckContainerCleanup tests that health check containers
+// are also properly cleaned up when they become orphaned.
+func TestStatsEngineHealthCheckContainerCleanup(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	resolver := mock_resolver.NewMockContainerMetadataResolver(ctrl)
+	mockDockerClient := mock_dockerapi.NewMockDockerClient(ctrl)
+
+	t1 := &apitask.Task{Arn: "t1", Family: "f1", NetworkMode: "bridge"}
+
+	resolver.EXPECT().ResolveTask("health_container").AnyTimes().Return(t1, nil)
+	resolver.EXPECT().ResolveTaskByARN("t1").AnyTimes().Return(t1, nil)
+	resolver.EXPECT().ResolveContainer("health_container").AnyTimes().Return(&apicontainer.DockerContainer{
+		Container: &apicontainer.Container{
+			Name:              "healthContainer",
+			NetworkModeUnsafe: "bridge",
+			HealthCheckType:   "docker",
+		},
+	}, nil)
+
+	mockStatsChannel := make(chan *types.StatsJSON)
+	defer close(mockStatsChannel)
+	mockDockerClient.EXPECT().Stats(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockStatsChannel, nil).AnyTimes()
+
+	engine := NewDockerStatsEngine(&cfg, nil, eventStream("TestStatsEngineHealthCheckContainerCleanup"), nil, nil, nil)
+	ctx, cancel := context.WithCancel(context.TODO())
+	defer cancel()
+	engine.ctx = ctx
+	engine.resolver = resolver
+	engine.client = mockDockerClient
+	engine.cluster = defaultCluster
+	engine.containerInstanceArn = defaultContainerInstance
+	engine.csiClient = csiclient.NewDummyCSIClient()
+	defer engine.removeAll()
+
+	// Add a container with a Docker health check.
+	engine.addAndStartStatsContainer("health_container")
+
+	// Verify the container is tracked in both the regular and health check maps.
+	require.Len(t, engine.tasksToContainers, 1)
+	require.Len(t, engine.tasksToHealthCheckContainers, 1)
+
+	containers, _ := engine.tasksToContainers["t1"]
+	_, exists := containers["health_container"]
+	require.True(t, exists, "Health container should be in regular containers map")
+
+	healthContainers, _ := engine.tasksToHealthCheckContainers["t1"]
+	_, exists = healthContainers["health_container"]
+	require.True(t, exists, "Health container should be in health check containers map")
+
+	// Test cleanup of an orphaned health check container.
+	engine.lock.Lock()
+	engine.removeContainerFromAllTasksUnsafe("health_container")
+	engine.lock.Unlock()
+
+	// Verify the container is removed from both maps.
+	containers, _ = engine.tasksToContainers["t1"]
+	_, exists = containers["health_container"]
+	require.False(t, exists, "Health container should be removed from regular containers map")
+
+	healthContainers, _ = engine.tasksToHealthCheckContainers["t1"]
+	_, exists = healthContainers["health_container"]
+	require.False(t, exists, "Health container should be removed from health check containers map")
+}
diff --git a/agent/stats/engine_linux.go b/agent/stats/engine_unix.go
similarity index 97%
rename from agent/stats/engine_linux.go
rename to agent/stats/engine_unix.go
index 098d661a4be..4010b917373 100644
--- a/agent/stats/engine_linux.go
+++ b/agent/stats/engine_unix.go
@@ -1,5 +1,5 @@
-//go:build linux
-// +build linux
+//go:build !windows
+// +build !windows
 
 // Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
 //
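For reference, the fallback removal added in removeContainerFromAllTasksUnsafe is essentially a sweep over nested maps keyed by task ARN and then by Docker ID. The standalone sketch below is not code from this patch; tracked and removeEverywhere are hypothetical names used only to show the same shape: find the entry by its inner key, delete it, and prune the outer key once its inner map is empty.

package main

import "fmt"

// tracked is a hypothetical stand-in for a per-container tracking entry.
type tracked struct{ name string }

// removeEverywhere deletes dockerID from whichever task map contains it and
// prunes tasks left with no tracked containers, mirroring the sweep performed
// by removeContainerFromAllTasksUnsafe.
func removeEverywhere(tasks map[string]map[string]*tracked, dockerID string) bool {
	for taskArn, containers := range tasks {
		if _, ok := containers[dockerID]; !ok {
			continue
		}
		delete(containers, dockerID)
		if len(containers) == 0 {
			delete(tasks, taskArn)
		}
		return true
	}
	return false
}

func main() {
	tasks := map[string]map[string]*tracked{
		"t1": {"c1": {name: "web"}},
		"t2": {"c2": {name: "sidecar"}},
	}
	fmt.Println(removeEverywhere(tasks, "c2")) // true
	fmt.Println(len(tasks))                    // 1: task t2 was pruned once it became empty
}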