Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 44 additions & 2 deletions pkg/project/auth/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

"k8s.io/klog/v2"
Expand Down Expand Up @@ -107,6 +108,41 @@ func (rs *statelessSkipSynchronizer) SkipSynchronize(prevState string, versioned
return skip, currentState
}

func newOverridableSkipSynchronizer() *overridableSkipSynchronizer {
return &overridableSkipSynchronizer{
skipSynchronizer: &statelessSkipSynchronizer{},
}
}

var (
bTrue = true
bFalse = false
pTrue = &bTrue
pFalse = &bFalse
)

// overridableSkipSynchronizer wraps a base skipSynchronizer with the ability to
// override the next SkipSynchronize call.
type overridableSkipSynchronizer struct {
override atomic.Pointer[bool]
skipSynchronizer
}

func (o *overridableSkipSynchronizer) SkipSynchronize(prevState string, versionedObjects ...LastSyncResourceVersioner) (bool, string) {
if override := o.override.Swap(nil); override != nil {
return *override, prevState
}
return o.skipSynchronizer.SkipSynchronize(prevState, versionedObjects...)
}

func (o *overridableSkipSynchronizer) ForceDoNotSkip() {
o.override.Store(pFalse)
}

func (o *overridableSkipSynchronizer) ForceSkip() {
o.override.Store(pTrue)
}

type neverSkipSynchronizer struct{}

func (s *neverSkipSynchronizer) SkipSynchronize(prevState string, versionedObjects ...LastSyncResourceVersioner) (bool, string) {
Expand Down Expand Up @@ -244,15 +280,21 @@ func NewAuthorizationCache(

watchers: []CacheWatcher{},
}
skipSyncer := newOverridableSkipSynchronizer()
if _, err := informers.Roles().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{DeleteFunc: func(obj interface{}) { skipSyncer.ForceDoNotSkip() }}); err != nil {
utilruntime.HandleError(err)
}
if _, err := informers.RoleBindings().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{DeleteFunc: func(obj interface{}) { skipSyncer.ForceDoNotSkip() }}); err != nil {
utilruntime.HandleError(err)
}
ac.skip = skipSyncer
ac.lastSyncResourceVersioner = namespaceLastSyncResourceVersioner
ac.syncHandler = ac.syncRequest
return ac
}

// Run begins watching and synchronizing the cache
func (ac *AuthorizationCache) Run(period time.Duration) {
ac.skip = &statelessSkipSynchronizer{}

go utilwait.Forever(func() { ac.synchronize() }, period)
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/project/auth/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (mr *mockReviewer) Review(name string) (Review, error) {
}

func validateList(t *testing.T, lister Lister, user user.Info, expectedSet sets.String) {
t.Helper()
namespaceList, err := lister.List(user, labels.Everything())
if err != nil {
t.Errorf("Unexpected error %v", err)
Expand Down Expand Up @@ -135,6 +136,7 @@ func TestSyncNamespace(t *testing.T) {
reviewer,
informers.Rbac().V1(),
)
authorizationCache.skip = &neverSkipSynchronizer{}
// we prime the data we need here since we are not running reflectors
for i := range namespaceList.Items {
nsIndexer.Add(&namespaceList.Items[i])
Expand Down