Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
281 changes: 258 additions & 23 deletions cli/cmd/capture/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
package capture

import (
"archive/tar"
"bytes"
"compress/gzip"
"context"
"errors"
"fmt"
Expand All @@ -25,6 +27,7 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.uber.org/zap"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"
Expand All @@ -51,6 +54,7 @@ var (

const (
DefaultOutputPath = "./"
TimestampFormat = "20060102150405"
)

var (
Expand All @@ -69,6 +73,8 @@ var (
ErrNoBlobsFound = errors.New("no blobs found with prefix")
captureName string
outputPath string
downloadAll bool
downloadAllNamespaces bool
)

var (
Expand All @@ -79,6 +85,8 @@ var (
ErrEmptyDownloadOutput = errors.New("download command produced no output")
ErrFailedToCreateDownloadPod = errors.New("failed to create download pod")
ErrUnsupportedNodeOS = errors.New("unsupported node operating system")
ErrMissingRequiredFlags = errors.New("either --name, --blob-url, or --all must be specified")
ErrAllNamespacesRequiresAll = errors.New("--all-namespaces flag can only be used with --all flag")
)

// DownloadCmd holds all OS-specific commands and configurations
Expand All @@ -98,6 +106,12 @@ type DownloadService struct {
namespace string
}

// Key represents a unique capture identifier
type Key struct {
Name string
Namespace string
}

// NewDownloadService creates a new download service with shared dependencies
func NewDownloadService(kubeClient kubernetes.Interface, config *rest.Config, namespace string) *DownloadService {
return &DownloadService{
Expand Down Expand Up @@ -198,6 +212,12 @@ var downloadExample = templates.Examples(i18n.T(`
# Download the capture file(s) created using the capture name and define output location
kubectl retina capture download --name <capture-name> -o <output-location>

# Download all available captures
kubectl retina capture download --all

# Download all available captures from all namespaces
kubectl retina capture download --all --all-namespaces

# Download capture file(s) from Blob Storage via Blob URL (Blob URL requires Read/List permissions)
kubectl retina capture download --blob-url "<blob-url>"
`))
Expand Down Expand Up @@ -248,49 +268,61 @@ func downloadFromCluster(ctx context.Context, config *rest.Config, namespace str

// DownloadFile downloads a capture file from a specific node
func (ds *DownloadService) DownloadFile(ctx context.Context, nodeName, hostPath, fileName, captureName string) error {
content, err := ds.DownloadFileContent(ctx, nodeName, hostPath, fileName, captureName)
if err != nil {
return err
}

outputFile := filepath.Join(outputPath, captureName, fileName+".tar.gz")
fmt.Printf("Bytes retrieved: %d\n", len(content))

err = os.WriteFile(outputFile, content, 0o600)
if err != nil {
return errors.Join(ErrWriteFileToHost, err)
}

fmt.Printf("File written to: %s\n", outputFile)
return nil
}

// DownloadFileContent downloads a capture file from a specific node and returns the content
func (ds *DownloadService) DownloadFileContent(ctx context.Context, nodeName, hostPath, fileName, captureName string) ([]byte, error) {
node, err := ds.kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
return errors.Join(ErrGetNodeInfo, err)
return nil, errors.Join(ErrGetNodeInfo, err)
}

downloadCmd, err := getDownloadCmd(node, hostPath, fileName)
if err != nil {
return err
return nil, err
}

fmt.Println("File to be downloaded: ", downloadCmd.SrcFilePath)
downloadPod, err := ds.createDownloadPod(ctx, nodeName, hostPath, captureName, downloadCmd)
if err != nil {
return err
return nil, err
}

// Ensure cleanup
defer func() {
cleanupErr := ds.kubeClient.CoreV1().Pods(ds.namespace).Delete(ctx, downloadPod.Name, metav1.DeleteOptions{})
if cleanupErr != nil {
retinacmd.Logger.Warn("Failed to clean up debug pod", zap.String("name", downloadPod.Name), zap.Error(cleanupErr))
}
}()

fileExists, err := ds.verifyFileExists(ctx, downloadPod, downloadCmd)
if err != nil || !fileExists {
return err
return nil, err
}

fmt.Println("Obtaining file...")
fileContent, err := ds.executeFileDownload(ctx, downloadPod, downloadCmd)
if err != nil {
return err
}

outputFile := filepath.Join(outputPath, captureName, fileName+".tar.gz")
fmt.Printf("Bytes retrieved: %d\n", len(fileContent))

err = os.WriteFile(outputFile, fileContent, 0o600)
if err != nil {
return errors.Join(ErrWriteFileToHost, err)
return nil, err
}

fmt.Printf("File written to: %s\n", outputFile)

// Ensure cleanup
err = ds.kubeClient.CoreV1().Pods(ds.namespace).Delete(ctx, downloadPod.Name, metav1.DeleteOptions{})
if err != nil {
retinacmd.Logger.Warn("Failed to clean up debug pod", zap.String("name", downloadPod.Name), zap.Error(err))
}
return nil
return fileContent, nil
}

func getCapturePods(ctx context.Context, kubeClient kubernetes.Interface, captureName, namespace string) (*corev1.PodList, error) {
Expand Down Expand Up @@ -513,6 +545,195 @@ func downloadFromBlob() error {
return nil
}

func downloadAllCaptures(ctx context.Context, config *rest.Config, namespace string) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:
recommend breaking this down into a couple functions,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

if downloadAllNamespaces {
fmt.Println("Downloading all captures from all namespaces...")
} else {
fmt.Printf("Downloading all captures for namespace %s...\n", namespace)
}
kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
return fmt.Errorf("failed to initialize k8s client: %w", err)
}

// List all capture jobs with the capture app label
captureJobSelector := &metav1.LabelSelector{
MatchLabels: map[string]string{
captureLabels.AppLabel: captureConstants.CaptureAppname,
},
}
labelSelector, err := metav1.LabelSelectorAsSelector(captureJobSelector)
if err != nil {
return fmt.Errorf("failed to parse label selector: %w", err)
}

var jobList *batchv1.JobList
if downloadAllNamespaces {
// Search across all namespaces
jobList, err = kubeClient.BatchV1().Jobs("").List(ctx, metav1.ListOptions{
LabelSelector: labelSelector.String(),
})
if err != nil {
return fmt.Errorf("failed to list capture jobs across all namespaces: %w", err)
}
} else {
// Search in specified namespace only
jobList, err = kubeClient.BatchV1().Jobs(namespace).List(ctx, metav1.ListOptions{
LabelSelector: labelSelector.String(),
})
if err != nil {
return fmt.Errorf("failed to list capture jobs: %w", err)
}
}

if len(jobList.Items) == 0 {
if downloadAllNamespaces {
fmt.Printf("No captures found across all namespaces\n")
} else {
fmt.Printf("No captures found in namespace %s\n", namespace)
}
return nil
}

// Group jobs by capture name and namespace
captureToJobs := make(map[Key][]batchv1.Job)
for i := range jobList.Items {
job := &jobList.Items[i]
captureNameFromLabel, ok := job.Labels[captureLabels.CaptureNameLabel]
if !ok {
continue
}
key := Key{Name: captureNameFromLabel, Namespace: job.Namespace}
captureToJobs[key] = append(captureToJobs[key], *job)
}

fmt.Printf("Found %d capture(s) to download\n", len(captureToJobs))

// Create the final archive using streaming approach to avoid memory issues
timestamp := time.Now().Format(TimestampFormat)
finalArchivePath := filepath.Join(outputPath, fmt.Sprintf("all-captures-%s.tar.gz", timestamp))

fmt.Printf("Creating final archive: %s\n", finalArchivePath)
err = createStreamingTarGzArchive(ctx, finalArchivePath, captureToJobs, kubeClient, config)
if err != nil {
return fmt.Errorf("failed to create final archive: %w", err)
}

fmt.Printf("Successfully created archive: %s\n", finalArchivePath)
return nil
}

// createStreamingTarGzArchive creates a tar.gz archive by streaming files one at a time to avoid memory issues
func createStreamingTarGzArchive(ctx context.Context, outputPath string, captureToJobs map[Key][]batchv1.Job, kubeClient kubernetes.Interface, config *rest.Config) error {
// Create the output file
outFile, err := os.Create(outputPath)
if err != nil {
return fmt.Errorf("failed to create archive file: %w", err)
}
defer outFile.Close()

// Create gzip writer
gzipWriter := gzip.NewWriter(outFile)
defer gzipWriter.Close()

// Create tar writer
tarWriter := tar.NewWriter(gzipWriter)
defer tarWriter.Close()

// We'll create download services per namespace as needed
downloadServices := make(map[string]*DownloadService)
fileCount := 0

// Process each capture and stream files directly to archive
for captureKey := range captureToJobs {
currentCaptureName := captureKey.Name
currentNamespace := captureKey.Namespace
fmt.Printf("Processing capture: %s in namespace: %s\n", currentCaptureName, currentNamespace)

// Get or create download service for this namespace
downloadService, exists := downloadServices[currentNamespace]
if !exists {
downloadService = NewDownloadService(kubeClient, config, currentNamespace)
downloadServices[currentNamespace] = downloadService
}

// Get pods for this capture and download files
pods, podsErr := getCapturePods(ctx, kubeClient, currentCaptureName, currentNamespace)
if podsErr != nil {
fmt.Printf("Warning: Failed to get pods for capture %s in namespace %s: %v\n", currentCaptureName, currentNamespace, podsErr)
continue
}

for i := range pods.Items {
pod := pods.Items[i]
if pod.Status.Phase != corev1.PodSucceeded {
fmt.Printf("Warning: Pod %s is not in Succeeded phase (status: %s), skipping\n", pod.Name, pod.Status.Phase)
continue
}

nodeName := pod.Spec.NodeName
hostPath, ok := pod.Annotations[captureConstants.CaptureHostPathAnnotationKey]
if !ok {
fmt.Printf("Warning: Cannot obtain host path from pod annotations for %s\n", pod.Name)
continue
}
fileName, ok := pod.Annotations[captureConstants.CaptureFilenameAnnotationKey]
if !ok {
fmt.Printf("Warning: Cannot obtain capture file name from pod annotations for %s\n", pod.Name)
continue
}

// Download file content (this is still done in memory per file, but not all files at once)
content, err := downloadService.DownloadFileContent(ctx, nodeName, hostPath, fileName, currentCaptureName)
if err != nil {
fmt.Printf("Warning: Failed to download file from pod %s: %v\n", pod.Name, err)
continue
}

// Determine archive path based on whether we're using all namespaces
var archivePath string
if downloadAllNamespaces {
// Include namespace in path: namespace/captureName/fileName.tar.gz
archivePath = filepath.Join(currentNamespace, currentCaptureName, fileName+".tar.gz")
} else {
// Original path: captureName/fileName.tar.gz
archivePath = filepath.Join(currentCaptureName, fileName+".tar.gz")
}

// Stream file directly to archive
header := &tar.Header{
Name: archivePath,
Mode: 0o600,
Size: int64(len(content)),
}

// Write header
if err := tarWriter.WriteHeader(header); err != nil {
return fmt.Errorf("failed to write header for %s: %w", archivePath, err)
}

// Write content
if _, err := tarWriter.Write(content); err != nil {
return fmt.Errorf("failed to write content for %s: %w", archivePath, err)
}

fileCount++
fmt.Printf("Added %s (%d bytes) to archive\n", archivePath, len(content))
}
}

if fileCount == 0 {
// Remove the empty archive file
outFile.Close()
os.Remove(outputPath)
fmt.Println("No capture files were successfully downloaded")
return nil
}

fmt.Printf("Successfully added %d files to archive\n", fileCount)
return nil
}

func NewDownloadSubCommand() *cobra.Command {
downloadCapture := &cobra.Command{
Use: "download",
Expand All @@ -534,8 +755,13 @@ func NewDownloadSubCommand() *cobra.Command {
captureNamespace = "default"
}

if captureName == "" && blobURL == "" {
return errors.New("either --name or --blob-url must be specified")
if captureName == "" && blobURL == "" && !downloadAll {
return ErrMissingRequiredFlags
}

// Validate all-namespaces flag usage
if downloadAllNamespaces && !downloadAll {
return ErrAllNamespacesRequiresAll
}

if captureName != "" {
Expand All @@ -552,12 +778,21 @@ func NewDownloadSubCommand() *cobra.Command {
}
}

if downloadAll {
err = downloadAllCaptures(ctx, kubeConfig, captureNamespace)
if err != nil {
return err
}
}

return nil
},
}

downloadCapture.Flags().StringVar(&blobURL, "blob-url", "", "Blob URL from which to download")
downloadCapture.Flags().StringVar(&captureName, "name", "", "The name of a the capture")
downloadCapture.Flags().BoolVar(&downloadAll, "all", false, "Download all available captures for the specified namespace (or all namespaces if --all-namespaces flag is set)")
downloadCapture.Flags().BoolVar(&downloadAllNamespaces, "all-namespaces", false, "Download captures from all namespaces (only works with --all flag)")
downloadCapture.Flags().StringVarP(&outputPath, "output", "o", DefaultOutputPath, "Path to save the downloaded capture")

return downloadCapture
Expand Down
Loading
Loading