Skip to content

Commit 3c6617b

Browse files
committed
add namespace watch
1 parent bc6d9a2 commit 3c6617b

File tree

5 files changed

+49
-23
lines changed

5 files changed

+49
-23
lines changed

pkg/runtime/cache/namespace.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ type namespaceInfo struct {
3838
endpointURL string
3939
// {service}.services.k8s.aws/deletion-policy Annotations (keyed by service)
4040
deletionPolicies map[string]string
41+
42+
labels map[string]string
4143
}
4244

4345
// getDefaultRegion returns the default region value
@@ -226,6 +228,16 @@ func (c *NamespaceCache) GetDeletionPolicy(namespace string, service string) (st
226228
return "", false
227229
}
228230

231+
// GetDeletionPolicy returns the deletion policy if it exists
232+
func (c *NamespaceCache) GetLabels(namespace string) map[string]string {
233+
info, ok := c.getNamespaceInfo(namespace)
234+
if ok {
235+
return info.labels
236+
}
237+
return nil
238+
239+
}
240+
229241
// getNamespaceInfo reads a namespace cached annotations and
230242
// return a given namespace default aws region, owner account id and endpoint url.
231243
// This function is thread safe.
@@ -268,6 +280,8 @@ func (c *NamespaceCache) setNamespaceInfoFromK8sObject(ns *corev1.Namespace) {
268280
nsInfo.deletionPolicies[service] = elem
269281
}
270282

283+
nsInfo.labels = ns.GetLabels()
284+
271285
c.Lock()
272286
defer c.Unlock()
273287
c.namespaceInfos[ns.ObjectMeta.Name] = nsInfo

pkg/runtime/iamroleselector/cache.go

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package iamroleselector
1515

