Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,7 @@ func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace stri
connMap: map[string]connectionEntry{},
}

clusterCache, err := cluster.NewClusterCacheInstance(ctx, client.Clientset,
a.namespace, a.redisProxyMsgHandler.redisAddress, cacheutil.RedisCompressionGZip)
clusterCache, err := cluster.NewClusterCacheInstance(a.redisProxyMsgHandler.redisAddress, a.redisProxyMsgHandler.redisPassword, cacheutil.RedisCompressionGZip)
if err != nil {
return nil, fmt.Errorf("failed to create cluster cache instance: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion agent/outbound_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ func Test_addClusterCacheInfoUpdateToQueue(t *testing.T) {
a.emitter = event.NewEventSource("principal")

// First populate the cache with dummy data
clusterMgr, err := cluster.NewManager(a.context, a.namespace, miniRedis.Addr(), cacheutil.RedisCompressionGZip, a.kubeClient.Clientset)
clusterMgr, err := cluster.NewManager(a.context, a.namespace, miniRedis.Addr(), "", cacheutil.RedisCompressionGZip, a.kubeClient.Clientset)
require.NoError(t, err)
err = clusterMgr.MapCluster("test-agent", &v1alpha1.Cluster{
Name: "test-cluster",
Expand Down
6 changes: 5 additions & 1 deletion cmd/argocd-agent/principal.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func NewPrincipalRunCommand() *cobra.Command {
keepAliveMinimumInterval time.Duration

redisAddress string
redisPassword string
redisCompressionType string
healthzPort int
)
Expand Down Expand Up @@ -242,7 +243,7 @@ func NewPrincipalRunCommand() *cobra.Command {

opts = append(opts, principal.WithWebSocket(enableWebSocket))
opts = append(opts, principal.WithKeepAliveMinimumInterval(keepAliveMinimumInterval))
opts = append(opts, principal.WithRedis(redisAddress, redisCompressionType))
opts = append(opts, principal.WithRedis(redisAddress, redisPassword, redisCompressionType))
opts = append(opts, principal.WithHealthzPort(healthzPort))

s, err := principal.NewServer(ctx, kubeConfig, namespace, opts...)
Expand Down Expand Up @@ -360,6 +361,9 @@ func NewPrincipalRunCommand() *cobra.Command {
command.Flags().StringVar(&redisAddress, "redis-server-address",
env.StringWithDefault("ARGOCD_PRINCIPAL_REDIS_SERVER_ADDRESS", nil, "argocd-redis:6379"),
"Redis server hostname and port (e.g. argocd-redis:6379).")
command.Flags().StringVar(&redisPassword, "redis-password",
env.StringWithDefault("REDIS_PASSWORD", nil, ""),
"The password to connect to redis with")

command.Flags().StringVar(&redisCompressionType, "redis-compression-type",
env.StringWithDefault("ARGOCD_PRINCIPAL_REDIS_COMPRESSION_TYPE", nil, string(cacheutil.RedisCompressionGZip)),
Expand Down
4 changes: 4 additions & 0 deletions hack/dev-env/start-agent-autonomous.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ SCRIPTPATH="$( cd -- "$(dirname "$0")" >/dev/null 2>&1 ; pwd -P )"
echo $ARGOCD_AGENT_REMOTE_PORT
export ARGOCD_AGENT_REMOTE_PORT=${ARGOCD_AGENT_REMOTE_PORT:-8443}

if test "${REDIS_PASSWORD}" = ""; then
export REDIS_PASSWORD=$(kubectl get secret argocd-redis --context=vcluster-agent-autonomous -n argocd -o jsonpath='{.data.auth}' | base64 --decode)
fi

# Point the agent to the toxiproxy server if it is configured from the e2e tests
E2E_ENV_FILE="/tmp/argocd-agent-e2e"
if [ -f "$E2E_ENV_FILE" ]; then
Expand Down
4 changes: 4 additions & 0 deletions hack/dev-env/start-agent-managed.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ SCRIPTPATH="$( cd -- "$(dirname "$0")" >/dev/null 2>&1 ; pwd -P )"

export ARGOCD_AGENT_REMOTE_PORT=${ARGOCD_AGENT_REMOTE_PORT:-8443}

if test "${REDIS_PASSWORD}" = ""; then
export REDIS_PASSWORD=$(kubectl get secret argocd-redis --context=vcluster-agent-managed -n argocd -o jsonpath='{.data.auth}' | base64 --decode)
fi

# Point the agent to the toxiproxy server if it is configured from the e2e tests
E2E_ENV_FILE="/tmp/argocd-agent-e2e"
if [ -f "$E2E_ENV_FILE" ]; then
Expand Down
4 changes: 4 additions & 0 deletions hack/dev-env/start-principal.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ if test "${ARGOCD_PRINCIPAL_REDIS_SERVER_ADDRESS}" = ""; then
export ARGOCD_PRINCIPAL_REDIS_SERVER_ADDRESS
fi

if test "${REDIS_PASSWORD}" = ""; then
export REDIS_PASSWORD=$(kubectl get secret argocd-redis --context=vcluster-control-plane -n argocd -o jsonpath='{.data.auth}' | base64 --decode)
fi

# Point the principal to the e2e test configuration if it exists
E2E_ENV_FILE="/tmp/argocd-agent-e2e"
if [ -f "$E2E_ENV_FILE" ]; then
Expand Down
12 changes: 2 additions & 10 deletions internal/argocd/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package cluster

import (
"context"
"errors"
"fmt"
"time"
Expand All @@ -24,12 +23,10 @@ import (
"github.com/redis/go-redis/v9"
"github.com/sirupsen/logrus"

"github.com/argoproj/argo-cd/v3/common"
appv1 "github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
cacheutil "github.com/argoproj/argo-cd/v3/util/cache"
appstatecache "github.com/argoproj/argo-cd/v3/util/cache/appstate"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

// SetAgentConnectionStatus updates cluster info with connection state and time in mapped cluster at principal.
Expand Down Expand Up @@ -167,14 +164,9 @@ func (m *Manager) setClusterInfo(clusterServer, agentName, clusterName string, c
}

// NewClusterCacheInstance creates a new cache instance with Redis connection
func NewClusterCacheInstance(ctx context.Context, kubeclient kubernetes.Interface,
namespace, redisAddress string, redisCompressionType cacheutil.RedisCompressionType) (*appstatecache.Cache, error) {
redisOptions := &redis.Options{Addr: redisAddress}

if err := common.SetOptionalRedisPasswordFromKubeConfig(ctx, kubeclient, namespace, redisOptions); err != nil {
return nil, fmt.Errorf("failed to set redis password for namespace %s: %v", namespace, err)
}
func NewClusterCacheInstance(redisAddress, redisPassword string, redisCompressionType cacheutil.RedisCompressionType) (*appstatecache.Cache, error) {

redisOptions := &redis.Options{Addr: redisAddress, Password: redisPassword}
redisClient := redis.NewClient(redisOptions)

clusterCache := appstatecache.NewCache(cacheutil.NewCache(
Expand Down
6 changes: 3 additions & 3 deletions internal/argocd/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func setup(t *testing.T, redisAddress string) (string, *Manager) {
t.Helper()
agentName, clusterName := "agent-test", "cluster"

m, err := NewManager(context.Background(), "default", redisAddress, cacheutil.RedisCompressionNone,
m, err := NewManager(context.Background(), "default", redisAddress, "", cacheutil.RedisCompressionNone,
kube.NewFakeKubeClient("default"))
require.NoError(t, err)

Expand Down Expand Up @@ -221,7 +221,7 @@ func Test_SetAgentConnectionStatus(t *testing.T) {

t.Run("SetAgentConnectionStatus with invalid redis address", func(t *testing.T) {
// Create a manager with invalid redis address
invalidM, err := NewManager(context.Background(), "default", "invalid:redis:address",
invalidM, err := NewManager(context.Background(), "default", "invalid:redis:address", "",
cacheutil.RedisCompressionNone, kube.NewFakeKubeClient("default"))
require.NoError(t, err)

Expand Down Expand Up @@ -300,7 +300,7 @@ func Test_RefreshClusterInfo(t *testing.T) {

t.Run("RefreshClusterInfo with invalid redis", func(t *testing.T) {
// Create manager with invalid redis
invalidM, err := NewManager(context.Background(), "default", "invalid:redis",
invalidM, err := NewManager(context.Background(), "default", "invalid:redis", "",
cacheutil.RedisCompressionNone, kube.NewFakeKubeClient("default"))
require.NoError(t, err)

Expand Down
10 changes: 5 additions & 5 deletions internal/argocd/cluster/informer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

func Test_onClusterAdded(t *testing.T) {
t.Run("Successfully add a cluster", func(t *testing.T) {
m, err := NewManager(context.TODO(), "argocd", "", "", kube.NewFakeKubeClient("argocd"))
m, err := NewManager(context.TODO(), "argocd", "", "", "", kube.NewFakeKubeClient("argocd"))
require.NoError(t, err)
s := &v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -29,7 +29,7 @@ func Test_onClusterAdded(t *testing.T) {
assert.Len(t, m.clusters, 1)
})
t.Run("Secret is missing one or more labels", func(t *testing.T) {
m, err := NewManager(context.TODO(), "argocd", "", "", kube.NewFakeKubeClient("argocd"))
m, err := NewManager(context.TODO(), "argocd", "", "", "", kube.NewFakeKubeClient("argocd"))
require.NoError(t, err)
s := &v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -46,7 +46,7 @@ func Test_onClusterAdded(t *testing.T) {
assert.Len(t, m.clusters, 0)
})
t.Run("Target agent already has a mapping", func(t *testing.T) {
m, err := NewManager(context.TODO(), "argocd", "", "", kube.NewFakeKubeClient("argocd"))
m, err := NewManager(context.TODO(), "argocd", "", "", "", kube.NewFakeKubeClient("argocd"))
require.NoError(t, err)
s := &v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -83,7 +83,7 @@ func Test_onClusterUpdated(t *testing.T) {
Name: "cluster",
},
}
m, err := NewManager(context.TODO(), "argocd", "", "", kube.NewFakeKubeClient("argocd"))
m, err := NewManager(context.TODO(), "argocd", "", "", "", kube.NewFakeKubeClient("argocd"))
require.NoError(t, err)
m.mapCluster("agent1", &v1alpha1.Cluster{})
assert.NotNil(t, m.mapping("agent1"))
Expand Down Expand Up @@ -111,7 +111,7 @@ func Test_onClusterUpdated(t *testing.T) {
Name: "cluster2",
},
}
m, err := NewManager(context.TODO(), "argocd", "", "", kube.NewFakeKubeClient("argocd"))
m, err := NewManager(context.TODO(), "argocd", "", "", "", kube.NewFakeKubeClient("argocd"))
require.NoError(t, err)
m.mapCluster("agent1", &v1alpha1.Cluster{Name: "cluster1"})
assert.NotNil(t, m.mapping("agent1"))
Expand Down
20 changes: 8 additions & 12 deletions internal/argocd/cluster/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,25 +63,21 @@ type Manager struct {
// manager
filters *filter.Chain[*v1.Secret]

redisAddress string
redisCompressionType cacheutil.RedisCompressionType
clusterCache *appstatecache.Cache
clusterCache *appstatecache.Cache
}

// NewManager instantiates and initializes a new Manager.
func NewManager(ctx context.Context, namespace, redisAddress string, redisCompressionType cacheutil.RedisCompressionType, kubeclient kubernetes.Interface) (*Manager, error) {
func NewManager(ctx context.Context, namespace, redisAddress, redisPassword string, redisCompressionType cacheutil.RedisCompressionType, kubeclient kubernetes.Interface) (*Manager, error) {
var err error
m := &Manager{
clusters: make(map[string]*v1alpha1.Cluster),
namespace: namespace,
kubeclient: kubeclient,
ctx: ctx,
filters: filter.NewFilterChain[*v1.Secret](),
redisAddress: redisAddress,
redisCompressionType: redisCompressionType,
clusters: make(map[string]*v1alpha1.Cluster),
namespace: namespace,
kubeclient: kubeclient,
ctx: ctx,
filters: filter.NewFilterChain[*v1.Secret](),
}

m.clusterCache, err = NewClusterCacheInstance(ctx, kubeclient, namespace, redisAddress, redisCompressionType)
m.clusterCache, err = NewClusterCacheInstance(redisAddress, redisPassword, redisCompressionType)
if err != nil {
return nil, fmt.Errorf("failed to create cluster cache instance: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/argocd/cluster/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func Test_StartStop(t *testing.T) {
},
}
clt := kube.NewFakeClientsetWithResources(redisSecret)
m, err := NewManager(context.TODO(), "argocd", "", "", clt)
m, err := NewManager(context.TODO(), "argocd", "", "", "", clt)
require.NoError(t, err)
require.NotNil(t, m)
err = m.Start()
Expand All @@ -74,7 +74,7 @@ func Test_onClusterAdd(t *testing.T) {
},
}
clt := kube.NewFakeClientsetWithResources(redisSecret)
m, err := NewManager(context.TODO(), "argocd", "", "", clt)
m, err := NewManager(context.TODO(), "argocd", "", "", "", clt)
require.NoError(t, err)
require.NotNil(t, m)
err = m.Start()
Expand Down
4 changes: 3 additions & 1 deletion principal/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type ServerOptions struct {
rootCa *x509.CertPool
clientCertSubjectMatch bool
redisAddress string
redisPassword string
redisCompressionType cacheutil.RedisCompressionType
healthzPort int
}
Expand Down Expand Up @@ -435,14 +436,15 @@ func WithKeepAliveMinimumInterval(interval time.Duration) ServerOption {
}
}

func WithRedis(redisAddress, redisCompressionTypeStr string) ServerOption {
func WithRedis(redisAddress, redisPassword, redisCompressionTypeStr string) ServerOption {
return func(o *Server) error {
redisCompressionType, err := cacheutil.CompressionTypeFromString(redisCompressionTypeStr)
if err != nil {
return err
}
o.options.redisCompressionType = redisCompressionType
o.options.redisAddress = redisAddress
o.options.redisPassword = redisPassword

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion principal/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func NewServer(ctx context.Context, kubeClient *kube.KubernetesClient, namespace

// Instantiate the cluster manager to handle Argo CD cluster secrets for
// agents.
s.clusterMgr, err = cluster.NewManager(s.ctx, s.namespace, s.options.redisAddress, s.options.redisCompressionType, s.kubeClient.Clientset)
s.clusterMgr, err = cluster.NewManager(s.ctx, s.namespace, s.options.redisAddress, s.options.redisPassword, s.options.redisCompressionType, s.kubeClient.Clientset)
if err != nil {
return nil, err
}
Expand Down
Loading