Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Learn about the capabilities of this extension in our [Reliability Hub](https://
|------------------------------------------------------------------|---------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|----------------------------------------------------------------------|
| `STEADYBIT_EXTENSION_KUBERNETES_CLUSTER_NAME` | `kubernetes.clusterName` | The name of the kubernetes cluster | yes | |
| `STEADYBIT_EXTENSION_DISABLE_DISCOVERY_EXCLUDES` | `discovery.disableExcludes` | Ignore discovery excludes specified by `steadybit.com/discovery-disabled` | false | `false` |
| `STEADYBIT_EXTENSION_DISCOVERY_DISABLE_ARGO_ROLLOUTS` | `discovery.disableArgoRollouts` | Disable discovery of Argo rollouts | false | `true` |
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The env var is STEADYBIT_EXTENSION_DISCOVERY_DISABLED_ARGO_ROLLOUTS (missing D in disableD)

(Also the helm value - see my comment in the helm values.yaml)

| `STEADYBIT_EXTENSION_LABEL_FILTER` | | These labels will be ignored and not added to the discovered targets | false | `controller-revision-hash,pod-template-generation,pod-template-hash` |
| `STEADYBIT_EXTENSION_ACTIVE_ADVICE_LIST` | | List of active advice definitions, default is all (*). You can define a list of active adviceDefinitionId. See UI -> Settings -> Extension -> Advice -> Column: ID | false | `*` |
| `STEADYBIT_EXTENSION_ADVICE_SINGLE_REPLICA_MIN_REPLICAS` | | Minimal required replicas for the "Redundant Pod" advice | false | 2 |
Expand Down
11 changes: 10 additions & 1 deletion charts/steadybit-extension-kubernetes/templates/_permissions.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,14 @@ permissions for clusterrole or role
verbs:
- create
{{- end }}
{{- if not .Values.discovery.disableArgoRollouts }}
{{/* Required for Argo Discovery */}}
- apiGroups: ["argoproj.io"]
resources:
- rollouts
verbs:
- get
- list
- watch
{{- end }}
{{- end -}}

Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ spec:
{{- end }}
- name: STEADYBIT_EXTENSION_DISCOVERY_MAX_POD_COUNT
value: "{{ .Values.discovery.maxPodCount }}"
{{- if .Values.discovery.disableArgoRollouts }}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a test to charts/steadybit-extension-kubernetes/tests/deployment_test.yaml -> manifest should match snapshot with disabled discoveries

- name: STEADYBIT_EXTENSION_DISCOVERY_DISABLE_ARGO_ROLLOUTS
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The env var is STEADYBIT_EXTENSION_DISCOVERY_DISABLED_ARGO_ROLLOUTS (missing D in disableD)

value: "true"
{{- end }}
- name: STEADYBIT_EXTENSION_DISCOVERY_REFRESH_THROTTLE
value: "{{ .Values.discovery.refreshThrottle }}"
{{- with .Values.extraEnvFrom }}
Expand Down
2 changes: 2 additions & 0 deletions charts/steadybit-extension-kubernetes/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ discovery:
disableAdvice: false
# discovery.maxPodCount -- Skip listing pods, containers and hosts for deployments, statefulsets, etc. if there are more then the given pods.
maxPodCount: 50
# discovery.disableArgoRollouts -- Disable discovery of Argo rollouts (requires argoproj.io rollout CRDs)
disableArgoRollouts: true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line 195ff has already some values to disable discoveries

# discovery.refreshThrottle -- Number of seconds between successive refreshes of the target data.
refreshThrottle: 20
attributes:
Expand Down
165 changes: 150 additions & 15 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@ import (
"errors"
"flag"
"fmt"
"path/filepath"
"sort"
"strings"
"sync"
"time"

"github.com/google/uuid"

"github.com/rs/zerolog/log"
"github.com/steadybit/extension-kubernetes/v2/extconfig"
"golang.org/x/exp/slices"
Expand All @@ -19,8 +26,12 @@ import (
networkingv1 "k8s.io/api/networking/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
networkingv1client "k8s.io/client-go/kubernetes/typed/networking/v1"
Expand All @@ -33,11 +44,6 @@ import (
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/client-go/util/homedir"
"path/filepath"
"sort"
"strings"
"sync"
"time"
)

var K8S *Client
Expand All @@ -46,6 +52,11 @@ type Client struct {
Distribution string
permissions *PermissionCheckResult

argoRollout struct {
lister RolloutLister
informer cache.SharedIndexInformer
}

daemonSet struct {
lister listerAppsv1.DaemonSetLister
informer cache.SharedIndexInformer
Expand Down Expand Up @@ -114,6 +125,73 @@ type Client struct {
clientset kubernetes.Interface
}

type RolloutLister interface {
List(selector labels.Selector) ([]*unstructured.Unstructured, error)
Rollouts(namespace string) RolloutNamespaceLister
}

type RolloutNamespaceLister interface {
List(selector labels.Selector) ([]*unstructured.Unstructured, error)
Get(name string) (*unstructured.Unstructured, error)
}

type rolloutLister struct {
indexer cache.Indexer
}

func (l *rolloutLister) List(selector labels.Selector) ([]*unstructured.Unstructured, error) {
items := l.indexer.List()
result := make([]*unstructured.Unstructured, 0, len(items))
for _, item := range items {
if obj, ok := item.(*unstructured.Unstructured); ok {
result = append(result, obj)
}
}
return result, nil
}

func (l *rolloutLister) Rollouts(namespace string) RolloutNamespaceLister {
return &rolloutNamespaceLister{
indexer: l.indexer,
namespace: namespace,
}
}

type rolloutNamespaceLister struct {
indexer cache.Indexer
namespace string
}

func (l *rolloutNamespaceLister) List(selector labels.Selector) ([]*unstructured.Unstructured, error) {
items, err := l.indexer.ByIndex(cache.NamespaceIndex, l.namespace)
if err != nil {
return nil, err
}
result := make([]*unstructured.Unstructured, 0, len(items))
for _, item := range items {
if obj, ok := item.(*unstructured.Unstructured); ok {
result = append(result, obj)
}
}
return result, nil
}

func (l *rolloutNamespaceLister) Get(name string) (*unstructured.Unstructured, error) {
obj, exists, err := l.indexer.GetByKey(l.namespace + "/" + name)
if err != nil {
return nil, err
}
if !exists {
return nil, k8sErrors.NewNotFound(schema.GroupResource{Group: "argoproj.io", Resource: "rollouts"}, name)
}

if rolloutObj, ok := obj.(*unstructured.Unstructured); ok {
return rolloutObj, nil
} else {
return nil, fmt.Errorf("expected *unstructured.Unstructured, got %T", obj)
}
}

func (c *Client) Permissions() *PermissionCheckResult {
return c.permissions
}
Expand Down Expand Up @@ -175,12 +253,12 @@ func (c *Client) PodsByLabelSelector(labelSelector *metav1.LabelSelector, namesp
// ExecInPod executes a command in a pod container and returns the output
func (c *Client) ExecInPod(ctx context.Context, namespace, podName, containerName string, command []string) (string, error) {
config := c.GetConfig()

// Check if we have a valid REST config
if config.Host == "" {
return "", fmt.Errorf("no valid Kubernetes REST config available - cannot execute pod commands")
}

req := c.clientset.CoreV1().RESTClient().Post().
Resource("pods").
Name(podName).
Expand All @@ -190,7 +268,7 @@ func (c *Client) ExecInPod(ctx context.Context, namespace, podName, containerNam
Param("command", command[0]).
Param("stdout", "true").
Param("stderr", "true")

// Add additional command arguments
for _, arg := range command[1:] {
req.Param("command", arg)
Expand All @@ -206,11 +284,11 @@ func (c *Client) ExecInPod(ctx context.Context, namespace, podName, containerNam
Stdout: &stdout,
Stderr: &stderr,
})

if err != nil {
return "", fmt.Errorf("error executing command: %w, stderr: %s", err, stderr.String())
}

return stdout.String(), nil
}

Expand Down Expand Up @@ -259,6 +337,25 @@ func (c *Client) DeploymentByNamespaceAndName(namespace string, name string) *ap
return item
}

func (c *Client) ArgoRollouts() []*unstructured.Unstructured {
if extconfig.HasNamespaceFilter() {
log.Info().Msgf("Fetching Argo Rollouts for namespace %s", extconfig.Config.Namespace)
rollouts, err := c.argoRollout.lister.Rollouts(extconfig.Config.Namespace).List(labels.Everything())
if err != nil {
log.Error().Err(err).Msgf("Error while fetching Argo Rollouts")
return []*unstructured.Unstructured{}
}
return rollouts
} else {
rollouts, err := c.argoRollout.lister.List(labels.Everything())
if err != nil {
log.Error().Err(err).Msgf("Error while fetching Argo Rollouts")
return []*unstructured.Unstructured{}
}
return rollouts
}
}

func (c *Client) ServicesByPod(pod *corev1.Pod) []*corev1.Service {
services, err := c.service.lister.Services(pod.Namespace).List(labels.Everything())
if err != nil {
Expand Down Expand Up @@ -542,13 +639,23 @@ func filterEvents(events []interface{}, since time.Time) []corev1.Event {
}

func PrepareClient(stopCh <-chan struct{}) {
clientset, rootApiPath := createClientset()
clientset, rootApiPath, config := createClientset()
permissions := checkPermissions(clientset)
K8S = CreateClient(clientset, stopCh, rootApiPath, permissions)

var dynamicClient dynamic.Interface
if !extconfig.Config.DiscoveryDisabledArgoRollout {
var err error
dynamicClient, err = dynamic.NewForConfig(config)
if err != nil {
log.Fatal().Err(err).Msg("Failed to create dynamic client")
}
}

K8S = CreateClient(clientset, stopCh, rootApiPath, permissions, config, dynamicClient)
}

// CreateClient is visible for testing
func CreateClient(clientset kubernetes.Interface, stopCh <-chan struct{}, rootApiPath string, permissions *PermissionCheckResult) *Client {
func CreateClient(clientset kubernetes.Interface, stopCh <-chan struct{}, rootApiPath string, permissions *PermissionCheckResult, config *rest.Config, dynamicClient dynamic.Interface) *Client {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

config *rest.Config seems to be unused

client := &Client{
Distribution: "kubernetes",
permissions: permissions,
Expand All @@ -565,6 +672,7 @@ func CreateClient(clientset kubernetes.Interface, stopCh <-chan struct{}, rootAp
} else {
factory = informers.NewSharedInformerFactory(clientset, informerResyncDuration)
}

client.resourceEventHandler = cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
client.doNotify(obj)
Expand All @@ -578,6 +686,30 @@ func CreateClient(clientset kubernetes.Interface, stopCh <-chan struct{}, rootAp
}

var informerSyncList []cache.InformerSynced
var argoRolloutInformer informers.GenericInformer

// Initialize Argo Rollouts informer if enabled
if !extconfig.Config.DiscoveryDisabledArgoRollout {
argoRolloutGVR := schema.GroupVersionResource{
Group: "argoproj.io",
Version: "v1alpha1",
Resource: "rollouts",
}
argoRolloutInformer = dynamicinformer.NewFilteredDynamicInformer(
dynamicClient,
argoRolloutGVR,
extconfig.Config.Namespace,
0,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
nil,
)
client.argoRollout.informer = argoRolloutInformer.Informer()
client.argoRollout.lister = &rolloutLister{indexer: argoRolloutInformer.Informer().GetIndexer()}
informerSyncList = append(informerSyncList, client.argoRollout.informer.HasSynced)
if _, err := client.argoRollout.informer.AddEventHandler(client.resourceEventHandler); err != nil {
log.Fatal().Msg("failed to add argo rollout event handler")
}
}

daemonSets := factory.Apps().V1().DaemonSets()
client.daemonSet.informer = daemonSets.Informer()
Expand Down Expand Up @@ -722,6 +854,9 @@ func CreateClient(clientset kubernetes.Interface, stopCh <-chan struct{}, rootAp

defer runtime.HandleCrash()
go factory.Start(stopCh)
if argoRolloutInformer != nil {
go argoRolloutInformer.Informer().Run(stopCh)
}

log.Info().Msgf("Start Kubernetes cache sync.")
if !cache.WaitForCacheSync(stopCh, informerSyncList...) {
Expand Down Expand Up @@ -972,7 +1107,7 @@ func isOpenShift(rootApiPath string) bool {
return rootApiPath == "/oapi" || rootApiPath == "oapi"
}

func createClientset() (*kubernetes.Clientset, string) {
func createClientset() (*kubernetes.Clientset, string, *rest.Config) {
config, err := rest.InClusterConfig()
if err == nil {
log.Info().Msgf("Extension is running inside a cluster, config found")
Expand Down Expand Up @@ -1007,7 +1142,7 @@ func createClientset() (*kubernetes.Clientset, string) {

log.Info().Msgf("Cluster connected! Kubernetes Server Version %+v", info)

return clientset, config.APIPath
return clientset, config.APIPath, config
}

func IsExcludedFromDiscovery(objectMeta metav1.ObjectMeta) bool {
Expand Down
19 changes: 18 additions & 1 deletion client/permissions.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (

"github.com/rs/zerolog/log"
"github.com/steadybit/extension-kubernetes/v2/extconfig"
authorizationv1 "k8s.io/api/authorization/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

authorizationv1 "k8s.io/api/authorization/v1"
)

type PermissionCheckResult struct {
Expand Down Expand Up @@ -62,6 +63,7 @@ var requiredPermissions = []requiredPermission{
{group: "", resource: "pods", subresource: "eviction", verbs: []string{"create"}, allowGracefulFailure: true},
{group: "", resource: "nodes", verbs: []string{"patch"}, allowGracefulFailure: true},
{group: "", resource: "pods", subresource: "exec", verbs: []string{"create"}, allowGracefulFailure: true},
{group: "argoproj.io", resource: "rollouts", verbs: []string{"get", "list", "watch"}, allowGracefulFailure: true},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In line 76 you are adding these permission conditionally. Should that be removed from this list?

{group: "networking.k8s.io", resource: "ingresses", verbs: []string{"get", "list", "watch", "update", "patch"}, allowGracefulFailure: true},
{group: "networking.k8s.io", resource: "ingressclasses", verbs: []string{"get", "list", "watch"}, allowGracefulFailure: true},
}
Expand All @@ -71,6 +73,13 @@ func checkPermissions(client *kubernetes.Clientset) *PermissionCheckResult {
reviews := client.AuthorizationV1().SelfSubjectAccessReviews()
errors := false

// Conditionally add Argo rollouts permissions
if !extconfig.Config.DiscoveryDisabledArgoRollout {
requiredPermissions = append(requiredPermissions, requiredPermission{
group: "argoproj.io", resource: "rollouts", verbs: []string{"get", "list", "watch"}, allowGracefulFailure: true,
})
}

for _, p := range requiredPermissions {
for _, verb := range p.verbs {
sar := authorizationv1.SelfSubjectAccessReview{
Expand Down Expand Up @@ -242,6 +251,14 @@ func (p *PermissionCheckResult) IsSetImageDeploymentPermitted() bool {

func MockAllPermitted() *PermissionCheckResult {
result := make(map[string]PermissionCheckOutcome)

// Conditionally add Argo rollouts permissions
if !extconfig.Config.DiscoveryDisabledArgoRollout {
requiredPermissions = append(requiredPermissions, requiredPermission{
group: "argoproj.io", resource: "rollouts", verbs: []string{"get", "list", "watch"}, allowGracefulFailure: true,
})
}

for _, p := range requiredPermissions {
for _, verb := range p.verbs {
result[p.Key(verb)] = OK
Expand Down
Loading
Loading