1616
import (
17+
"context"
1718
"fmt"
1819
"sync"
1920

@@ -24,29 +25,33 @@ import (
2425
"k8s.io/apimachinery/pkg/runtime/schema"
2526
"k8s.io/client-go/dynamic"
2627
"k8s.io/client-go/dynamic/dynamicinformer"
28+
"k8s.io/client-go/kubernetes"
2729
"k8s.io/client-go/tools/cache"
2830

2931
ackv1alpha1 "github.com/aws-controllers-k8s/runtime/apis/core/v1alpha1"
32+
ackcache "github.com/aws-controllers-k8s/runtime/pkg/runtime/cache"
3033
)
3134

3235
// Cache wraps the informer for IAMRoleSelector resources
3336
type Cache struct {
3437
sync.RWMutex
35-
log logr.Logger
36-
informer cache.SharedIndexInformer
37-
selectors map[string]*ackv1alpha1.IAMRoleSelector // name -> selector
38+
namespaces *ackcache.NamespaceCache
39+
log logr.Logger
40+
informer cache.SharedIndexInformer
41+
selectors map[string]*ackv1alpha1.IAMRoleSelector // name -> selector
3842
}
3943

4044
// NewCache creates a new IAMRoleSelector cache
4145
func NewCache(log logr.Logger) *Cache {
4246
return &Cache{
43-
log: log.WithName("cache.iam-role-selector"),
44-
selectors: make(map[string]*ackv1alpha1.IAMRoleSelector),
47+
log: log.WithName("cache.iam-role-selector"),
48+
selectors: make(map[string]*ackv1alpha1.IAMRoleSelector),
49+
namespaces: ackcache.NewNamespaceCache(log, nil, nil),
4550
}
4651
}
4752

4853
// Run starts the cache and blocks until stopCh is closed
49-
func (c *Cache) Run(client dynamic.Interface, stopCh <-chan struct{}) {
54+
func (c *Cache) Run(client dynamic.Interface, namespaceClient kubernetes.Interface, stopCh <-chan struct{}) {
5055
c.log.V(1).Info("Starting IAMRoleSelector cache")
5156

5257
// Create dynamic informer factory
@@ -74,6 +79,8 @@ func (c *Cache) Run(client dynamic.Interface, stopCh <-chan struct{}) {
7479
})
7580

7681
factory.Start(stopCh)
82+
83+
c.namespaces.Run(namespaceClient, stopCh)
7784
}
7885

7986
func (c *Cache) handleAdd(obj interface{}) {
@@ -197,15 +204,15 @@ func (c *Cache) ListSelectors() []*ackv1alpha1.IAMRoleSelector {
197204

198205
// Matches returns a list of IAMRoleSelectors that match the given resource. This function
199206
// should only be called after the cache has been started and synced.
200-
func (c *Cache) Matches(resource runtime.Object) ([]*ackv1alpha1.IAMRoleSelector, error) {
207+
func (c *Cache) Matches(ctx context.Context, resource runtime.Object) ([]*ackv1alpha1.IAMRoleSelector, error) {
201208
// Extract metadata from the resource
202209
metaObj, err := meta.Accessor(resource)
203210
if err != nil {
204211
return nil, fmt.Errorf("failed to get metadata from resource: %w", err)
205212
}
206213

207-
namespace := metaObj.GetNamespace()
208-
214+
namespaceName := metaObj.GetNamespace()
215+
namespaceLabels := c.namespaces.GetLabels(namespaceName)
209216
// Get GVK - should be set on ACK resources
210217
gvk := resource.GetObjectKind().GroupVersionKind()
211218
if gvk.Empty() {
@@ -215,5 +222,5 @@ func (c *Cache) Matches(resource runtime.Object) ([]*ackv1alpha1.IAMRoleSelector
215222

216223
// TODO: get namespace labels from a namespace lister/cache
217224
// For now, pass empty namespace labels
218-
return c.GetMatchingSelectors(namespace, nil, gvk)
225+
return c.GetMatchingSelectors(namespaceName, namespaceLabels, gvk)
219226
}

pkg/runtime/iamroleselector/cache_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package iamroleselector
1515

1616
import (
17+
"context"
1718
"testing"
1819
"time"
1920

@@ -27,6 +28,7 @@ import (
2728
"k8s.io/apimachinery/pkg/types"
2829
"k8s.io/apimachinery/pkg/watch"
2930
"k8s.io/client-go/dynamic/fake"
31+
k8sfake "k8s.io/client-go/kubernetes/fake"
3032
k8stesting "k8s.io/client-go/testing"
3133

3234
ackv1alpha1 "github.com/aws-controllers-k8s/runtime/apis/core/v1alpha1"
@@ -53,13 +55,16 @@ func TestCache_Matches(t *testing.T) {
5355
client := fake.NewSimpleDynamicClientWithCustomListKinds(scheme, gvrToListKind)
5456
client.PrependWatchReactor("iamroleselectors", k8stesting.DefaultWatchReactor(watcher, nil))
5557

58+
k8sClient := k8sfake.NewSimpleClientset()
59+
k8sClient.PrependWatchReactor("production", k8stesting.DefaultWatchReactor(watcher, nil))
60+
5661
logger := zapr.NewLogger(zap.NewNop())
5762
cache := NewCache(logger)
5863

5964
stopCh := make(chan struct{})
6065
t.Cleanup(func() { close(stopCh) })
6166

62-
go cache.Run(client, stopCh)
67+
go cache.Run(client, k8sClient, stopCh)
6368

6469
// Wait for cache to sync
6570
require.Eventually(t, func() bool {
@@ -145,10 +150,11 @@ func TestCache_Matches(t *testing.T) {
145150
wantCount: 0,
146151
},
147152
}
153+
ctx := context.TODO()
148154

149155
for _, tt := range tests {
150156
t.Run(tt.name, func(t *testing.T) {
151-
matches, err := cache.Matches(tt.resource)
157+
matches, err := cache.Matches(ctx, tt.resource)
152158
require.NoError(t, err)
153159
require.Len(t, matches, tt.wantCount)
154160

pkg/runtime/reconciler.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -266,10 +266,9 @@ func (r *resourceReconciler) Reconcile(ctx context.Context, req ctrlrt.Request)
266266
// If the IAMRoleSelector feature gate is enabled, we need to check if there
267267
// are any matching IAMRoleSelectors for this resource. If there are, we
268268
// override the roleARN from CARM (if any) with the one from the selector.
269-
selectors, err := r.irsCache.GetMatchingSelectors(
270-
req.Namespace,
271-
nil,
272-
r.rd.GroupVersionKind(),
269+
selectors, err := r.irsCache.Matches(
270+
ctx,
271+
desired.RuntimeObject(),
273272
)
274273
if err != nil {
275274
return ctrlrt.Result{}, fmt.Errorf("checking for matching IAMRoleSelectors: %w", err)

pkg/runtime/service_controller.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -213,18 +213,18 @@ func (c *serviceController) BindControllerManager(mgr ctrlrt.Manager, cfg ackcfg
213213
}},
214214
cfg.FeatureGates,
215215
)
216+
clusterConfig := mgr.GetConfig()
217+
clientSet, err := kubernetes.NewForConfig(clusterConfig)
218+
if err != nil {
219+
return err
220+
}
216221
// The caches are only used for cross account resource management. We
217222
// want to run them only when --enable-carm is set to true and
218223
// --watch-namespace is set to zero or more than one namespaces.
219224
if cfg.EnableCARM {
220225
if len(namespaces) == 1 {
221226
c.log.V(0).Info("--enable-carm is set to true but --watch-namespace is set to a single namespace. CARM will not be enabled.")
222227
} else {
223-
clusterConfig := mgr.GetConfig()
224-
clientSet, err := kubernetes.NewForConfig(clusterConfig)
225-
if err != nil {
226-
return err
227-
}
228228
// Run the caches. This will not block as the caches are run in
229229
// separate goroutines.
230230
carmCache.Run(clientSet)
@@ -268,11 +268,11 @@ func (c *serviceController) BindControllerManager(mgr ctrlrt.Manager, cfg ackcfg
268268
if cfg.FeatureGates.IsEnabled(featuregate.IAMRoleSelector) {
269269
// init dynamic client
270270
clusterConfig := mgr.GetConfig()
271-
clientSet, err := dynamic.NewForConfig(clusterConfig)
271+
dynamicClient, err := dynamic.NewForConfig(clusterConfig)
272272
if err != nil {
273273
return err
274274
}
275-
irsCache.Run(clientSet, context.TODO().Done())
275+
irsCache.Run(dynamicClient, clientSet, context.TODO().Done())
276276
}
277277

278278
// Filter the resource manager factories

0 commit comments

Comments
 (0)