-
Notifications
You must be signed in to change notification settings - Fork 5
Add DiscoveryKit features for Argo Rollout kinds. #229
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -143,6 +143,10 @@ spec: | |
| {{- end }} | ||
| - name: STEADYBIT_EXTENSION_DISCOVERY_MAX_POD_COUNT | ||
| value: "{{ .Values.discovery.maxPodCount }}" | ||
| {{- if .Values.discovery.disableArgoRollouts }} | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you add a test to |
||
| - name: STEADYBIT_EXTENSION_DISCOVERY_DISABLE_ARGO_ROLLOUTS | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The env var is |
||
| value: "true" | ||
| {{- end }} | ||
| - name: STEADYBIT_EXTENSION_DISCOVERY_REFRESH_THROTTLE | ||
| value: "{{ .Values.discovery.refreshThrottle }}" | ||
| {{- with .Values.extraEnvFrom }} | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -169,6 +169,8 @@ discovery: | |
| disableAdvice: false | ||
| # discovery.maxPodCount -- Skip listing pods, containers and hosts for deployments, statefulsets, etc. if there are more then the given pods. | ||
| maxPodCount: 50 | ||
| # discovery.disableArgoRollouts -- Disable discovery of Argo rollouts (requires argoproj.io rollout CRDs) | ||
| disableArgoRollouts: true | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Line 195ff has already some values to disable discoveries |
||
| # discovery.refreshThrottle -- Number of seconds between successive refreshes of the target data. | ||
| refreshThrottle: 20 | ||
| attributes: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,7 +9,14 @@ import ( | |
| "errors" | ||
| "flag" | ||
| "fmt" | ||
| "path/filepath" | ||
| "sort" | ||
| "strings" | ||
| "sync" | ||
| "time" | ||
|
|
||
| "github.com/google/uuid" | ||
|
|
||
| "github.com/rs/zerolog/log" | ||
| "github.com/steadybit/extension-kubernetes/v2/extconfig" | ||
| "golang.org/x/exp/slices" | ||
|
|
@@ -19,8 +26,12 @@ import ( | |
| networkingv1 "k8s.io/api/networking/v1" | ||
| k8sErrors "k8s.io/apimachinery/pkg/api/errors" | ||
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
| "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" | ||
| "k8s.io/apimachinery/pkg/labels" | ||
| "k8s.io/apimachinery/pkg/runtime/schema" | ||
| "k8s.io/apimachinery/pkg/util/runtime" | ||
| "k8s.io/client-go/dynamic" | ||
| "k8s.io/client-go/dynamic/dynamicinformer" | ||
| "k8s.io/client-go/informers" | ||
| "k8s.io/client-go/kubernetes" | ||
| networkingv1client "k8s.io/client-go/kubernetes/typed/networking/v1" | ||
|
|
@@ -33,11 +44,6 @@ import ( | |
| "k8s.io/client-go/tools/clientcmd" | ||
| "k8s.io/client-go/tools/remotecommand" | ||
| "k8s.io/client-go/util/homedir" | ||
| "path/filepath" | ||
| "sort" | ||
| "strings" | ||
| "sync" | ||
| "time" | ||
| ) | ||
|
|
||
| var K8S *Client | ||
|
|
@@ -46,6 +52,11 @@ type Client struct { | |
| Distribution string | ||
| permissions *PermissionCheckResult | ||
|
|
||
| argoRollout struct { | ||
| lister RolloutLister | ||
| informer cache.SharedIndexInformer | ||
| } | ||
|
|
||
| daemonSet struct { | ||
| lister listerAppsv1.DaemonSetLister | ||
| informer cache.SharedIndexInformer | ||
|
|
@@ -114,6 +125,73 @@ type Client struct { | |
| clientset kubernetes.Interface | ||
| } | ||
|
|
||
| type RolloutLister interface { | ||
| List(selector labels.Selector) ([]*unstructured.Unstructured, error) | ||
| Rollouts(namespace string) RolloutNamespaceLister | ||
| } | ||
|
|
||
| type RolloutNamespaceLister interface { | ||
| List(selector labels.Selector) ([]*unstructured.Unstructured, error) | ||
| Get(name string) (*unstructured.Unstructured, error) | ||
| } | ||
|
|
||
| type rolloutLister struct { | ||
| indexer cache.Indexer | ||
| } | ||
|
|
||
| func (l *rolloutLister) List(selector labels.Selector) ([]*unstructured.Unstructured, error) { | ||
| items := l.indexer.List() | ||
| result := make([]*unstructured.Unstructured, 0, len(items)) | ||
| for _, item := range items { | ||
| if obj, ok := item.(*unstructured.Unstructured); ok { | ||
| result = append(result, obj) | ||
| } | ||
| } | ||
| return result, nil | ||
| } | ||
|
|
||
| func (l *rolloutLister) Rollouts(namespace string) RolloutNamespaceLister { | ||
| return &rolloutNamespaceLister{ | ||
| indexer: l.indexer, | ||
| namespace: namespace, | ||
| } | ||
| } | ||
|
|
||
| type rolloutNamespaceLister struct { | ||
| indexer cache.Indexer | ||
| namespace string | ||
| } | ||
|
|
||
| func (l *rolloutNamespaceLister) List(selector labels.Selector) ([]*unstructured.Unstructured, error) { | ||
| items, err := l.indexer.ByIndex(cache.NamespaceIndex, l.namespace) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| result := make([]*unstructured.Unstructured, 0, len(items)) | ||
| for _, item := range items { | ||
| if obj, ok := item.(*unstructured.Unstructured); ok { | ||
| result = append(result, obj) | ||
| } | ||
| } | ||
| return result, nil | ||
| } | ||
|
|
||
| func (l *rolloutNamespaceLister) Get(name string) (*unstructured.Unstructured, error) { | ||
| obj, exists, err := l.indexer.GetByKey(l.namespace + "/" + name) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| if !exists { | ||
| return nil, k8sErrors.NewNotFound(schema.GroupResource{Group: "argoproj.io", Resource: "rollouts"}, name) | ||
| } | ||
|
|
||
| if rolloutObj, ok := obj.(*unstructured.Unstructured); ok { | ||
| return rolloutObj, nil | ||
| } else { | ||
| return nil, fmt.Errorf("expected *unstructured.Unstructured, got %T", obj) | ||
| } | ||
| } | ||
|
|
||
| func (c *Client) Permissions() *PermissionCheckResult { | ||
| return c.permissions | ||
| } | ||
|
|
@@ -175,12 +253,12 @@ func (c *Client) PodsByLabelSelector(labelSelector *metav1.LabelSelector, namesp | |
| // ExecInPod executes a command in a pod container and returns the output | ||
| func (c *Client) ExecInPod(ctx context.Context, namespace, podName, containerName string, command []string) (string, error) { | ||
| config := c.GetConfig() | ||
|
|
||
| // Check if we have a valid REST config | ||
| if config.Host == "" { | ||
| return "", fmt.Errorf("no valid Kubernetes REST config available - cannot execute pod commands") | ||
| } | ||
|
|
||
| req := c.clientset.CoreV1().RESTClient().Post(). | ||
| Resource("pods"). | ||
| Name(podName). | ||
|
|
@@ -190,7 +268,7 @@ func (c *Client) ExecInPod(ctx context.Context, namespace, podName, containerNam | |
| Param("command", command[0]). | ||
| Param("stdout", "true"). | ||
| Param("stderr", "true") | ||
|
|
||
| // Add additional command arguments | ||
| for _, arg := range command[1:] { | ||
| req.Param("command", arg) | ||
|
|
@@ -206,11 +284,11 @@ func (c *Client) ExecInPod(ctx context.Context, namespace, podName, containerNam | |
| Stdout: &stdout, | ||
| Stderr: &stderr, | ||
| }) | ||
|
|
||
| if err != nil { | ||
| return "", fmt.Errorf("error executing command: %w, stderr: %s", err, stderr.String()) | ||
| } | ||
|
|
||
| return stdout.String(), nil | ||
| } | ||
|
|
||
|
|
@@ -259,6 +337,25 @@ func (c *Client) DeploymentByNamespaceAndName(namespace string, name string) *ap | |
| return item | ||
| } | ||
|
|
||
| func (c *Client) ArgoRollouts() []*unstructured.Unstructured { | ||
| if extconfig.HasNamespaceFilter() { | ||
| log.Info().Msgf("Fetching Argo Rollouts for namespace %s", extconfig.Config.Namespace) | ||
| rollouts, err := c.argoRollout.lister.Rollouts(extconfig.Config.Namespace).List(labels.Everything()) | ||
| if err != nil { | ||
| log.Error().Err(err).Msgf("Error while fetching Argo Rollouts") | ||
| return []*unstructured.Unstructured{} | ||
| } | ||
| return rollouts | ||
| } else { | ||
| rollouts, err := c.argoRollout.lister.List(labels.Everything()) | ||
| if err != nil { | ||
| log.Error().Err(err).Msgf("Error while fetching Argo Rollouts") | ||
| return []*unstructured.Unstructured{} | ||
| } | ||
| return rollouts | ||
| } | ||
| } | ||
|
|
||
| func (c *Client) ServicesByPod(pod *corev1.Pod) []*corev1.Service { | ||
| services, err := c.service.lister.Services(pod.Namespace).List(labels.Everything()) | ||
| if err != nil { | ||
|
|
@@ -542,13 +639,23 @@ func filterEvents(events []interface{}, since time.Time) []corev1.Event { | |
| } | ||
|
|
||
| func PrepareClient(stopCh <-chan struct{}) { | ||
| clientset, rootApiPath := createClientset() | ||
| clientset, rootApiPath, config := createClientset() | ||
| permissions := checkPermissions(clientset) | ||
| K8S = CreateClient(clientset, stopCh, rootApiPath, permissions) | ||
|
|
||
| var dynamicClient dynamic.Interface | ||
| if !extconfig.Config.DiscoveryDisabledArgoRollout { | ||
| var err error | ||
| dynamicClient, err = dynamic.NewForConfig(config) | ||
| if err != nil { | ||
| log.Fatal().Err(err).Msg("Failed to create dynamic client") | ||
| } | ||
| } | ||
|
|
||
| K8S = CreateClient(clientset, stopCh, rootApiPath, permissions, config, dynamicClient) | ||
| } | ||
|
|
||
| // CreateClient is visible for testing | ||
| func CreateClient(clientset kubernetes.Interface, stopCh <-chan struct{}, rootApiPath string, permissions *PermissionCheckResult) *Client { | ||
| func CreateClient(clientset kubernetes.Interface, stopCh <-chan struct{}, rootApiPath string, permissions *PermissionCheckResult, config *rest.Config, dynamicClient dynamic.Interface) *Client { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| client := &Client{ | ||
| Distribution: "kubernetes", | ||
| permissions: permissions, | ||
|
|
@@ -565,6 +672,7 @@ func CreateClient(clientset kubernetes.Interface, stopCh <-chan struct{}, rootAp | |
| } else { | ||
| factory = informers.NewSharedInformerFactory(clientset, informerResyncDuration) | ||
| } | ||
|
|
||
| client.resourceEventHandler = cache.ResourceEventHandlerFuncs{ | ||
| AddFunc: func(obj interface{}) { | ||
| client.doNotify(obj) | ||
|
|
@@ -578,6 +686,30 @@ func CreateClient(clientset kubernetes.Interface, stopCh <-chan struct{}, rootAp | |
| } | ||
|
|
||
| var informerSyncList []cache.InformerSynced | ||
| var argoRolloutInformer informers.GenericInformer | ||
|
|
||
| // Initialize Argo Rollouts informer if enabled | ||
| if !extconfig.Config.DiscoveryDisabledArgoRollout { | ||
| argoRolloutGVR := schema.GroupVersionResource{ | ||
| Group: "argoproj.io", | ||
| Version: "v1alpha1", | ||
| Resource: "rollouts", | ||
| } | ||
| argoRolloutInformer = dynamicinformer.NewFilteredDynamicInformer( | ||
| dynamicClient, | ||
| argoRolloutGVR, | ||
| extconfig.Config.Namespace, | ||
| 0, | ||
| cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, | ||
| nil, | ||
| ) | ||
| client.argoRollout.informer = argoRolloutInformer.Informer() | ||
| client.argoRollout.lister = &rolloutLister{indexer: argoRolloutInformer.Informer().GetIndexer()} | ||
| informerSyncList = append(informerSyncList, client.argoRollout.informer.HasSynced) | ||
| if _, err := client.argoRollout.informer.AddEventHandler(client.resourceEventHandler); err != nil { | ||
| log.Fatal().Msg("failed to add argo rollout event handler") | ||
| } | ||
| } | ||
|
|
||
| daemonSets := factory.Apps().V1().DaemonSets() | ||
| client.daemonSet.informer = daemonSets.Informer() | ||
|
|
@@ -722,6 +854,9 @@ func CreateClient(clientset kubernetes.Interface, stopCh <-chan struct{}, rootAp | |
|
|
||
| defer runtime.HandleCrash() | ||
| go factory.Start(stopCh) | ||
| if argoRolloutInformer != nil { | ||
| go argoRolloutInformer.Informer().Run(stopCh) | ||
| } | ||
|
|
||
| log.Info().Msgf("Start Kubernetes cache sync.") | ||
| if !cache.WaitForCacheSync(stopCh, informerSyncList...) { | ||
|
|
@@ -972,7 +1107,7 @@ func isOpenShift(rootApiPath string) bool { | |
| return rootApiPath == "/oapi" || rootApiPath == "oapi" | ||
| } | ||
|
|
||
| func createClientset() (*kubernetes.Clientset, string) { | ||
| func createClientset() (*kubernetes.Clientset, string, *rest.Config) { | ||
| config, err := rest.InClusterConfig() | ||
| if err == nil { | ||
| log.Info().Msgf("Extension is running inside a cluster, config found") | ||
|
|
@@ -1007,7 +1142,7 @@ func createClientset() (*kubernetes.Clientset, string) { | |
|
|
||
| log.Info().Msgf("Cluster connected! Kubernetes Server Version %+v", info) | ||
|
|
||
| return clientset, config.APIPath | ||
| return clientset, config.APIPath, config | ||
| } | ||
|
|
||
| func IsExcludedFromDiscovery(objectMeta metav1.ObjectMeta) bool { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,9 +5,10 @@ import ( | |
|
|
||
| "github.com/rs/zerolog/log" | ||
| "github.com/steadybit/extension-kubernetes/v2/extconfig" | ||
| authorizationv1 "k8s.io/api/authorization/v1" | ||
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
| "k8s.io/client-go/kubernetes" | ||
|
|
||
| authorizationv1 "k8s.io/api/authorization/v1" | ||
| ) | ||
|
|
||
| type PermissionCheckResult struct { | ||
|
|
@@ -62,6 +63,7 @@ var requiredPermissions = []requiredPermission{ | |
| {group: "", resource: "pods", subresource: "eviction", verbs: []string{"create"}, allowGracefulFailure: true}, | ||
| {group: "", resource: "nodes", verbs: []string{"patch"}, allowGracefulFailure: true}, | ||
| {group: "", resource: "pods", subresource: "exec", verbs: []string{"create"}, allowGracefulFailure: true}, | ||
| {group: "argoproj.io", resource: "rollouts", verbs: []string{"get", "list", "watch"}, allowGracefulFailure: true}, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In line 76 you are adding these permission conditionally. Should that be removed from this list? |
||
| {group: "networking.k8s.io", resource: "ingresses", verbs: []string{"get", "list", "watch", "update", "patch"}, allowGracefulFailure: true}, | ||
| {group: "networking.k8s.io", resource: "ingressclasses", verbs: []string{"get", "list", "watch"}, allowGracefulFailure: true}, | ||
| } | ||
|
|
@@ -71,6 +73,13 @@ func checkPermissions(client *kubernetes.Clientset) *PermissionCheckResult { | |
| reviews := client.AuthorizationV1().SelfSubjectAccessReviews() | ||
| errors := false | ||
|
|
||
| // Conditionally add Argo rollouts permissions | ||
| if !extconfig.Config.DiscoveryDisabledArgoRollout { | ||
| requiredPermissions = append(requiredPermissions, requiredPermission{ | ||
| group: "argoproj.io", resource: "rollouts", verbs: []string{"get", "list", "watch"}, allowGracefulFailure: true, | ||
| }) | ||
| } | ||
|
|
||
| for _, p := range requiredPermissions { | ||
| for _, verb := range p.verbs { | ||
| sar := authorizationv1.SelfSubjectAccessReview{ | ||
|
|
@@ -242,6 +251,14 @@ func (p *PermissionCheckResult) IsSetImageDeploymentPermitted() bool { | |
|
|
||
| func MockAllPermitted() *PermissionCheckResult { | ||
| result := make(map[string]PermissionCheckOutcome) | ||
|
|
||
| // Conditionally add Argo rollouts permissions | ||
| if !extconfig.Config.DiscoveryDisabledArgoRollout { | ||
| requiredPermissions = append(requiredPermissions, requiredPermission{ | ||
| group: "argoproj.io", resource: "rollouts", verbs: []string{"get", "list", "watch"}, allowGracefulFailure: true, | ||
| }) | ||
| } | ||
|
|
||
| for _, p := range requiredPermissions { | ||
| for _, verb := range p.verbs { | ||
| result[p.Key(verb)] = OK | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The env var is
STEADYBIT_EXTENSION_DISCOVERY_DISABLED_ARGO_ROLLOUTS(missingDin disableD)(Also the helm value - see my comment in the helm values.yaml)