Skip to content
This repository was archived by the owner on Sep 24, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ require (
github.com/google/uuid v1.6.0
github.com/spf13/cobra v1.9.1
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/otel v1.28.0
go.opentelemetry.io/otel/trace v1.28.0
go.uber.org/mock v0.5.0
golang.org/x/sync v0.12.0
google.golang.org/protobuf v1.36.5
Expand Down Expand Up @@ -78,8 +80,6 @@ require (
github.com/stretchr/objx v0.5.2 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/xlab/treeprint v1.2.0 // indirect
go.opentelemetry.io/otel v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/oauth2 v0.23.0 // indirect
golang.org/x/sys v0.28.0 // indirect
Expand Down
3 changes: 2 additions & 1 deletion pkg/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/argoproj/gitops-engine/pkg/sync"
"github.com/argoproj/gitops-engine/pkg/sync/common"
"github.com/argoproj/gitops-engine/pkg/utils/kube"
"github.com/argoproj/gitops-engine/pkg/utils/tracing"
)

const (
Expand Down Expand Up @@ -84,7 +85,7 @@ func (e *gitOpsEngine) Sync(ctx context.Context,
return nil, err
}
opts = append(opts, sync.WithSkipHooks(!diffRes.Modified))
syncCtx, cleanup, err := sync.NewSyncContext(revision, result, e.config, e.config, e.kubectl, namespace, e.cache.GetOpenAPISchema(), opts...)
syncCtx, cleanup, err := sync.NewSyncContext(revision, result, e.config, e.config, e.kubectl, namespace, e.cache.GetOpenAPISchema(), tracing.NopTracer{}, "", "", opts...)
if err != nil {
return nil, err
}
Expand Down
47 changes: 46 additions & 1 deletion pkg/sync/sync_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"sort"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -32,6 +33,7 @@ import (
"github.com/argoproj/gitops-engine/pkg/sync/hook"
resourceutil "github.com/argoproj/gitops-engine/pkg/sync/resource"
kubeutil "github.com/argoproj/gitops-engine/pkg/utils/kube"
"github.com/argoproj/gitops-engine/pkg/utils/tracing"
)

type reconciledResource struct {
Expand Down Expand Up @@ -209,6 +211,8 @@ func NewSyncContext(
kubectl kubeutil.Kubectl,
namespace string,
openAPISchema openapi.Resources,
syncTracer tracing.Tracer,
syncTraceID, syncTraceRootSpanID string,
opts ...SyncOpt,
) (SyncContext, func(), error) {
dynamicIf, err := dynamic.NewForConfig(restConfig)
Expand Down Expand Up @@ -246,6 +250,9 @@ func NewSyncContext(
permissionValidator: func(_ *unstructured.Unstructured, _ *metav1.APIResource) error {
return nil
},
syncTracer: syncTracer,
syncTraceID: syncTraceID,
syncTraceRootSpanID: syncTraceRootSpanID,
}
for _, opt := range opts {
opt(ctx)
Expand Down Expand Up @@ -357,6 +364,11 @@ type syncContext struct {
// lock to protect concurrent updates of the result list
lock sync.Mutex

// tracer for tracing the sync operation
syncTraceID string
syncTraceRootSpanID string
syncTracer tracing.Tracer

// syncNamespace is a function that will determine if the managed
// namespace should be synced
syncNamespace func(*unstructured.Unstructured, *unstructured.Unstructured) (bool, error)
Expand Down Expand Up @@ -1262,6 +1274,8 @@ func (sc *syncContext) runTasks(tasks syncTasks, dryRun bool) runState {
ss.Go(func(state runState) runState {
logCtx := sc.log.WithValues("dryRun", dryRun, "task", t)
logCtx.V(1).Info("Pruning")
span := sc.createSpan("pruneObject", dryRun)
defer span.Finish()
result, message := sc.pruneObject(t.liveObj, sc.prune, dryRun)
if result == common.ResultCodeSyncFailed {
state = failed
Expand All @@ -1270,6 +1284,7 @@ func (sc *syncContext) runTasks(tasks syncTasks, dryRun bool) runState {
if !dryRun || sc.dryRun || result == common.ResultCodeSyncFailed {
sc.setResourceResult(t, result, operationPhases[result], message)
}
sc.setBaggageItemForTasks(&span, t, message, result, operationPhases[result])
return state
})
}
Expand All @@ -1289,19 +1304,27 @@ func (sc *syncContext) runTasks(tasks syncTasks, dryRun bool) runState {
ss.Go(func(state runState) runState {
sc.log.WithValues("dryRun", dryRun, "task", t).V(1).Info("Deleting")
if !dryRun {
span := sc.createSpan("hooksDeletion", dryRun)
defer span.Finish()
err := sc.deleteResource(t)
message := "deleted"
operationPhase := common.OperationRunning
if err != nil {
// it is possible to get a race condition here, such that the resource does not exist when
// delete is requested, we treat this as a nop
if !apierrors.IsNotFound(err) {
state = failed
sc.setResourceResult(t, "", common.OperationError, fmt.Sprintf("failed to delete resource: %v", err))
message = fmt.Sprintf("failed to delete resource: %v", err)
operationPhase = common.OperationError
sc.setResourceResult(t, "", operationPhase, message)
}
} else {
// if there is anything that needs deleting, we are at best now in pending and
// want to return and wait for sync to be invoked again
state = pending
operationPhase = common.OperationSucceeded
}
sc.setBaggageItemForTasks(&span, t, message, "", operationPhase)
}
return state
})
Expand Down Expand Up @@ -1330,6 +1353,24 @@ func (sc *syncContext) runTasks(tasks syncTasks, dryRun bool) runState {
return state
}

func (sc *syncContext) createSpan(operation string, dryrun bool) tracing.Span {
// skip tracing if dryrun
if dryrun || sc.syncTracer == nil {
return tracing.NopTracer{}.StartSpan(operation)
}
return sc.syncTracer.StartSpanFromTraceParent(operation, sc.syncTraceID, sc.syncTraceRootSpanID)
}

func (sc *syncContext) setBaggageItemForTasks(span *tracing.Span, t *syncTask, message string, result common.ResultCode, operationPhase common.OperationPhase) {
resourceKey := t.resourceKey()
(*span).SetBaggageItem("resource", resourceKey.String())
(*span).SetBaggageItem("result", string(result))
(*span).SetBaggageItem("operationPhase", string(operationPhase))
(*span).SetBaggageItem("message", message)
(*span).SetBaggageItem("phase", string(t.phase))
(*span).SetBaggageItem("wave", strconv.Itoa(t.wave()))
}

func (sc *syncContext) processCreateTasks(state runState, tasks syncTasks, dryRun bool) runState {
ss := newStateSync(state)
for _, task := range tasks {
Expand All @@ -1341,11 +1382,14 @@ func (sc *syncContext) processCreateTasks(state runState, tasks syncTasks, dryRu
logCtx := sc.log.WithValues("dryRun", dryRun, "task", t)
logCtx.V(1).Info("Applying")
validate := sc.validate && !resourceutil.HasAnnotationOption(t.targetObj, common.AnnotationSyncOptions, common.SyncOptionsDisableValidation)
span := sc.createSpan("applyObject", dryRun)
defer span.Finish()
result, message := sc.applyObject(t, dryRun, validate)
if result == common.ResultCodeSyncFailed {
logCtx.WithValues("message", message).Info("Apply failed")
state = failed
}
var phase common.OperationPhase
if !dryRun || sc.dryRun || result == common.ResultCodeSyncFailed {
phase := operationPhases[result]
// no resources are created in dry-run, so running phase means validation was
Expand All @@ -1355,6 +1399,7 @@ func (sc *syncContext) processCreateTasks(state runState, tasks syncTasks, dryRu
}
sc.setResourceResult(t, result, phase, message)
}
sc.setBaggageItemForTasks(&span, t, message, result, phase)
return state
})
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/utils/tracing/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ package tracing

type Tracer interface {
StartSpan(operationName string) Span
StartSpanFromTraceParent(operationName string, parentTraceId, parentSpanId string) Span
}

type Span interface {
SetBaggageItem(key string, value any)
Finish()
SpanID() string
TraceID() string
}
14 changes: 14 additions & 0 deletions pkg/utils/tracing/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ func (l LoggingTracer) StartSpan(operationName string) Span {
}
}

// loggingSpan is not a real distributed tracing system.
// so no need to implement real StartSpanFromTraceParent method.
func (l LoggingTracer) StartSpanFromTraceParent(operationName string, _, _ string) Span {
return l.StartSpan(operationName)
}

type loggingSpan struct {
logger logr.Logger
operationName string
Expand All @@ -54,3 +60,11 @@ func baggageToVals(baggage map[string]any) []any {
}
return result
}

func (s loggingSpan) TraceID() string {
return ""
}

func (s loggingSpan) SpanID() string {
return ""
}
12 changes: 12 additions & 0 deletions pkg/utils/tracing/nop.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,22 @@ func (n NopTracer) StartSpan(_ string) Span {
return nopSpan{}
}

func (n NopTracer) StartSpanFromTraceParent(_, _, _ string) Span {
return nopSpan{}
}

type nopSpan struct{}

func (n nopSpan) SetBaggageItem(_ string, _ any) {
}

func (n nopSpan) Finish() {
}

func (n nopSpan) TraceID() string {
return ""
}

func (n nopSpan) SpanID() string {
return ""
}
56 changes: 56 additions & 0 deletions pkg/utils/tracing/opentelemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package tracing

import (
"context"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

type OpenTelemetryTracer struct {
realTracer trace.Tracer
}

func NewOpenTelemetryTracer(t trace.Tracer) Tracer {
return &OpenTelemetryTracer{
realTracer: t,
}
}

func (t OpenTelemetryTracer) StartSpan(operationName string) Span {
_, realspan := t.realTracer.Start(context.Background(), operationName)
return openTelemetrySpan{realSpan: realspan}
}

func (t OpenTelemetryTracer) StartSpanFromTraceParent(operationName string, parentTraceId, parentSpanId string) Span {
traceID, _ := trace.TraceIDFromHex(parentTraceId)
parentSpanID, _ := trace.SpanIDFromHex(parentSpanId)
spanCtx := trace.NewSpanContext(trace.SpanContextConfig{
TraceID: traceID,
SpanID: parentSpanID,
TraceFlags: trace.FlagsSampled,
})
ctx := trace.ContextWithSpanContext(context.Background(), spanCtx)
_, realSpan := t.realTracer.Start(ctx, operationName)
return openTelemetrySpan{realSpan: realSpan}
}

type openTelemetrySpan struct {
realSpan trace.Span
}

func (s openTelemetrySpan) SetBaggageItem(key string, value any) {
s.realSpan.SetAttributes(attribute.Key(key).String(value.(string)))
}

func (s openTelemetrySpan) Finish() {
s.realSpan.End()
}

func (s openTelemetrySpan) TraceID() string {
return s.realSpan.SpanContext().TraceID().String()
}

func (s openTelemetrySpan) SpanID() string {
return s.realSpan.SpanContext().SpanID().String()
}