diff --git a/internal/kubernetes/kubernetes.go b/internal/kubernetes/kubernetes.go
index 0aa8179..48f44a6 100644
--- a/internal/kubernetes/kubernetes.go
+++ b/internal/kubernetes/kubernetes.go
@@ -18,20 +18,163 @@ package kubernetes
 import (
 	"context"
 	"fmt"
+	"sync"
+	"time"
 
 	"github.com/eiffel-community/etos-api/internal/config"
 	"github.com/sirupsen/logrus"
 	v1 "k8s.io/api/batch/v1"
+	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
 )
 
+// Cache entry with TTL
+type cacheEntry struct {
+	data      interface{}
+	timestamp time.Time
+}
+
+// Cache for Kubernetes API responses with TTL
+type kubernetesCache struct {
+	jobs     sync.Map      // map[string]*cacheEntry for job lists
+	pods     sync.Map      // map[string]*cacheEntry for pod lists
+	cacheTTL time.Duration // Cache validity duration
+	// Mutexes to prevent concurrent API calls for the same resource
+	jobsMutex sync.Mutex
+	podsMutex sync.Mutex
+}
+
+// newKubernetesCache creates a new cache with a default cache validity of 5 seconds
+func newKubernetesCache() *kubernetesCache {
+	return &kubernetesCache{
+		cacheTTL: 5 * time.Second,
+	}
+}
+
+// getAllJobs retrieves all jobs from cache or API, making API calls if cached data is stale
+func (c *kubernetesCache) getAllJobs(ctx context.Context, client *kubernetes.Clientset, namespace string, logger *logrus.Entry) (*v1.JobList, error) {
+	// Use namespace as cache key since we're caching all jobs in the namespace
+	key := fmt.Sprintf("all_jobs_%s", namespace)
+
+	// Nested function to check cache and return data if fresh
+	checkCache := func() (*v1.JobList, bool) {
+		if cached, ok := c.jobs.Load(key); ok {
+			if entry, ok := cached.(*cacheEntry); ok {
+				if time.Since(entry.timestamp) < c.cacheTTL {
+					if jobs, ok := entry.data.(*v1.JobList); ok {
+						return jobs, true
+					}
+				}
+			}
+		}
+		return nil, false
+	}
+
+	// Check cache first (fast path - no locking)
+	if jobs, found := checkCache(); found {
+		logger.Debugf("Returning cached jobs for namespace: %s (age: %v, count: %d)", namespace, time.Since(getTimestamp(&c.jobs, key)), len(jobs.Items))
+		return jobs, nil
+	}
+
+	// Use mutex to prevent concurrent API calls
+	c.jobsMutex.Lock()
+	defer c.jobsMutex.Unlock()
+
+	// Double-check cache after acquiring mutex (another goroutine might have updated it)
+	if jobs, found := checkCache(); found {
+		logger.Debugf("Returning cached jobs for namespace: %s (age: %v, count: %d) [double-check]", namespace, time.Since(getTimestamp(&c.jobs, key)), len(jobs.Items))
+		return jobs, nil
+	}
+
+	// Fetch from API if no cache entry exists or cached data is stale
+	logger.Debugf("Making Kubernetes API call to fetch all jobs for namespace: %s", namespace)
+	jobs, err := client.BatchV1().Jobs(namespace).List(ctx, metav1.ListOptions{})
+	if err != nil {
+		logger.Errorf("Failed to fetch jobs from Kubernetes API for namespace %s: %v", namespace, err)
+		return nil, err
+	}
+
+	// Store in cache
+	c.jobs.Store(key, &cacheEntry{
+		data:      jobs,
+		timestamp: time.Now(),
+	})
+
+	logger.Debugf("Successfully fetched and cached %d jobs for namespace: %s", len(jobs.Items), namespace)
+	return jobs, nil
+}
+
+// getAllPods retrieves all pods from cache or API, making API calls if cached data is stale
+func (c *kubernetesCache) getAllPods(ctx context.Context, client *kubernetes.Clientset, namespace string, logger *logrus.Entry) (*corev1.PodList, error) {
+	// Use namespace as cache key since we're caching all pods in the namespace
+	key := fmt.Sprintf("all_pods_%s", namespace)
+
+	// Nested function to check cache and return data if fresh
+	checkCache := func() (*corev1.PodList, bool) {
+		if cached, ok := c.pods.Load(key); ok {
+			if entry, ok := cached.(*cacheEntry); ok {
+				if time.Since(entry.timestamp) < c.cacheTTL {
+					if pods, ok := entry.data.(*corev1.PodList); ok {
+						return pods, true
+					}
+				}
+			}
+		}
+		return nil, false
+	}
+
+	// Check cache first (fast path - no locking)
+	if pods, found := checkCache(); found {
+		logger.Debugf("Returning cached pods for namespace: %s (age: %v, count: %d)", namespace, time.Since(getTimestamp(&c.pods, key)), len(pods.Items))
+		return pods, nil
+	}
+
+	// Use mutex to prevent concurrent API calls
+	c.podsMutex.Lock()
+	defer c.podsMutex.Unlock()
+
+	// Double-check cache after acquiring mutex (another goroutine might have updated it)
+	if pods, found := checkCache(); found {
+		logger.Debugf("Returning cached pods for namespace: %s (age: %v, count: %d) [double-check]", namespace, time.Since(getTimestamp(&c.pods, key)), len(pods.Items))
+		return pods, nil
+	}
+
+	// Fetch from API if no cache entry exists or cached data is stale
+	logger.Debugf("Making Kubernetes API call to fetch all pods for namespace: %s", namespace)
+	pods, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
+	if err != nil {
+		logger.Errorf("Failed to fetch pods from Kubernetes API for namespace %s: %v", namespace, err)
+		return nil, err
+	}
+
+	// Store in cache
+	c.pods.Store(key, &cacheEntry{
+		data:      pods,
+		timestamp: time.Now(),
+	})
+
+	logger.Debugf("Successfully fetched and cached %d pods for namespace: %s", len(pods.Items), namespace)
+	return pods, nil
+}
+
+// getTimestamp is a helper function to get the timestamp of a cache entry
+func getTimestamp(cache *sync.Map, key string) time.Time {
+	if cached, ok := cache.Load(key); ok {
+		if entry, ok := cached.(*cacheEntry); ok {
+			return entry.timestamp
+		}
+	}
+	return time.Time{}
+}
+
 type Kubernetes struct {
 	logger    *logrus.Entry
 	config    *rest.Config
 	client    *kubernetes.Clientset
 	namespace string
+	cache     *kubernetesCache
 }
 
 // New creates a new Kubernetes struct.
@@ -39,6 +182,7 @@ func New(cfg config.Config, log *logrus.Entry) *Kubernetes {
 	return &Kubernetes{
 		logger:    log,
 		namespace: cfg.ETOSNamespace(),
+		cache:     newKubernetesCache(),
 	}
 }
 
@@ -59,8 +203,23 @@ func (k *Kubernetes) clientset() (*kubernetes.Clientset, error) {
 		}
 		k.config = cfg
 	}
+
+	// Log rate limiter settings before creating client
+	if k.config.RateLimiter != nil {
+		k.logger.Debug("Kubernetes client has custom rate limiter configured")
+	}
+
+	// Log QPS and Burst settings
+	if k.config.QPS > 0 || k.config.Burst > 0 {
+		k.logger.Debugf("Kubernetes client rate limiter settings - QPS: %.2f, Burst: %d",
+			k.config.QPS, k.config.Burst)
+	} else {
+		k.logger.Debug("Kubernetes client using default rate limiter settings")
+	}
+
 	cli, err := kubernetes.NewForConfig(k.config)
 	if err != nil {
+		k.logger.Errorf("Failed to create Kubernetes client: %v", err)
 		return nil, err
 	}
 	k.client = cli
@@ -69,25 +228,37 @@
 
 // getJobsByIdentifier returns a list of jobs bound to the given testrun identifier.
 func (k *Kubernetes) getJobsByIdentifier(ctx context.Context, client *kubernetes.Clientset, identifier string) (*v1.JobList, error) {
+	// Get all jobs from cache or API
+	allJobs, err := k.cache.getAllJobs(ctx, client, k.namespace, k.logger)
+	if err != nil {
+		return nil, err
+	}
+
+	// Filter jobs by identifier in memory
+	filteredJobs := &v1.JobList{
+		TypeMeta: allJobs.TypeMeta,
+		ListMeta: allJobs.ListMeta,
+		Items:    []v1.Job{},
+	}
+
 	// Try different labels for backward compatibility:
 	// - etos.eiffel-community.github.io/id is v1alpha+
 	// - id is v0 legacy
-	for _, label := range []string{"etos.eiffel-community.github.io/id", "id"} {
-		jobs, err := client.BatchV1().Jobs(k.namespace).List(
-			ctx,
-			metav1.ListOptions{
-				LabelSelector: fmt.Sprintf("%s=%s", label, identifier),
-			},
-		)
-		if err != nil {
-			k.logger.Error(err)
-			return nil, err
-		}
-		if len(jobs.Items) > 0 {
-			return jobs, nil
+	labelKeys := []string{"etos.eiffel-community.github.io/id", "id"}
+
+	for _, job := range allJobs.Items {
+		for _, labelKey := range labelKeys {
+			if labelValue, exists := job.Labels[labelKey]; exists && labelValue == identifier {
+				filteredJobs.Items = append(filteredJobs.Items, job)
+				break // Found match, no need to check other labels for this job
+			}
 		}
 	}
-	return &v1.JobList{}, nil
+
+	k.logger.Debugf("Filtered %d jobs with identifier '%s' from %d total jobs",
+		len(filteredJobs.Items), identifier, len(allJobs.Items))
+
+	return filteredJobs, nil
 }
 
 // IsFinished checks if an ESR job is finished.
@@ -130,18 +301,27 @@ func (k *Kubernetes) LogListenerIP(ctx context.Context, identifier string) (stri
 	}
 	job := jobs.Items[0]
-	pods, err := client.CoreV1().Pods(k.namespace).List(
-		ctx,
-		metav1.ListOptions{
-			LabelSelector: fmt.Sprintf("job-name=%s", job.Name),
-		},
-	)
+	// Get all pods from cache or API
+	allPods, err := k.cache.getAllPods(ctx, client, k.namespace, k.logger)
 	if err != nil {
 		return "", err
 	}
-	if len(pods.Items) == 0 {
+
+	// Filter pods by job name in memory
+	var matchingPods []corev1.Pod
+	for _, pod := range allPods.Items {
+		if jobName, exists := pod.Labels["job-name"]; exists && jobName == job.Name {
+			matchingPods = append(matchingPods, pod)
+		}
+	}
+
+	if len(matchingPods) == 0 {
 		return "", fmt.Errorf("could not find pod for job with id %s", identifier)
 	}
-	pod := pods.Items[0]
+
+	k.logger.Debugf("Found %d pods for job '%s' with identifier '%s'",
+		len(matchingPods), job.Name, identifier)
+
+	pod := matchingPods[0]
 	return pod.Status.PodIP, nil
 }
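Reviewer note, not part of the diff above: the additions to kubernetesCache combine a TTL check with double-checked locking, so fresh entries are served without taking a lock while at most one goroutine per resource type refreshes a stale entry. The standalone sketch below shows the same pattern reduced to a generic helper; ttlCache, entry and Get are illustrative names that do not exist in etos-api.

package main

import (
	"fmt"
	"sync"
	"time"
)

// entry pairs a cached value with the time it was stored.
type entry struct {
	data      interface{}
	timestamp time.Time
}

// ttlCache caches values per key for a fixed TTL and makes sure only one
// goroutine refreshes a stale key at a time (double-checked locking).
type ttlCache struct {
	entries sync.Map      // map[string]*entry
	ttl     time.Duration // how long a stored value counts as fresh
	mu      sync.Mutex    // serializes refreshes of stale or missing keys
}

// Get returns a fresh cached value for key, or calls fetch and caches the result.
func (c *ttlCache) Get(key string, fetch func() (interface{}, error)) (interface{}, error) {
	load := func() (interface{}, bool) {
		if v, ok := c.entries.Load(key); ok {
			if e, ok := v.(*entry); ok && time.Since(e.timestamp) < c.ttl {
				return e.data, true
			}
		}
		return nil, false
	}
	// Fast path: no locking while the entry is fresh.
	if data, ok := load(); ok {
		return data, nil
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	// Double-check: another goroutine may have refreshed the key while we waited.
	if data, ok := load(); ok {
		return data, nil
	}
	data, err := fetch()
	if err != nil {
		return nil, err
	}
	c.entries.Store(key, &entry{data: data, timestamp: time.Now()})
	return data, nil
}

func main() {
	cache := &ttlCache{ttl: 5 * time.Second}
	fetches := 0
	for i := 0; i < 3; i++ {
		// fetch runs only once within the TTL window; later calls hit the cache.
		v, _ := cache.Get("all_jobs_etos", func() (interface{}, error) {
			fetches++
			return fmt.Sprintf("job list (fetch #%d)", fetches), nil
		})
		fmt.Println(v)
	}
}

The trade-off is the same as in getAllJobs and getAllPods: concurrent callers that miss the cache wait on the mutex instead of issuing duplicate List calls, at the cost of serializing the refresh.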
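A second note on the rate limiter logging added to clientset(): rest.InClusterConfig leaves QPS and Burst at zero, so the new code will normally hit the "default rate limiter settings" branch and client-go falls back to its defaults of 5 QPS with a burst of 10. If those limits ever need raising, the sketch below shows where the knobs live on rest.Config; newThrottledClient and the numbers are hypothetical and not part of this change.

package kubernetes

import (
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

// newThrottledClient is a hypothetical helper showing where client-go's
// client-side rate limiter knobs live. The values are illustrative only.
func newThrottledClient() (*kubernetes.Clientset, error) {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		return nil, err
	}
	cfg.QPS = 20   // client-go defaults to 5 requests/second when left at 0
	cfg.Burst = 40 // client-go defaults to a burst of 10 when left at 0
	return kubernetes.NewForConfig(cfg)
}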