Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions cmd/reconciler-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"flag"
"fmt"
"net/http"
"os"

"github.com/go-logr/logr"
Expand All @@ -26,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/klog/v2/textlogger"
"kpt.dev/configsync/pkg/api/configsync"
"kpt.dev/configsync/pkg/core"
Expand All @@ -36,6 +38,7 @@ import (
"kpt.dev/configsync/pkg/reconcilermanager/controllers"
"kpt.dev/configsync/pkg/util/customresource"
"kpt.dev/configsync/pkg/util/log"
utilwatch "kpt.dev/configsync/pkg/util/watch"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
// +kubebuilder:scaffold:imports
Expand Down Expand Up @@ -71,8 +74,19 @@ func main() {
setupLog.Info(fmt.Sprintf("running with flags --cluster-name=%s; --reconciler-polling-period=%s; --hydration-polling-period=%s",
*clusterName, *reconcilerPollingPeriod, *hydrationPollingPeriod))

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
cfg := ctrl.GetConfigOrDie()

mapper, err := utilwatch.ReplaceOnResetRESTMapperFromConfig(cfg)
if err != nil {
setupLog.Error(err, "failed to create resettable rest mapper")
os.Exit(1)
}

mgr, err := ctrl.NewManager(cfg, ctrl.Options{
Scheme: core.Scheme,
MapperProvider: func(_ *rest.Config, _ *http.Client) (meta.RESTMapper, error) {
return mapper, nil
},
})
if err != nil {
setupLog.Error(err, "failed to start manager")
Expand All @@ -82,24 +96,25 @@ func main() {
// support the Watch method. So build another with shared config.
// This one can be used for watching and bypassing the cache, as needed.
// Use with discretion.
watcher, err := client.NewWithWatch(mgr.GetConfig(), client.Options{
watcher, err := client.NewWithWatch(cfg, client.Options{
Scheme: mgr.GetScheme(),
Mapper: mgr.GetRESTMapper(),
Mapper: mapper,
})
if err != nil {
setupLog.Error(err, "failed to create watching client")
os.Exit(1)
}
dynamicClient, err := dynamic.NewForConfig(mgr.GetConfig())
dynamicClient, err := dynamic.NewForConfig(cfg)
if err != nil {
setupLog.Error(err, "failed to build dynamic client")
os.Exit(1)
}
watchFleetMembership := fleetMembershipCRDExists(dynamicClient, mgr.GetRESTMapper(), &setupLog)
watchFleetMembership := fleetMembershipCRDExists(dynamicClient, mapper, &setupLog)

crdController := &controllers.CRDController{}
crdMetaController := controllers.NewCRDMetaController(crdController, mgr.GetCache(),
textlogger.NewLogger(textlogger.NewConfig()).WithName("controllers").WithName("CRD"))
crdControllerLogger := textlogger.NewLogger(textlogger.NewConfig()).WithName("controllers").WithName("CRD")
crdMetaController := controllers.NewCRDMetaController(crdController,
mgr.GetCache(), mapper, crdControllerLogger)
if err := crdMetaController.Register(mgr); err != nil {
setupLog.Error(err, "failed to register controller", "controller", "CRD")
os.Exit(1)
Expand Down
30 changes: 17 additions & 13 deletions pkg/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"k8s.io/klog/v2/textlogger"
Expand All @@ -46,10 +47,10 @@ import (
"kpt.dev/configsync/pkg/syncer/reconcile"
"kpt.dev/configsync/pkg/syncer/reconcile/fight"
"kpt.dev/configsync/pkg/util"
utilwatch "kpt.dev/configsync/pkg/util/watch"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
)

Expand Down Expand Up @@ -166,15 +167,9 @@ func Run(opts Options) {
klog.Fatalf("Error creating discovery client: %v", err)
}

// Use the DynamicRESTMapper as the default RESTMapper does not detect when
// new types become available
httpClient, err := rest.HTTPClientFor(cfg)
mapper, err := utilwatch.ReplaceOnResetRESTMapperFromConfig(cfg)
if err != nil {
klog.Fatalf("Error creating HTTPClient: %v", err)
}
mapper, err := apiutil.NewDynamicRESTMapper(cfg, httpClient)
if err != nil {
klog.Fatalf("Error creating DynamicRESTMapper: %v", err)
klog.Fatalf("Error creating resettable rest mapper: %v", err)
}

cl, err := client.New(cfg, client.Options{
Expand Down Expand Up @@ -219,12 +214,20 @@ func Run(opts Options) {
if err != nil {
klog.Fatalf("Error creating rest config for the remediator: %v", err)
}

dynamicClient, err := dynamic.NewForConfig(cfgForWatch)
if err != nil {
klog.Fatalf("Error creating DynamicClient for the remediator: %v", err)
}
lwFactory := &watch.DynamicListerWatcherFactory{
DynamicClient: dynamicClient,
Mapper: mapper,
}
watcherFactory := watch.WatcherFactoryFromListerWatcherFactory(lwFactory.ListerWatcher)
crdController := &controllers.CRDController{}
conflictHandler := conflict.NewHandler()
fightHandler := fight.NewHandler()

rem, err := remediator.New(opts.ReconcilerScope, opts.SyncName, cfgForWatch, baseApplier, conflictHandler, fightHandler, crdController, decls, opts.NumWorkers)
rem, err := remediator.New(opts.ReconcilerScope, opts.SyncName, watcherFactory, mapper, baseApplier, conflictHandler, fightHandler, crdController, decls, opts.NumWorkers)
if err != nil {
klog.Fatalf("Instantiating Remediator: %v", err)
}
Expand Down Expand Up @@ -333,8 +336,9 @@ func Run(opts Options) {
klog.Fatalf("Instantiating Controller Manager: %v", err)
}

crdMetaController := controllers.NewCRDMetaController(crdController, mgr.GetCache(),
textlogger.NewLogger(textlogger.NewConfig()).WithName("controllers").WithName("CRD"))
crdControllerLogger := textlogger.NewLogger(textlogger.NewConfig()).WithName("controllers").WithName("CRD")
crdMetaController := controllers.NewCRDMetaController(crdController,
mgr.GetCache(), mapper, crdControllerLogger)
if err := crdMetaController.Register(mgr); err != nil {
klog.Fatalf("Instantiating CRD Controller: %v", err)
}
Expand Down
63 changes: 61 additions & 2 deletions pkg/reconcilermanager/controllers/crd_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/klog/v2"
"kpt.dev/configsync/pkg/util/customresource"
utilwatch "kpt.dev/configsync/pkg/util/watch"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -93,26 +96,36 @@ func (s *CRDController) getReconciler(gk schema.GroupKind) CRDReconcileFunc {
type CRDMetaController struct {
loggingController
cache cache.Cache
mapper utilwatch.ResettableRESTMapper
delegate *CRDController
observedResources map[schema.GroupResource]schema.GroupKind
}

var _ reconcile.Reconciler = &CRDMetaController{}

// NewCRDMetaController constructs a new CRDMetaController.
func NewCRDMetaController(delegate *CRDController, cache cache.Cache, log logr.Logger) *CRDMetaController {
func NewCRDMetaController(
delegate *CRDController,
cache cache.Cache,
mapper utilwatch.ResettableRESTMapper,
log logr.Logger,
) *CRDMetaController {
return &CRDMetaController{
loggingController: loggingController{
log: log,
},
cache: cache,
mapper: mapper,
delegate: delegate,
observedResources: make(map[schema.GroupResource]schema.GroupKind),
}
}

// Reconcile checks is the CRD exists and delegates to the CRDController to
// Reconcile checks if the CRD exists and delegates to the CRDController to
// reconcile the update.
//
// Reconcile also handles auto-discovery and auto-invalidation of custom
// resources by calling Reset on the RESTMapper, as needed.
func (r *CRDMetaController) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
crdName := req.Name
ctx = r.setLoggerValues(ctx, "crd", crdName)
Expand Down Expand Up @@ -156,6 +169,18 @@ func (r *CRDMetaController) Reconcile(ctx context.Context, req reconcile.Request

r.logger(ctx).Info("CRDMetaController handling CRD status update", "name", crdName)

if customresource.IsEstablished(crdObj) {
if err := discoverResourceForKind(r.mapper, kind); err != nil {
// Retry with backoff
return reconcile.Result{}, err
}
} else {
if err := forgetResourceForKind(r.mapper, kind); err != nil {
// Retry with backoff
return reconcile.Result{}, err
}
}

if err := r.delegate.Reconcile(ctx, kind, crdObj); err != nil {
// Retry with backoff
return reconcile.Result{}, err
Expand All @@ -172,3 +197,37 @@ func (r *CRDMetaController) Register(mgr controllerruntime.Manager) error {
For(&apiextensionsv1.CustomResourceDefinition{}).
Complete(r)
}

// discoverResourceForKind resets the RESTMapper if needed, to discover the
// resource that maps to the specified kind.
func discoverResourceForKind(mapper utilwatch.ResettableRESTMapper, gk schema.GroupKind) error {
if _, err := mapper.RESTMapping(gk); err != nil {
if meta.IsNoMatchError(err) {
klog.Infof("Remediator resetting RESTMapper to discover resource: %v", gk)
if err := mapper.Reset(); err != nil {
return fmt.Errorf("remediator failed to reset RESTMapper: %w", err)
}
} else {
return fmt.Errorf("remediator failed to map kind to resource: %w", err)
}
}
// Else, mapper already up to date
return nil
}

// forgetResourceForKind resets the RESTMapper if needed, to forget the resource
// that maps to the specified kind.
func forgetResourceForKind(mapper utilwatch.ResettableRESTMapper, gk schema.GroupKind) error {
if _, err := mapper.RESTMapping(gk); err != nil {
if !meta.IsNoMatchError(err) {
return fmt.Errorf("remediator failed to map kind to resource: %w", err)
}
// Else, mapper already up to date
} else {
klog.Infof("Remediator resetting RESTMapper to forget resource: %v", gk)
if err := mapper.Reset(); err != nil {
return fmt.Errorf("remediator failed to reset RESTMapper: %w", err)
}
}
return nil
}
7 changes: 4 additions & 3 deletions pkg/remediator/remediator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"sync"

"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"kpt.dev/configsync/pkg/declared"
"kpt.dev/configsync/pkg/reconcilermanager/controllers"
Expand All @@ -31,6 +30,7 @@ import (
"kpt.dev/configsync/pkg/status"
syncerreconcile "kpt.dev/configsync/pkg/syncer/reconcile"
"kpt.dev/configsync/pkg/syncer/reconcile/fight"
utilwatch "kpt.dev/configsync/pkg/util/watch"
)

// Remediator knows how to keep the state of a Kubernetes cluster in sync with
Expand Down Expand Up @@ -98,7 +98,8 @@ var _ Interface = &Remediator{}
func New(
scope declared.Scope,
syncName string,
cfg *rest.Config,
watcherFactory watch.WatcherFactory,
mapper utilwatch.ResettableRESTMapper,
applier syncerreconcile.Applier,
conflictHandler conflict.Handler,
fightHandler fight.Handler,
Expand All @@ -119,7 +120,7 @@ func New(
conflictHandler: conflictHandler,
}

watchMgr, err := watch.NewManager(scope, syncName, cfg, q, decls, nil, conflictHandler, crdController)
watchMgr, err := watch.NewManager(scope, syncName, q, decls, watcherFactory, mapper, conflictHandler, crdController)
if err != nil {
return nil, fmt.Errorf("creating watch manager: %w", err)
}
Expand Down
36 changes: 2 additions & 34 deletions pkg/remediator/watch/listwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"kpt.dev/configsync/pkg/core"
"kpt.dev/configsync/pkg/kinds"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
utilwatch "kpt.dev/configsync/pkg/util/watch"
)

// Lister is any object that performs listing of a resource.
Expand Down Expand Up @@ -64,7 +63,7 @@ type ListerWatcherFactory func(gvk schema.GroupVersionKind, namespace string) Li
// ResettableRESTMapper.
type DynamicListerWatcherFactory struct {
DynamicClient *dynamic.DynamicClient
Mapper ResettableRESTMapper
Mapper utilwatch.ResettableRESTMapper
}

// ListerWatcher constructs a ListerWatcher for the specified GroupVersionKind
Expand All @@ -73,37 +72,6 @@ func (dlwf *DynamicListerWatcherFactory) ListerWatcher(gvk schema.GroupVersionKi
return NewListWatchFromClient(dlwf.DynamicClient, dlwf.Mapper, gvk, namespace)
}

// DynamicListerWatcherFactoryFromConfig constructs a DynamicListerWatcherFactory
func DynamicListerWatcherFactoryFromConfig(cfg *rest.Config) (*DynamicListerWatcherFactory, error) {
dynamicClient, err := dynamic.NewForConfig(cfg)
if err != nil {
return nil, fmt.Errorf("creating DynamicClient: %w", err)
}
httpClient, err := rest.HTTPClientFor(cfg)
if err != nil {
return nil, fmt.Errorf("creating HTTPClient: %w", err)
}
mapper, err := apiutil.NewDynamicRESTMapper(cfg, httpClient)
if err != nil {
return nil, fmt.Errorf("creating DynamicRESTMapper: %w", err)
}
// DynamicRESTMapper dynamically and transparently discovers new resources,
// when a NoMatchFound error is encountered, but it doesn't automatically
// invalidate deleted resources. So use ReplaceOnResetRESTMapper to replace
// the whole DynamicRESTMapper when Reset is called.
newMapperFn := func() (meta.RESTMapper, error) {
m, err := apiutil.NewDynamicRESTMapper(cfg, httpClient)
if err != nil {
return nil, fmt.Errorf("creating DynamicRESTMapper: %w", err)
}
return m, nil
}
return &DynamicListerWatcherFactory{
DynamicClient: dynamicClient,
Mapper: NewReplaceOnResetRESTMapper(mapper, newMapperFn),
}, nil
}

// ListFunc knows how to list resources.
type ListFunc func(ctx context.Context, options metav1.ListOptions) (*unstructured.UnstructuredList, error)

Expand Down
Loading