109 changes: 96 additions & 13 deletions agent/stats/engine.go
@@ -770,10 +770,17 @@ func (engine *DockerStatsEngine) removeContainer(dockerID string) {
engine.lock.Lock()
defer engine.lock.Unlock()

// Try to resolve the container to get the task ARN for proper cleanup
task, err := engine.resolver.ResolveTask(dockerID)
if err != nil {
// If we can't resolve the container to a task (e.g., because the Docker Task Engine
// has already cleaned it up), we need to search through all tasks to find and remove
// this container to prevent it from being stuck in the stats engine forever.
logger.Debug("Could not map container to task, searching all tasks for cleanup", logger.Fields{
field.Container: dockerID,
field.Error: err,
})
engine.removeContainerFromAllTasksUnsafe(dockerID)
return
}

@@ -794,6 +801,48 @@ func (engine *DockerStatsEngine) removeContainer(dockerID string) {
engine.doRemoveContainerUnsafe(container, task.Arn)
}

// removeContainerFromAllTasksUnsafe searches through all tasks to find and remove a container
// when we can't resolve the container to a specific task. This prevents container leaks
// when the Docker Task Engine has already cleaned up the container but the stats engine
// still has it tracked.
func (engine *DockerStatsEngine) removeContainerFromAllTasksUnsafe(dockerID string) {
var foundContainer *StatsContainer
var foundTaskArn string

// Search through all tasks to find the container
for taskArn, containerMap := range engine.tasksToContainers {
if container, exists := containerMap[dockerID]; exists {
foundContainer = container
foundTaskArn = taskArn
break
}
}

if foundContainer != nil {
logger.Info("Found orphaned container in stats engine, removing", logger.Fields{
field.Container: dockerID,
field.TaskARN: foundTaskArn,
})
engine.doRemoveContainerUnsafe(foundContainer, foundTaskArn)
} else {
// Also check health check containers
for taskArn, containerMap := range engine.tasksToHealthCheckContainers {
if container, exists := containerMap[dockerID]; exists {
logger.Info("Found orphaned health check container in stats engine, removing", logger.Fields{
field.Container: dockerID,
field.TaskARN: taskArn,
})
delete(engine.tasksToHealthCheckContainers[taskArn], dockerID)
if len(engine.tasksToHealthCheckContainers[taskArn]) == 0 {
delete(engine.tasksToHealthCheckContainers, taskArn)
}
container.StopStatsCollection()
break
}
}
}
}
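// Reviewer sketch (illustrative only, not part of this diff): the fallback
// above is a linear scan over two maps keyed by task ARN. Below is a minimal,
// self-contained model of that pattern; stubContainer and the map shape are
// hypothetical stand-ins for *StatsContainer and the engine's
// tasksToContainers / tasksToHealthCheckContainers fields.

package main

import "fmt"

// stubContainer stands in for *StatsContainer; only the stop hook matters here.
type stubContainer struct{ stopped bool }

func (c *stubContainer) StopStatsCollection() { c.stopped = true }

// removeFromAllTasks scans every task's container map for dockerID, deletes the
// entry, drops the task key once its map is empty, and stops stats collection,
// mirroring removeContainerFromAllTasksUnsafe above. It reports whether the
// container was found.
func removeFromAllTasks(tasks map[string]map[string]*stubContainer, dockerID string) bool {
	for taskArn, containers := range tasks {
		if c, ok := containers[dockerID]; ok {
			delete(containers, dockerID)
			if len(containers) == 0 {
				delete(tasks, taskArn)
			}
			c.StopStatsCollection()
			return true
		}
	}
	return false
}

func main() {
	tasks := map[string]map[string]*stubContainer{
		"task-arn-1": {"abc123": {}},
	}
	fmt.Println(removeFromAllTasks(tasks, "abc123")) // true; empty task-arn-1 is dropped as well
	fmt.Println(removeFromAllTasks(tasks, "abc123")) // false; already removed
}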

// newDockerContainerMetadataResolver returns a new instance of DockerContainerMetadataResolver.
func newDockerContainerMetadataResolver(taskEngine ecsengine.TaskEngine) (*DockerContainerMetadataResolver, error) {
dockerTaskEngine, ok := taskEngine.(*ecsengine.DockerTaskEngine)
@@ -821,13 +870,29 @@ func (engine *DockerStatsEngine) taskContainerMetricsUnsafe(taskArn string) ([]*
var containerMetrics []*ecstcs.ContainerMetric
for _, container := range containerMap {
dockerID := container.containerMetadata.DockerID
// Validate container exists in current task map
if _, exists := containerMap[dockerID]; !exists {
continue
}
// Check if the container is terminal. If it is, make sure that it is
// cleaned up properly. We might sometimes miss events from docker task
// engine and this helps in reconciling the state. The tcs client's
// GetInstanceMetrics probe is used as the trigger for this.
if engine.stopTrackingContainerUnsafe(container, taskArn) {
continue
}

// Check whether the container is being cleaned up by testing if its context has been cancelled
select {
case <-container.ctx.Done():
// Container is being cleaned up, skip metrics collection
logger.Debug("Container context cancelled, skipping metrics collection", logger.Fields{
field.Container: dockerID,
})
continue
default:
// Container context is still active, proceed with metrics collection
}

// age is used to determine whether or not we should expect missing metrics:
// recently started containers normally have not had their metrics queue
// filled yet.
@@ -842,22 +907,40 @@ func (engine *DockerStatsEngine) taskContainerMetricsUnsafe(taskArn string) ([]*
if age < gracePeriod {
continue
}
// Check again if container is being cleaned up before logging error
select {
case <-container.ctx.Done():
logger.Debug("Container context cancelled during metrics collection, skipping error log", logger.Fields{
field.Container: dockerID,
})
continue
default:
logger.Error("Error collecting cloudwatch metrics for container", logger.Fields{
field.Container: dockerID,
field.Error: err,
})
continue
}
}
memoryStatsSet, err := container.statsQueue.GetMemoryStatsSet()
if err != nil {
if age < gracePeriod {
continue
}
// Check again if container is being cleaned up before logging error
select {
case <-container.ctx.Done():
logger.Debug("Container context cancelled during metrics collection, skipping error log", logger.Fields{
field.Container: dockerID,
})
continue
default:
logger.Error("Error collecting cloudwatch metrics for container", logger.Fields{
field.Container: dockerID,
field.Error: err,
})
continue
}
}

containerMetric := &ecstcs.ContainerMetric{
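// Reviewer sketch (illustrative only, not part of this diff): the repeated
// select blocks above are the standard non-blocking probe for a cancelled
// context: with a default case, select never blocks, and it takes the Done
// branch only if cancellation has already happened. A minimal, runnable
// demonstration using only the standard library:

package main

import (
	"context"
	"fmt"
)

// cancelled reports whether ctx has already been cancelled, without blocking.
// taskContainerMetricsUnsafe applies this probe twice per container: once
// before collecting metrics and again before logging a collection error, so
// containers that are mid-cleanup neither produce metrics nor noisy error logs.
func cancelled(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	fmt.Println(cancelled(ctx)) // false: context still active, safe to collect
	cancel()
	fmt.Println(cancelled(ctx)) // true: cleanup under way, skip this container
}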