19
19
from botocore .exceptions import ClientError
20
20
from cfct .utils .retry_decorator import try_except_retry
21
21
from cfct .aws .utils .boto3_session import Boto3Session
22
+ from cfct .types import StackSetInstanceTypeDef , StackSetRequestTypeDef , ResourcePropertiesTypeDef
22
23
import json
23
24
25
+ from typing import Dict , List , Any
26
+
24
27
25
28
class StackSet (Boto3Session ):
29
+ DEPLOYED_BY_CFCT_TAG = {"Key" : "AWS_Solutions" , "Value" : "CustomControlTowerStackSet" }
30
+ CFCT_STACK_SET_PREFIX = "CustomControlTower-"
31
+ DEPLOY_METHOD = "stack_set"
32
+
26
33
def __init__ (self , logger , ** kwargs ):
27
34
self .logger = logger
28
35
__service_name = 'cloudformation'
@@ -35,6 +42,7 @@ def __init__(self, logger, **kwargs):
35
42
self .max_results_per_page = 20
36
43
super ().__init__ (logger , __service_name , ** kwargs )
37
44
self .cfn_client = super ().get_client ()
45
+
38
46
self .operation_in_progress_except_msg = \
39
47
'Caught exception OperationInProgressException' \
40
48
' handling the exception...'
@@ -358,6 +366,91 @@ def list_stack_set_operations(self, **kwargs):
358
366
self .logger .log_unhandled_exception (e )
359
367
raise
360
368
369
+ def _filter_managed_stack_set_names (self , list_stackset_response : Dict [str , Any ]) -> List [str ]:
370
+ """
371
+ Reduces a list of given stackset summaries to only those considered managed by CfCT
372
+ """
373
+ managed_stack_set_names : List [str ] = []
374
+ for summary in list_stackset_response ['Summaries' ]:
375
+ stack_set_name = summary ['StackSetName' ]
376
+ try :
377
+ response : Dict [str , Any ] = self .cfn_client .describe_stack_set (StackSetName = stack_set_name )
378
+ except ClientError as error :
379
+ if error .response ['Error' ]['Code' ] == "StackSetNotFoundException" :
380
+ continue
381
+ raise
382
+
383
+ if self .is_managed_by_cfct (describe_stackset_response = response ):
384
+ managed_stack_set_names .append (stack_set_name )
385
+
386
+ return managed_stack_set_names
387
+
388
+ def get_managed_stack_set_names (self ) -> List [str ]:
389
+ """
390
+ Discovers all StackSets prefixed with 'CustomControlTower-' and that
391
+ have the tag {Key: AWS_Solutions, Value: CustomControlTowerStackSet}
392
+ """
393
+
394
+ managed_stackset_names : List [str ] = []
395
+ paginator = self .cfn_client .get_paginator ("list_stack_sets" )
396
+ for page in paginator .paginate (Status = "ACTIVE" ):
397
+ managed_stackset_names .extend (self ._filter_managed_stack_set_names (list_stackset_response = page ))
398
+ return managed_stackset_names
399
+
400
+ def is_managed_by_cfct (self , describe_stackset_response : Dict [str , Any ]) -> bool :
401
+ """
402
+ A StackSet is considered managed if it has both the prefix we expect, and the proper tag
403
+ """
404
+
405
+ has_tag = StackSet .DEPLOYED_BY_CFCT_TAG in describe_stackset_response ['StackSet' ]['Tags' ]
406
+ has_prefix = describe_stackset_response ['StackSet' ]['StackSetName' ].startswith (StackSet .CFCT_STACK_SET_PREFIX )
407
+ is_active = describe_stackset_response ['StackSet' ]['Status' ] == "ACTIVE"
408
+ return all ((has_prefix , has_tag , is_active ))
409
+
410
+ def get_stack_sets_not_present_in_manifest (self , manifest_stack_sets : List [str ]) -> List [str ]:
411
+ """
412
+ Compares list of stacksets defined in the manifest versus the stacksets in the account
413
+ and returns a list of all stackset names to be deleted
414
+ """
415
+
416
+ # Stack sets defined in the manifest will not have the CFCT_STACK_SET_PREFIX
417
+ # To make comparisons simpler
418
+ manifest_stack_sets_with_prefix = [f"{ StackSet .CFCT_STACK_SET_PREFIX } { name } " for name in manifest_stack_sets ]
419
+ cfct_deployed_stack_sets = self .get_managed_stack_set_names ()
420
+ return list (set (cfct_deployed_stack_sets ).difference (set (manifest_stack_sets_with_prefix )))
421
+
422
+ def generate_delete_request (self , stacksets_to_delete : List [str ]) -> List [StackSetRequestTypeDef ]:
423
+ requests : List [StackSetRequestTypeDef ] = []
424
+ for stackset_name in stacksets_to_delete :
425
+ deployed_instances = self ._get_stackset_instances (stackset_name = stackset_name )
426
+ requests .append (StackSetRequestTypeDef (
427
+ RequestType = "Delete" ,
428
+ ResourceProperties = ResourcePropertiesTypeDef (
429
+ StackSetName = stackset_name ,
430
+ TemplateURL = "DeleteStackSetNoopURL" ,
431
+ Capabilities = json .dumps (["CAPABILITY_NAMED_IAM" , "CAPABILITY_AUTO_EXPAND" ]),
432
+ Parameters = {},
433
+ AccountList = list ({instance ['account' ] for instance in deployed_instances }),
434
+ RegionList = list ({instance ['region' ] for instance in deployed_instances }),
435
+ SSMParameters = {}
436
+ ),
437
+ SkipUpdateStackSet = "yes" ,
438
+ ))
439
+ return requests
440
+
441
+
442
+ def _get_stackset_instances (self , stackset_name : str ) -> List [StackSetInstanceTypeDef ]:
443
+ instance_regions_and_accounts : List [StackSetInstanceTypeDef ] = []
444
+ paginator = self .cfn_client .get_paginator ("list_stack_instances" )
445
+ for page in paginator .paginate (StackSetName = stackset_name ):
446
+ for summary in page ['Summaries' ]:
447
+ instance_regions_and_accounts .append (StackSetInstanceTypeDef (
448
+ account = summary ['Account' ],
449
+ region = summary ['Region' ],
450
+ ))
451
+
452
+ return instance_regions_and_accounts
453
+
361
454
362
455
class Stacks (Boto3Session ):
363
456
def __init__ (self , logger , region , ** kwargs ):
0 commit comments