From d0635e427d6434ded050c29e4d2e2c79a7934c03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dario=20Casta=C3=B1=C3=A9?= Date: Mon, 1 Sep 2025 14:36:14 +0200 Subject: [PATCH 01/22] feat(tracer): implement msgp serialization for payloadV1 and spanListV1 - Added msgp struct tags to fields in payloadV1 and traceChunk for serialization. - Introduced EncodeMsg methods for payloadV1 and spanListV1 to support msgp encoding. - Updated field types in traceChunk to enhance compatibility with msgp serialization. --- ddtrace/tracer/payload.go | 14 +++++------ ddtrace/tracer/payload_v1.go | 45 ++++++++++++++++++++++-------------- 2 files changed, 35 insertions(+), 24 deletions(-) diff --git a/ddtrace/tracer/payload.go b/ddtrace/tracer/payload.go index 5214d8f447..46088ab9d1 100644 --- a/ddtrace/tracer/payload.go +++ b/ddtrace/tracer/payload.go @@ -155,24 +155,24 @@ func (sp *safePayload) protocol() float64 { // i.e. a chunk of a trace type traceChunk struct { // the sampling priority of the trace - priority int32 + priority int32 `msg:"priority"` // the optional string origin ("lambda", "rum", etc.) of the trace chunk - origin uint32 + origin uint32 `msg:"origin,omitempty"` // a collection of key to value pairs common in all `spans` - attributes map[uint32]anyValue + attributes map[uint32]anyValue `msg:"attributes,omitempty"` // a list of spans in this chunk - spans []Span + spans spanListV1 `msg:"spans,omitempty"` // whether the trace only contains analyzed spans // (not required by tracers and set by the agent) - droppedTrace bool + droppedTrace bool `msg:"droppedTrace"` // the ID of the trace to which all spans in this chunk belong - traceID uint8 + traceID []byte `msg:"traceID"` // the optional string decision maker (previously span tag _dd.p.dm) - decisionMaker uint32 + decisionMaker uint32 `msg:"decisionMaker,omitempty"` } diff --git a/ddtrace/tracer/payload_v1.go b/ddtrace/tracer/payload_v1.go index 257b7c8d48..e470eb5da6 100644 --- a/ddtrace/tracer/payload_v1.go +++ b/ddtrace/tracer/payload_v1.go @@ -5,6 +5,8 @@ package tracer +import "github.com/tinylib/msgp/msgp" + // payloadV1 is a new version of a msgp payload that can be sent to the agent. // Be aware that payloadV1 follows the same rules and constraints as payloadV04. That is: // @@ -20,41 +22,40 @@ package tracer // Close the request body before attempting to re-use it again! type payloadV1 struct { // array of strings referenced in this tracer payload, its chunks and spans - strings []string + strings []string `msgp:"strings"` // the string ID of the container where the tracer is running - containerID uint32 + containerID uint32 `msgp:"containerID"` // the string language name of the tracer - languageName uint32 + languageName uint32 `msgp:"languageName"` // the string language version of the tracer - languageVersion uint32 + languageVersion uint32 `msgp:"languageVersion"` // the string version of the tracer - tracerVersion uint32 + tracerVersion uint32 `msgp:"tracerVersion"` // the V4 string UUID representation of a tracer session - runtimeID uint32 + runtimeID uint32 `msgp:"runtimeID"` // the optional `env` string tag that set with the tracer - env uint32 + env uint32 `msgp:"env,omitempty"` // the optional string hostname of where the tracer is running - hostname uint32 + hostname uint32 `msgp:"hostname,omitempty"` // the optional string `version` tag for the application set in the tracer - appVersion uint32 + appVersion uint32 `msgp:"appVersion,omitempty"` // a collection of key to value pairs common in all `chunks` - attributes map[uint32]anyValue + attributes map[uint32]anyValue `msgp:"attributes,omitempty"` // a list of trace `chunks` - chunks []traceChunk + chunks []traceChunk `msgp:"chunks,omitempty"` - // fields needed to implement unsafePayload interface + // protocolVersion specifies the trace protocol to use. protocolVersion float64 - itemsCount uint32 } // AnyValue is a representation of the `any` value. It can take the following types: @@ -71,6 +72,13 @@ type anyValue struct { value interface{} } +// EncodeMsg implements msgp.Encodable. +func (a *anyValue) EncodeMsg(*msgp.Writer) error { + panic("unimplemented") +} + +var _ msgp.Encodable = (*anyValue)(nil) + const ( StringValueType = iota + 1 // string or uint BoolValueType // boolean @@ -92,17 +100,20 @@ type keyValue struct { type keyValueList = []keyValue // newPayloadV1 returns a ready to use payloadV1. -func newPayloadV1(protocol float64) *payloadV1 { +func newPayloadV1() *payloadV1 { return &payloadV1{ - protocolVersion: protocol, + protocolVersion: traceProtocolV1, strings: make([]string, 0), attributes: make(map[uint32]anyValue), chunks: make([]traceChunk, 0), } } -func (p *payloadV1) push(t spanList) (stats payloadStats, err error) { - panic("not implemented") +// push pushes a new item into the stream. +func (p *payloadV1) push(t []*Span) error { + // We need to hydrate the payload with everything we get from the spans. + // Conceptually, our `t []*Span` corresponds to one `traceChunk`. + return nil } func (p *payloadV1) grow(n int) { From 478ef228230ef4fa56e2b90ffa9f8628ff88a0dd Mon Sep 17 00:00:00 2001 From: Hannah Kim Date: Fri, 5 Sep 2025 15:41:05 -0400 Subject: [PATCH 02/22] added missing spanlistv1 type --- ddtrace/tracer/payload.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ddtrace/tracer/payload.go b/ddtrace/tracer/payload.go index 46088ab9d1..f8e5285be1 100644 --- a/ddtrace/tracer/payload.go +++ b/ddtrace/tracer/payload.go @@ -63,6 +63,8 @@ const ( msgpackArray32 byte = 0xdd // up to 2^32-1 items, followed by size in 4 bytes ) +type spanListV1 []*Span + // safePayload provides a thread-safe wrapper around payload. type safePayload struct { mu sync.RWMutex From 230633af28da24d46b786be7fd820142fc2a7c14 Mon Sep 17 00:00:00 2001 From: Hannah Kim Date: Fri, 5 Sep 2025 15:42:22 -0400 Subject: [PATCH 03/22] finish cherry picking dario's PR --- ddtrace/tracer/payload.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/ddtrace/tracer/payload.go b/ddtrace/tracer/payload.go index f8e5285be1..2f3f5f0995 100644 --- a/ddtrace/tracer/payload.go +++ b/ddtrace/tracer/payload.go @@ -8,6 +8,8 @@ package tracer import ( "io" "sync" + + "github.com/tinylib/msgp/msgp" ) // payloadStats contains the statistics of a payload. @@ -65,6 +67,17 @@ const ( type spanListV1 []*Span +// EncodeMsg implements msgp.Encodable. +func (s *spanListV1) EncodeMsg(*msgp.Writer) error { + // From here, encoding goes full manual. + // Span, SpanLink, and SpanEvent structs are different for v0.4 and v1.0. + // For v1 we need to manually encode the spans, span links, and span events + // if we don't want to do extra allocations. + panic("unimplemented") +} + +var _ msgp.Encodable = (*spanListV1)(nil) + // safePayload provides a thread-safe wrapper around payload. type safePayload struct { mu sync.RWMutex From c0f2493092e2a24ae5ef8ddb6b5193fc7fe84fcc Mon Sep 17 00:00:00 2001 From: Hannah Kim Date: Fri, 5 Sep 2025 15:58:39 -0400 Subject: [PATCH 04/22] fix newPayload and spanlist type --- ddtrace/tracer/payload.go | 12 +++++++----- ddtrace/tracer/payload_v1.go | 4 ++-- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/ddtrace/tracer/payload.go b/ddtrace/tracer/payload.go index 2f3f5f0995..1e3d56369e 100644 --- a/ddtrace/tracer/payload.go +++ b/ddtrace/tracer/payload.go @@ -48,11 +48,13 @@ type payload interface { payloadReader } -// newpayload returns a ready to use unsafe payload. +// newPayload returns a ready to use payload. func newPayload(protocol float64) payload { - // TODO(hannahkm): add support for v1 protocol - // if protocol == traceProtocolV1 { - // } + if protocol == traceProtocolV1 { + return &safePayload{ + p: newPayloadV1(), + } + } return &safePayload{ p: newPayloadV04(), } @@ -65,7 +67,7 @@ const ( msgpackArray32 byte = 0xdd // up to 2^32-1 items, followed by size in 4 bytes ) -type spanListV1 []*Span +type spanListV1 = spanList // EncodeMsg implements msgp.Encodable. func (s *spanListV1) EncodeMsg(*msgp.Writer) error { diff --git a/ddtrace/tracer/payload_v1.go b/ddtrace/tracer/payload_v1.go index e470eb5da6..b9624e459b 100644 --- a/ddtrace/tracer/payload_v1.go +++ b/ddtrace/tracer/payload_v1.go @@ -110,10 +110,10 @@ func newPayloadV1() *payloadV1 { } // push pushes a new item into the stream. -func (p *payloadV1) push(t []*Span) error { +func (p *payloadV1) push(t spanListV1) (stats payloadStats, err error) { // We need to hydrate the payload with everything we get from the spans. // Conceptually, our `t []*Span` corresponds to one `traceChunk`. - return nil + return payloadStats{}, nil } func (p *payloadV1) grow(n int) { From 1e941acb134cdbad3d343bb84f464fbb2b41f514 Mon Sep 17 00:00:00 2001 From: Hannah Kim Date: Thu, 11 Sep 2025 17:55:03 -0400 Subject: [PATCH 05/22] wip: encode anyvalue and keyvalue --- ddtrace/tracer/payload_v1.go | 128 +++++++++++++++++++++++++++++++---- 1 file changed, 116 insertions(+), 12 deletions(-) diff --git a/ddtrace/tracer/payload_v1.go b/ddtrace/tracer/payload_v1.go index b9624e459b..fff2a627ca 100644 --- a/ddtrace/tracer/payload_v1.go +++ b/ddtrace/tracer/payload_v1.go @@ -5,7 +5,13 @@ package tracer -import "github.com/tinylib/msgp/msgp" +import ( + "bytes" + "fmt" + "sync" + + "github.com/tinylib/msgp/msgp" +) // payloadV1 is a new version of a msgp payload that can be sent to the agent. // Be aware that payloadV1 follows the same rules and constraints as payloadV04. That is: @@ -22,7 +28,9 @@ import "github.com/tinylib/msgp/msgp" // Close the request body before attempting to re-use it again! type payloadV1 struct { // array of strings referenced in this tracer payload, its chunks and spans - strings []string `msgp:"strings"` + // stringTable holds references from a string value to an index. + // the 0th position in the stringTable should always be the empty string. + strings stringTable `msgp:"strings"` // the string ID of the container where the tracer is running containerID uint32 `msgp:"containerID"` @@ -56,6 +64,19 @@ type payloadV1 struct { // protocolVersion specifies the trace protocol to use. protocolVersion float64 + + // buf holds the sequence of msgpack-encoded items. + buf bytes.Buffer + + // reader is used for reading the contents of buf. + reader *bytes.Reader +} + +type stringTable struct { + m sync.Mutex + strings []string // list of strings + indices map[string]uint32 // map strings to their indices + nextIndex uint32 // last index of the stringTable } // AnyValue is a representation of the `any` value. It can take the following types: @@ -72,11 +93,6 @@ type anyValue struct { value interface{} } -// EncodeMsg implements msgp.Encodable. -func (a *anyValue) EncodeMsg(*msgp.Writer) error { - panic("unimplemented") -} - var _ msgp.Encodable = (*anyValue)(nil) const ( @@ -89,7 +105,7 @@ const ( keyValueListType // []keyValue ) -type arrayValue = []anyValue +type arrayValue []anyValue // keyValue is made up of the key and an AnyValue (the type of the value and the value itself) type keyValue struct { @@ -97,15 +113,21 @@ type keyValue struct { value anyValue } -type keyValueList = []keyValue +type keyValueList []keyValue + +var _ msgp.Encodable = (*keyValue)(nil) // newPayloadV1 returns a ready to use payloadV1. func newPayloadV1() *payloadV1 { return &payloadV1{ protocolVersion: traceProtocolV1, - strings: make([]string, 0), attributes: make(map[uint32]anyValue), chunks: make([]traceChunk, 0), + strings: stringTable{ + strings: []string{""}, + indices: map[string]uint32{"": 0}, + nextIndex: 1, + }, } } @@ -113,7 +135,14 @@ func newPayloadV1() *payloadV1 { func (p *payloadV1) push(t spanListV1) (stats payloadStats, err error) { // We need to hydrate the payload with everything we get from the spans. // Conceptually, our `t []*Span` corresponds to one `traceChunk`. - return payloadStats{}, nil + p.chunks = append(p.chunks, traceChunk{ + spans: t, + }) + if err := msgp.Encode(&p.buf, t); err != nil { // TODO(hannahkm): this needs to call (spanListV1).EncodeMsg + return payloadStats{}, err + } + p.recordItem() + return p.stats(), nil } func (p *payloadV1) grow(n int) { @@ -129,6 +158,7 @@ func (p *payloadV1) clear() { } func (p *payloadV1) recordItem() { + // atomic.AddUint32(&p.count, 1) panic("not implemented") } @@ -145,7 +175,7 @@ func (p *payloadV1) itemCount() int { } func (p *payloadV1) protocol() float64 { - panic("not implemented") + return p.protocolVersion } // Close implements io.Closer @@ -162,3 +192,77 @@ func (p *payloadV1) Write(data []byte) (n int, err error) { func (p *payloadV1) Read(b []byte) (n int, err error) { panic("not implemented") } + +// Encode the anyValue +// EncodeMsg implements msgp.Encodable. +func (a *anyValue) EncodeMsg(e *msgp.Writer) error { + switch a.valueType { + case StringValueType: + e.WriteInt32(StringValueType) + return encodeString(a.value.(string), e) + case BoolValueType: + e.WriteInt32(BoolValueType) + return e.WriteBool(a.value.(bool)) + case FloatValueType: + e.WriteInt32(FloatValueType) + return e.WriteFloat64(a.value.(float64)) + case IntValueType: + e.WriteInt32(IntValueType) + return e.WriteUint64(a.value.(uint64)) + case BytesValueType: + e.WriteInt32(BytesValueType) + return e.WriteBytes(a.value.([]byte)) + default: + return fmt.Errorf("invalid value type: %d", a.valueType) + } +} + +// EncodeMsg implements msgp.Encodable. +func (av arrayValue) EncodeMsg(e *msgp.Writer) error { + err := e.WriteArrayHeader(uint32(len(av))) + if err != nil { + return err + } + for _, value := range av { + if err := value.EncodeMsg(e); err != nil { + return err + } + } + return nil +} + +// EncodeMsg implements msgp.Encodable. +func (k keyValue) EncodeMsg(e *msgp.Writer) error { + err := e.WriteUint32(k.key) + if err != nil { + return err + } + err = k.value.EncodeMsg(e) + if err != nil { + return err + } + return nil +} + +func encodeKeyValueList(kv keyValueList, e *msgp.Writer) error { + err := e.WriteMapHeader(uint32(len(kv))) + if err != nil { + return err + } + for _, k := range kv { + if err := k.value.EncodeMsg(e); err != nil { + return err + } + } + return nil +} + +// When writing a string: +// - use its index in the string table if it exists +// - otherwise, write the string into the message, then add the string at the next index +// When reading a string, check that it is a uint and then: +// - if true, check read up the index position and return that position +// - else, add it to thenext index position and return that position +func encodeString(s string, e *msgp.Writer) error { + panic("not implemented") +} From dc00fd3d5e164bfe9b9006f1065ea85272d27203 Mon Sep 17 00:00:00 2001 From: Hannah Kim Date: Thu, 11 Sep 2025 18:01:53 -0400 Subject: [PATCH 06/22] move spanLinkV1 into payload_v1 --- ddtrace/tracer/payload.go | 15 -------------- ddtrace/tracer/payload_v1.go | 39 ++++++++++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 15 deletions(-) diff --git a/ddtrace/tracer/payload.go b/ddtrace/tracer/payload.go index 1e3d56369e..998fc13b6c 100644 --- a/ddtrace/tracer/payload.go +++ b/ddtrace/tracer/payload.go @@ -8,8 +8,6 @@ package tracer import ( "io" "sync" - - "github.com/tinylib/msgp/msgp" ) // payloadStats contains the statistics of a payload. @@ -67,19 +65,6 @@ const ( msgpackArray32 byte = 0xdd // up to 2^32-1 items, followed by size in 4 bytes ) -type spanListV1 = spanList - -// EncodeMsg implements msgp.Encodable. -func (s *spanListV1) EncodeMsg(*msgp.Writer) error { - // From here, encoding goes full manual. - // Span, SpanLink, and SpanEvent structs are different for v0.4 and v1.0. - // For v1 we need to manually encode the spans, span links, and span events - // if we don't want to do extra allocations. - panic("unimplemented") -} - -var _ msgp.Encodable = (*spanListV1)(nil) - // safePayload provides a thread-safe wrapper around payload. type safePayload struct { mu sync.RWMutex diff --git a/ddtrace/tracer/payload_v1.go b/ddtrace/tracer/payload_v1.go index fff2a627ca..c05731d20d 100644 --- a/ddtrace/tracer/payload_v1.go +++ b/ddtrace/tracer/payload_v1.go @@ -193,6 +193,10 @@ func (p *payloadV1) Read(b []byte) (n int, err error) { panic("not implemented") } +type spanListV1 spanList + +var _ msgp.Encodable = (*spanListV1)(nil) + // Encode the anyValue // EncodeMsg implements msgp.Encodable. func (a *anyValue) EncodeMsg(e *msgp.Writer) error { @@ -257,6 +261,41 @@ func encodeKeyValueList(kv keyValueList, e *msgp.Writer) error { return nil } +// EncodeMsg writes the contents of the TraceChunk into `p.buf` +// Span, SpanLink, and SpanEvent structs are different for v0.4 and v1.0. +// For v1 we need to manually encode the spans, span links, and span events +// if we don't want to do extra allocations. +// EncodeMsg implements msgp.Encodable. +func (s spanListV1) EncodeMsg(e *msgp.Writer) error { + err := e.WriteArrayHeader(uint32(len(s))) + if err != nil { + return msgp.WrapError(err) + } + + e.WriteInt32(4) + for _, span := range s { + if span == nil { + err := e.WriteNil() + if err != nil { + return err + } + } else { + err := encodeSpan(span, e) + if err != nil { + return msgp.WrapError(err, span) + } + } + } + + return nil +} + +// Custom encoding for spans under the v1 trace protocol. +func encodeSpan(s *Span, e *msgp.Writer) error { + panic("not implemented") +} + +// encodeString and decodeString handles encoding a string to the payload's string table. // When writing a string: // - use its index in the string table if it exists // - otherwise, write the string into the message, then add the string at the next index From 98d30ceef9a3c90f0e7aae73be27ff9eb84fcdc2 Mon Sep 17 00:00:00 2001 From: Hannah Kim Date: Fri, 12 Sep 2025 17:21:42 -0400 Subject: [PATCH 07/22] wip: encode spans --- ddtrace/tracer/payload_v1.go | 115 +++++++++++++++++++++++++++++++++-- 1 file changed, 109 insertions(+), 6 deletions(-) diff --git a/ddtrace/tracer/payload_v1.go b/ddtrace/tracer/payload_v1.go index c05731d20d..9991fa1b6a 100644 --- a/ddtrace/tracer/payload_v1.go +++ b/ddtrace/tracer/payload_v1.go @@ -131,6 +131,13 @@ func newPayloadV1() *payloadV1 { } } +var _ msgp.Encodable = (*payloadV1)(nil) + +// EncodeMsg implements msgp.Encodable. +func (p *payloadV1) EncodeMsg(e *msgp.Writer) error { + panic("not implemented") +} + // push pushes a new item into the stream. func (p *payloadV1) push(t spanListV1) (stats payloadStats, err error) { // We need to hydrate the payload with everything we get from the spans. @@ -203,7 +210,11 @@ func (a *anyValue) EncodeMsg(e *msgp.Writer) error { switch a.valueType { case StringValueType: e.WriteInt32(StringValueType) - return encodeString(a.value.(string), e) + v, err := encodeString(a.value.(string)) + if err != nil { + return err + } + return e.WriteUint32(v) case BoolValueType: e.WriteInt32(BoolValueType) return e.WriteBool(a.value.(bool)) @@ -216,6 +227,9 @@ func (a *anyValue) EncodeMsg(e *msgp.Writer) error { case BytesValueType: e.WriteInt32(BytesValueType) return e.WriteBytes(a.value.([]byte)) + case ArrayValueType: + e.WriteInt32(ArrayValueType) + return a.value.(arrayValue).EncodeMsg(e) default: return fmt.Errorf("invalid value type: %d", a.valueType) } @@ -253,8 +267,13 @@ func encodeKeyValueList(kv keyValueList, e *msgp.Writer) error { if err != nil { return err } - for _, k := range kv { - if err := k.value.EncodeMsg(e); err != nil { + for i, k := range kv { + err := e.WriteUint32(uint32(i)) + if err != nil { + return err + } + err = k.EncodeMsg(e) + if err != nil { return err } } @@ -291,17 +310,101 @@ func (s spanListV1) EncodeMsg(e *msgp.Writer) error { } // Custom encoding for spans under the v1 trace protocol. +// The encoding of attributes is the combination of the meta, metrics, and metaStruct fields of the v0.4 protocol. func encodeSpan(s *Span, e *msgp.Writer) error { - panic("not implemented") + kv := keyValueList{ + {key: 1, value: anyValue{valueType: StringValueType, value: s.service}}, // service + {key: 2, value: anyValue{valueType: StringValueType, value: s.name}}, // name + {key: 3, value: anyValue{valueType: StringValueType, value: s.resource}}, // resource + {key: 4, value: anyValue{valueType: IntValueType, value: s.spanID}}, // spanID + {key: 5, value: anyValue{valueType: IntValueType, value: s.parentID}}, // parentID + {key: 6, value: anyValue{valueType: IntValueType, value: s.start}}, // start + {key: 7, value: anyValue{valueType: IntValueType, value: s.duration}}, // duration + {key: 8, value: anyValue{valueType: BoolValueType, value: s.error}}, // error + {key: 10, value: anyValue{valueType: StringValueType, value: s.spanType}}, // type + {key: 11, value: anyValue{valueType: keyValueListType, value: s.spanLinks}}, // SpanLink + {key: 12, value: anyValue{valueType: keyValueListType, value: s.spanEvents}}, // SpanEvent + {key: 15, value: anyValue{valueType: StringValueType, value: s.integration}}, // component + } + + // encode meta attributes + attr := keyValueList{} + for k, v := range s.meta { + idx, err := encodeString(k) + if err != nil { + // print something here + } + attr = append(attr, keyValue{key: idx, value: anyValue{valueType: StringValueType, value: v}}) + } + + // encode metric attributes + for k, v := range s.metrics { + idx, err := encodeString(k) + if err != nil { + // print something here + } + attr = append(attr, keyValue{key: idx, value: anyValue{valueType: FloatValueType, value: v}}) + } + + // encode metaStruct attributes + for k, v := range s.metaStruct { + idx, err := encodeString(k) + if err != nil { + // print something here + } + attr = append(attr, keyValue{key: idx, value: anyValue{valueType: getAnyValueType(v), value: v}}) + } + + kv = append(kv, keyValue{key: 9, value: anyValue{valueType: ArrayValueType, value: attr}}) // attributes + + env, ok := s.meta["env"] + if ok { + kv = append(kv, keyValue{key: 13, value: anyValue{valueType: StringValueType, value: env}}) // env + } + version, ok := s.meta["version"] + if ok { + kv = append(kv, keyValue{key: 14, value: anyValue{valueType: StringValueType, value: version}}) // version + } + + return encodeKeyValueList(kv, e) } // encodeString and decodeString handles encoding a string to the payload's string table. // When writing a string: // - use its index in the string table if it exists // - otherwise, write the string into the message, then add the string at the next index +// Returns the index of the string in the string table, and an error if there is one +func encodeString(s string) (uint32, error) { + panic("not implemented") +} + // When reading a string, check that it is a uint and then: // - if true, check read up the index position and return that position -// - else, add it to thenext index position and return that position -func encodeString(s string, e *msgp.Writer) error { +// - else, add it to the next index position and return that position +func decodeString(i uint32, e *msgp.Writer) (string, error) { panic("not implemented") } + +func encodeSpanLinks(sl []SpanLink, e *msgp.Writer) error { + panic("not implemented") +} + +func encodeSpanEvents(se []spanEvent, e *msgp.Writer) error { + panic("not implemented") +} + +func getAnyValueType(v any) int { + switch v.(type) { + case string: + return StringValueType + case bool: + return BoolValueType + case float64: + return FloatValueType + case float32: + return FloatValueType + case []byte: + return BytesValueType + } + return IntValueType +} From 0ad7dd12194223abc6b96c1d048b7e44adbadfd7 Mon Sep 17 00:00:00 2001 From: Hannah Kim Date: Mon, 15 Sep 2025 14:36:49 -0400 Subject: [PATCH 08/22] wip: span event and span link --- ddtrace/tracer/payload_v1.go | 90 ++++++++++++++++++++++++++++++++---- 1 file changed, 81 insertions(+), 9 deletions(-) diff --git a/ddtrace/tracer/payload_v1.go b/ddtrace/tracer/payload_v1.go index 9991fa1b6a..e7f92331c9 100644 --- a/ddtrace/tracer/payload_v1.go +++ b/ddtrace/tracer/payload_v1.go @@ -96,13 +96,13 @@ type anyValue struct { var _ msgp.Encodable = (*anyValue)(nil) const ( - StringValueType = iota + 1 // string or uint - BoolValueType // boolean - FloatValueType // float64 - IntValueType // uint64 - BytesValueType // []uint8 - ArrayValueType // []AnyValue - keyValueListType // []keyValue + StringValueType = iota + 1 // string or uint -- 1 + BoolValueType // boolean -- 2 + FloatValueType // float64 -- 3 + IntValueType // uint64 -- 4 + BytesValueType // []uint8 -- 5 + ArrayValueType // []AnyValue -- 6 + keyValueListType // []keyValue -- 7 ) type arrayValue []anyValue @@ -385,12 +385,84 @@ func decodeString(i uint32, e *msgp.Writer) (string, error) { panic("not implemented") } +// encodeSpanLinks encodes the span links into a msgp.Writer +// Span links are represented as an array of fixmaps (keyValueList) func encodeSpanLinks(sl []SpanLink, e *msgp.Writer) error { - panic("not implemented") + // write the number of span links + err := e.WriteArrayHeader(uint32(len(sl))) + if err != nil { + return err + } + + // represent each span link as a fixmap (keyValueList) and add it to an array + kv := arrayValue{} + for _, s := range sl { + slKeyValues := keyValueList{ + {key: 1, value: anyValue{valueType: IntValueType, value: s.TraceID}}, // traceID + {key: 2, value: anyValue{valueType: IntValueType, value: s.SpanID}}, // spanID + {key: 4, value: anyValue{valueType: StringValueType, value: s.Tracestate}}, // tracestate + {key: 5, value: anyValue{valueType: IntValueType, value: s.Flags}}, // flags + } + + attr := keyValueList{} + // attributes + for k, v := range s.Attributes { + idx, err := encodeString(k) + if err != nil { + return err + } + attr = append(attr, keyValue{key: idx, value: anyValue{valueType: getAnyValueType(v), value: v}}) + } + slKeyValues = append(slKeyValues, keyValue{key: 3, value: anyValue{valueType: ArrayValueType, value: attr}}) // attributes + kv = append(kv, anyValue{valueType: keyValueListType, value: slKeyValues}) + } + + for _, v := range kv { + err := v.EncodeMsg(e) + if err != nil { + return err + } + } + return nil } +// encodeSpanEvents encodes the span events into a msgp.Writer +// Span events are represented as an array of fixmaps (keyValueList) func encodeSpanEvents(se []spanEvent, e *msgp.Writer) error { - panic("not implemented") + // write the number of span events + err := e.WriteArrayHeader(uint32(len(se))) + if err != nil { + return err + } + + // represent each span event as a fixmap (keyValueList) and add it to an array + kv := arrayValue{} + for _, s := range se { + slKeyValues := keyValueList{ + {key: 1, value: anyValue{valueType: IntValueType, value: s.TimeUnixNano}}, // time + {key: 2, value: anyValue{valueType: StringValueType, value: s.Name}}, // name + } + + attr := keyValueList{} + // attributes + for k, v := range s.Attributes { + idx, err := encodeString(k) + if err != nil { + return err + } + attr = append(attr, keyValue{key: idx, value: anyValue{valueType: getAnyValueType(v), value: v}}) + } + slKeyValues = append(slKeyValues, keyValue{key: 3, value: anyValue{valueType: ArrayValueType, value: attr}}) // attributes + kv = append(kv, anyValue{valueType: keyValueListType, value: slKeyValues}) + } + + for _, v := range kv { + err := v.EncodeMsg(e) + if err != nil { + return err + } + } + return nil } func getAnyValueType(v any) int { From 326a24442d1986325d16e71f5df053e93f76ab16 Mon Sep 17 00:00:00 2001 From: Hannah Kim Date: Mon, 15 Sep 2025 15:05:24 -0400 Subject: [PATCH 09/22] wip: payload --- ddtrace/tracer/payload_v1.go | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/ddtrace/tracer/payload_v1.go b/ddtrace/tracer/payload_v1.go index e7f92331c9..515b2d1580 100644 --- a/ddtrace/tracer/payload_v1.go +++ b/ddtrace/tracer/payload_v1.go @@ -116,6 +116,7 @@ type keyValue struct { type keyValueList []keyValue var _ msgp.Encodable = (*keyValue)(nil) +var _ msgp.Encodable = (keyValueList)(nil) // newPayloadV1 returns a ready to use payloadV1. func newPayloadV1() *payloadV1 { @@ -135,7 +136,19 @@ var _ msgp.Encodable = (*payloadV1)(nil) // EncodeMsg implements msgp.Encodable. func (p *payloadV1) EncodeMsg(e *msgp.Writer) error { - panic("not implemented") + kv := keyValueList{ + {key: 2, value: anyValue{valueType: IntValueType, value: p.containerID}}, // containerID + {key: 3, value: anyValue{valueType: IntValueType, value: p.languageName}}, // languageName + {key: 4, value: anyValue{valueType: IntValueType, value: p.languageVersion}}, // languageVersion + {key: 5, value: anyValue{valueType: IntValueType, value: p.tracerVersion}}, // tracerVersion + {key: 6, value: anyValue{valueType: IntValueType, value: p.runtimeID}}, // runtimeID + {key: 7, value: anyValue{valueType: StringValueType, value: p.env}}, // env + {key: 8, value: anyValue{valueType: StringValueType, value: p.hostname}}, // hostname + {key: 9, value: anyValue{valueType: StringValueType, value: p.appVersion}}, // appVersion + {key: 10, value: anyValue{valueType: keyValueListType, value: p.attributes}}, // attributes + {key: 11, value: anyValue{valueType: keyValueListType, value: p.chunks}}, // chunks + } + return kv.EncodeMsg(e) } // push pushes a new item into the stream. @@ -230,6 +243,9 @@ func (a *anyValue) EncodeMsg(e *msgp.Writer) error { case ArrayValueType: e.WriteInt32(ArrayValueType) return a.value.(arrayValue).EncodeMsg(e) + case keyValueListType: + e.WriteInt32(keyValueListType) + return a.value.(keyValueList).EncodeMsg(e) default: return fmt.Errorf("invalid value type: %d", a.valueType) } @@ -262,7 +278,7 @@ func (k keyValue) EncodeMsg(e *msgp.Writer) error { return nil } -func encodeKeyValueList(kv keyValueList, e *msgp.Writer) error { +func (kv keyValueList) EncodeMsg(e *msgp.Writer) error { err := e.WriteMapHeader(uint32(len(kv))) if err != nil { return err @@ -366,7 +382,7 @@ func encodeSpan(s *Span, e *msgp.Writer) error { kv = append(kv, keyValue{key: 14, value: anyValue{valueType: StringValueType, value: version}}) // version } - return encodeKeyValueList(kv, e) + return kv.EncodeMsg(e) } // encodeString and decodeString handles encoding a string to the payload's string table. From ea80c7a47e18841dacbadd3b12933043e4b73fab Mon Sep 17 00:00:00 2001 From: Hannah Kim Date: Tue, 16 Sep 2025 11:22:08 -0400 Subject: [PATCH 10/22] wip: encode traceChunk --- ddtrace/tracer/payload_v1.go | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/ddtrace/tracer/payload_v1.go b/ddtrace/tracer/payload_v1.go index 515b2d1580..08703c7fb9 100644 --- a/ddtrace/tracer/payload_v1.go +++ b/ddtrace/tracer/payload_v1.go @@ -117,6 +117,7 @@ type keyValueList []keyValue var _ msgp.Encodable = (*keyValue)(nil) var _ msgp.Encodable = (keyValueList)(nil) +var _ msgp.Encodable = (*traceChunk)(nil) // newPayloadV1 returns a ready to use payloadV1. func newPayloadV1() *payloadV1 { @@ -158,7 +159,7 @@ func (p *payloadV1) push(t spanListV1) (stats payloadStats, err error) { p.chunks = append(p.chunks, traceChunk{ spans: t, }) - if err := msgp.Encode(&p.buf, t); err != nil { // TODO(hannahkm): this needs to call (spanListV1).EncodeMsg + if err := t.EncodeMsg(&p.buf); err != nil { // TODO(hannahkm): this needs to call (spanListV1).EncodeMsg return payloadStats{}, err } p.recordItem() @@ -296,7 +297,26 @@ func (kv keyValueList) EncodeMsg(e *msgp.Writer) error { return nil } -// EncodeMsg writes the contents of the TraceChunk into `p.buf` +func (t *traceChunk) EncodeMsg(e *msgp.Writer) error { + kv := keyValueList{ + {key: 1, value: anyValue{valueType: IntValueType, value: t.priority}}, // priority + {key: 2, value: anyValue{valueType: StringValueType, value: t.origin}}, // origin + {key: 4, value: anyValue{valueType: keyValueListType, value: t.spans}}, // spans + {key: 5, value: anyValue{valueType: BoolValueType, value: t.droppedTrace}}, // droppedTrace + {key: 6, value: anyValue{valueType: BytesValueType, value: t.traceID}}, // traceID + {key: 7, value: anyValue{valueType: IntValueType, value: t.decisionMaker}}, // samplingMechanism + } + + attr := keyValueList{} + for k, v := range t.attributes { + attr = append(attr, keyValue{key: k, value: anyValue{valueType: getAnyValueType(v), value: v}}) + } + kv = append(kv, keyValue{key: 3, value: anyValue{valueType: ArrayValueType, value: attr}}) // attributes + + return kv.EncodeMsg(e) +} + +// EncodeMsg writes the contents of a list of spans into `p.buf` // Span, SpanLink, and SpanEvent structs are different for v0.4 and v1.0. // For v1 we need to manually encode the spans, span links, and span events // if we don't want to do extra allocations. From af26f1501b3d2aa5bc237cf2ea4b19ca8177adb3 Mon Sep 17 00:00:00 2001 From: Hannah Kim Date: Wed, 17 Sep 2025 14:21:06 -0400 Subject: [PATCH 11/22] fix payload encodings --- ddtrace/tracer/payload_v1.go | 108 +++++++++++++++++++++++------------ 1 file changed, 71 insertions(+), 37 deletions(-) diff --git a/ddtrace/tracer/payload_v1.go b/ddtrace/tracer/payload_v1.go index 08703c7fb9..97106542b9 100644 --- a/ddtrace/tracer/payload_v1.go +++ b/ddtrace/tracer/payload_v1.go @@ -7,8 +7,10 @@ package tracer import ( "bytes" + "encoding/binary" "fmt" "sync" + "sync/atomic" "github.com/tinylib/msgp/msgp" ) @@ -65,6 +67,17 @@ type payloadV1 struct { // protocolVersion specifies the trace protocol to use. protocolVersion float64 + // header specifies the first few bytes in the msgpack stream + // indicating the type of array (fixarray, array16 or array32) + // and the number of items contained in the stream. + header []byte + + // off specifies the current read position on the header. + off int + + // count specifies the number of items in the stream. + count uint32 + // buf holds the sequence of msgpack-encoded items. buf bytes.Buffer @@ -133,33 +146,30 @@ func newPayloadV1() *payloadV1 { } } -var _ msgp.Encodable = (*payloadV1)(nil) - -// EncodeMsg implements msgp.Encodable. -func (p *payloadV1) EncodeMsg(e *msgp.Writer) error { - kv := keyValueList{ - {key: 2, value: anyValue{valueType: IntValueType, value: p.containerID}}, // containerID - {key: 3, value: anyValue{valueType: IntValueType, value: p.languageName}}, // languageName - {key: 4, value: anyValue{valueType: IntValueType, value: p.languageVersion}}, // languageVersion - {key: 5, value: anyValue{valueType: IntValueType, value: p.tracerVersion}}, // tracerVersion - {key: 6, value: anyValue{valueType: IntValueType, value: p.runtimeID}}, // runtimeID - {key: 7, value: anyValue{valueType: StringValueType, value: p.env}}, // env - {key: 8, value: anyValue{valueType: StringValueType, value: p.hostname}}, // hostname - {key: 9, value: anyValue{valueType: StringValueType, value: p.appVersion}}, // appVersion - {key: 10, value: anyValue{valueType: keyValueListType, value: p.attributes}}, // attributes - {key: 11, value: anyValue{valueType: keyValueListType, value: p.chunks}}, // chunks - } - return kv.EncodeMsg(e) -} - // push pushes a new item into the stream. func (p *payloadV1) push(t spanListV1) (stats payloadStats, err error) { // We need to hydrate the payload with everything we get from the spans. // Conceptually, our `t []*Span` corresponds to one `traceChunk`. + origin, priority := "", 0 + for _, span := range t { + if span == nil { + continue + } + if p, ok := span.Context().SamplingPriority(); ok { + origin = span.Context().origin + priority = p + break + } + } + p.chunks = append(p.chunks, traceChunk{ - spans: t, + priority: int32(priority), + origin: origin, + attributes: make(map[uint32]anyValue), + spans: t, + traceID: t[0].Context().traceID, }) - if err := t.EncodeMsg(&p.buf); err != nil { // TODO(hannahkm): this needs to call (spanListV1).EncodeMsg + if err := msgp.Encode(&p.buf, t); err != nil { return payloadStats{}, err } p.recordItem() @@ -167,46 +177,70 @@ func (p *payloadV1) push(t spanListV1) (stats payloadStats, err error) { } func (p *payloadV1) grow(n int) { - panic("not implemented") + p.buf.Grow(n) } func (p *payloadV1) reset() { - panic("not implemented") + p.updateHeader() + if p.reader != nil { + p.reader.Seek(0, 0) + } } func (p *payloadV1) clear() { - panic("not implemented") + p.buf = bytes.Buffer{} + p.reader = nil } func (p *payloadV1) recordItem() { - // atomic.AddUint32(&p.count, 1) - panic("not implemented") + atomic.AddUint32(&p.count, 1) + p.updateHeader() } func (p *payloadV1) stats() payloadStats { - panic("not implemented") + return payloadStats{ + size: p.size(), + itemCount: p.itemCount(), + } } func (p *payloadV1) size() int { - panic("not implemented") + return p.buf.Len() + len(p.header) - p.off } func (p *payloadV1) itemCount() int { - panic("not implemented") + return int(atomic.LoadUint32(&p.count)) } func (p *payloadV1) protocol() float64 { return p.protocolVersion } +func (p *payloadV1) updateHeader() { + n := uint64(atomic.LoadUint32(&p.count)) + switch { + case n <= 15: + p.header[7] = msgpackArrayFix + byte(n) + p.off = 7 + case n <= 1<<16-1: + binary.BigEndian.PutUint64(p.header, n) // writes 2 bytes + p.header[5] = msgpackArray16 + p.off = 5 + default: // n <= 1<<32-1 + binary.BigEndian.PutUint64(p.header, n) // writes 4 bytes + p.header[3] = msgpackArray32 + p.off = 3 + } +} + // Close implements io.Closer func (p *payloadV1) Close() error { - panic("not implemented") + return nil } // Write implements io.Writer. It writes data directly to the buffer. func (p *payloadV1) Write(data []byte) (n int, err error) { - panic("not implemented") + return p.buf.Write(data) } // Read implements io.Reader. It reads from the msgpack-encoded stream. @@ -299,12 +333,12 @@ func (kv keyValueList) EncodeMsg(e *msgp.Writer) error { func (t *traceChunk) EncodeMsg(e *msgp.Writer) error { kv := keyValueList{ - {key: 1, value: anyValue{valueType: IntValueType, value: t.priority}}, // priority - {key: 2, value: anyValue{valueType: StringValueType, value: t.origin}}, // origin - {key: 4, value: anyValue{valueType: keyValueListType, value: t.spans}}, // spans - {key: 5, value: anyValue{valueType: BoolValueType, value: t.droppedTrace}}, // droppedTrace - {key: 6, value: anyValue{valueType: BytesValueType, value: t.traceID}}, // traceID - {key: 7, value: anyValue{valueType: IntValueType, value: t.decisionMaker}}, // samplingMechanism + {key: 1, value: anyValue{valueType: IntValueType, value: t.priority}}, // priority + {key: 2, value: anyValue{valueType: StringValueType, value: t.origin}}, // origin + {key: 4, value: anyValue{valueType: keyValueListType, value: t.spans}}, // spans + {key: 5, value: anyValue{valueType: BoolValueType, value: t.droppedTrace}}, // droppedTrace + {key: 6, value: anyValue{valueType: BytesValueType, value: t.traceID}}, // traceID + {key: 7, value: anyValue{valueType: IntValueType, value: t.samplingMechanism}}, // samplingMechanism } attr := keyValueList{} From ded707db27288e539c9f1a32742f033220f25c82 Mon Sep 17 00:00:00 2001 From: Hannah Kim Date: Wed, 17 Sep 2025 15:18:28 -0400 Subject: [PATCH 12/22] fix traceChunk field types --- ddtrace/tracer/payload.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ddtrace/tracer/payload.go b/ddtrace/tracer/payload.go index 998fc13b6c..7fd8c402b5 100644 --- a/ddtrace/tracer/payload.go +++ b/ddtrace/tracer/payload.go @@ -160,7 +160,7 @@ type traceChunk struct { priority int32 `msg:"priority"` // the optional string origin ("lambda", "rum", etc.) of the trace chunk - origin uint32 `msg:"origin,omitempty"` + origin string `msg:"origin,omitempty"` // a collection of key to value pairs common in all `spans` attributes map[uint32]anyValue `msg:"attributes,omitempty"` @@ -173,8 +173,8 @@ type traceChunk struct { droppedTrace bool `msg:"droppedTrace"` // the ID of the trace to which all spans in this chunk belong - traceID []byte `msg:"traceID"` + traceID [16]byte `msg:"traceID"` // the optional string decision maker (previously span tag _dd.p.dm) - decisionMaker uint32 `msg:"decisionMaker,omitempty"` + samplingMechanism string `msg:"samplingMechanism,omitempty"` } From 5124410174cce24fa206f120a157a5834c4aae08 Mon Sep 17 00:00:00 2001 From: Hannah Kim Date: Thu, 18 Sep 2025 15:40:53 -0400 Subject: [PATCH 13/22] broken string table implementation --- ddtrace/tracer/payload_v1.go | 40 +++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/ddtrace/tracer/payload_v1.go b/ddtrace/tracer/payload_v1.go index 97106542b9..e1d386e8cd 100644 --- a/ddtrace/tracer/payload_v1.go +++ b/ddtrace/tracer/payload_v1.go @@ -92,6 +92,11 @@ type stringTable struct { nextIndex uint32 // last index of the stringTable } +type payloadWrapper struct { + *msgp.Writer + payload *payloadV1 +} + // AnyValue is a representation of the `any` value. It can take the following types: // - uint32 // - bool @@ -260,7 +265,7 @@ func (a *anyValue) EncodeMsg(e *msgp.Writer) error { e.WriteInt32(StringValueType) v, err := encodeString(a.value.(string)) if err != nil { - return err + return e.WriteString(a.value.(string)) } return e.WriteUint32(v) case BoolValueType: @@ -402,7 +407,7 @@ func encodeSpan(s *Span, e *msgp.Writer) error { for k, v := range s.meta { idx, err := encodeString(k) if err != nil { - // print something here + idx = k } attr = append(attr, keyValue{key: idx, value: anyValue{valueType: StringValueType, value: v}}) } @@ -411,7 +416,7 @@ func encodeSpan(s *Span, e *msgp.Writer) error { for k, v := range s.metrics { idx, err := encodeString(k) if err != nil { - // print something here + idx = k } attr = append(attr, keyValue{key: idx, value: anyValue{valueType: FloatValueType, value: v}}) } @@ -420,7 +425,7 @@ func encodeSpan(s *Span, e *msgp.Writer) error { for k, v := range s.metaStruct { idx, err := encodeString(k) if err != nil { - // print something here + idx = k } attr = append(attr, keyValue{key: idx, value: anyValue{valueType: getAnyValueType(v), value: v}}) } @@ -444,15 +449,22 @@ func encodeSpan(s *Span, e *msgp.Writer) error { // - use its index in the string table if it exists // - otherwise, write the string into the message, then add the string at the next index // Returns the index of the string in the string table, and an error if there is one -func encodeString(s string) (uint32, error) { - panic("not implemented") -} +func (p *payloadV1) encodeString(s string) (uint32, error) { + sTable := &p.strings + sTable.m.Lock() + defer sTable.m.Unlock() + idx, ok := sTable.indices[s] + // if the string already exists in the table, use its index + if ok { + return idx, nil + } -// When reading a string, check that it is a uint and then: -// - if true, check read up the index position and return that position -// - else, add it to the next index position and return that position -func decodeString(i uint32, e *msgp.Writer) (string, error) { - panic("not implemented") + // else, write the string into the table at the next index + // return an error to indicate that the string should be written to the msgp message + sTable.indices[s] = sTable.nextIndex + sTable.strings = append(sTable.strings, s) + sTable.nextIndex += 1 + return sTable.nextIndex, fmt.Errorf("string not found in table") } // encodeSpanLinks encodes the span links into a msgp.Writer @@ -479,7 +491,7 @@ func encodeSpanLinks(sl []SpanLink, e *msgp.Writer) error { for k, v := range s.Attributes { idx, err := encodeString(k) if err != nil { - return err + idx = k } attr = append(attr, keyValue{key: idx, value: anyValue{valueType: getAnyValueType(v), value: v}}) } @@ -518,7 +530,7 @@ func encodeSpanEvents(se []spanEvent, e *msgp.Writer) error { for k, v := range s.Attributes { idx, err := encodeString(k) if err != nil { - return err + idx = k } attr = append(attr, keyValue{key: idx, value: anyValue{valueType: getAnyValueType(v), value: v}}) } From bb1baaf1c04d6d57e9715e77fd77991b2635541b Mon Sep 17 00:00:00 2001 From: Hannah Kim Date: Mon, 22 Sep 2025 14:36:22 -0400 Subject: [PATCH 14/22] clean up some things --- ddtrace/tracer/payload_v1.go | 19 ++++--------------- 1 file changed, 4 insertions(+), 15 deletions(-) diff --git a/ddtrace/tracer/payload_v1.go b/ddtrace/tracer/payload_v1.go index e1d386e8cd..4981a3d9d9 100644 --- a/ddtrace/tracer/payload_v1.go +++ b/ddtrace/tracer/payload_v1.go @@ -9,7 +9,6 @@ import ( "bytes" "encoding/binary" "fmt" - "sync" "sync/atomic" "github.com/tinylib/msgp/msgp" @@ -86,17 +85,11 @@ type payloadV1 struct { } type stringTable struct { - m sync.Mutex strings []string // list of strings indices map[string]uint32 // map strings to their indices nextIndex uint32 // last index of the stringTable } -type payloadWrapper struct { - *msgp.Writer - payload *payloadV1 -} - // AnyValue is a representation of the `any` value. It can take the following types: // - uint32 // - bool @@ -111,8 +104,6 @@ type anyValue struct { value interface{} } -var _ msgp.Encodable = (*anyValue)(nil) - const ( StringValueType = iota + 1 // string or uint -- 1 BoolValueType // boolean -- 2 @@ -133,6 +124,10 @@ type keyValue struct { type keyValueList []keyValue +type spanListV1 spanList + +var _ msgp.Encodable = (*anyValue)(nil) +var _ msgp.Encodable = (*spanListV1)(nil) var _ msgp.Encodable = (*keyValue)(nil) var _ msgp.Encodable = (keyValueList)(nil) var _ msgp.Encodable = (*traceChunk)(nil) @@ -253,10 +248,6 @@ func (p *payloadV1) Read(b []byte) (n int, err error) { panic("not implemented") } -type spanListV1 spanList - -var _ msgp.Encodable = (*spanListV1)(nil) - // Encode the anyValue // EncodeMsg implements msgp.Encodable. func (a *anyValue) EncodeMsg(e *msgp.Writer) error { @@ -451,8 +442,6 @@ func encodeSpan(s *Span, e *msgp.Writer) error { // Returns the index of the string in the string table, and an error if there is one func (p *payloadV1) encodeString(s string) (uint32, error) { sTable := &p.strings - sTable.m.Lock() - defer sTable.m.Unlock() idx, ok := sTable.indices[s] // if the string already exists in the table, use its index if ok { From f071e05f2360d16845eb4f8e26f3fa7b676c3f23 Mon Sep 17 00:00:00 2001 From: Hannah Kim Date: Tue, 23 Sep 2025 14:22:21 -0400 Subject: [PATCH 15/22] streamingKey type for stringTable (oh god) --- ddtrace/tracer/payload_v1.go | 170 +++++++++++++++++++---------------- 1 file changed, 91 insertions(+), 79 deletions(-) diff --git a/ddtrace/tracer/payload_v1.go b/ddtrace/tracer/payload_v1.go index 4981a3d9d9..894afb4fd0 100644 --- a/ddtrace/tracer/payload_v1.go +++ b/ddtrace/tracer/payload_v1.go @@ -116,9 +116,18 @@ const ( type arrayValue []anyValue +// keys in a keyValue can either be a string or a uint32 index +// isString is true when the key is a string value, and false when the key is a uint32 index +type streamingKey struct { + isString bool + stringValue string + idx uint32 +} + // keyValue is made up of the key and an AnyValue (the type of the value and the value itself) +// The key is either a uint32 index into the string table or a string value. type keyValue struct { - key uint32 + key streamingKey value anyValue } @@ -126,12 +135,6 @@ type keyValueList []keyValue type spanListV1 spanList -var _ msgp.Encodable = (*anyValue)(nil) -var _ msgp.Encodable = (*spanListV1)(nil) -var _ msgp.Encodable = (*keyValue)(nil) -var _ msgp.Encodable = (keyValueList)(nil) -var _ msgp.Encodable = (*traceChunk)(nil) - // newPayloadV1 returns a ready to use payloadV1. func newPayloadV1() *payloadV1 { return &payloadV1{ @@ -169,7 +172,12 @@ func (p *payloadV1) push(t spanListV1) (stats payloadStats, err error) { spans: t, traceID: t[0].Context().traceID, }) - if err := msgp.Encode(&p.buf, t); err != nil { + wr := msgp.NewWriter(&p.buf) + err = t.EncodeMsg(wr, p) + if err == nil { + err = wr.Flush() + } + if err != nil { return payloadStats{}, err } p.recordItem() @@ -249,16 +257,18 @@ func (p *payloadV1) Read(b []byte) (n int, err error) { } // Encode the anyValue -// EncodeMsg implements msgp.Encodable. -func (a *anyValue) EncodeMsg(e *msgp.Writer) error { +func (a *anyValue) EncodeMsg(e *msgp.Writer, p *payloadV1) error { switch a.valueType { case StringValueType: e.WriteInt32(StringValueType) - v, err := encodeString(a.value.(string)) + v, err := p.encodeString(a.value.(string)) if err != nil { - return e.WriteString(a.value.(string)) + return err } - return e.WriteUint32(v) + if v.isString { + return e.WriteString(v.stringValue) + } + return e.WriteUint32(v.idx) case BoolValueType: e.WriteInt32(BoolValueType) return e.WriteBool(a.value.(bool)) @@ -273,43 +283,46 @@ func (a *anyValue) EncodeMsg(e *msgp.Writer) error { return e.WriteBytes(a.value.([]byte)) case ArrayValueType: e.WriteInt32(ArrayValueType) - return a.value.(arrayValue).EncodeMsg(e) + return a.value.(arrayValue).EncodeMsg(e, p) case keyValueListType: e.WriteInt32(keyValueListType) - return a.value.(keyValueList).EncodeMsg(e) + return a.value.(keyValueList).EncodeMsg(e, p) default: return fmt.Errorf("invalid value type: %d", a.valueType) } } -// EncodeMsg implements msgp.Encodable. -func (av arrayValue) EncodeMsg(e *msgp.Writer) error { +func (av arrayValue) EncodeMsg(e *msgp.Writer, p *payloadV1) error { err := e.WriteArrayHeader(uint32(len(av))) if err != nil { return err } for _, value := range av { - if err := value.EncodeMsg(e); err != nil { + if err := value.EncodeMsg(e, p); err != nil { return err } } return nil } -// EncodeMsg implements msgp.Encodable. -func (k keyValue) EncodeMsg(e *msgp.Writer) error { - err := e.WriteUint32(k.key) +func (k keyValue) EncodeMsg(e *msgp.Writer, p *payloadV1) error { + var err error + if k.key.isString { + err = e.WriteString(k.key.stringValue) + } else { + err = e.WriteUint32(k.key.idx) + } if err != nil { return err } - err = k.value.EncodeMsg(e) + err = k.value.EncodeMsg(e, p) if err != nil { return err } return nil } -func (kv keyValueList) EncodeMsg(e *msgp.Writer) error { +func (kv keyValueList) EncodeMsg(e *msgp.Writer, p *payloadV1) error { err := e.WriteMapHeader(uint32(len(kv))) if err != nil { return err @@ -319,7 +332,7 @@ func (kv keyValueList) EncodeMsg(e *msgp.Writer) error { if err != nil { return err } - err = k.EncodeMsg(e) + err = k.EncodeMsg(e, p) if err != nil { return err } @@ -327,31 +340,30 @@ func (kv keyValueList) EncodeMsg(e *msgp.Writer) error { return nil } -func (t *traceChunk) EncodeMsg(e *msgp.Writer) error { +func (t *traceChunk) EncodeMsg(e *msgp.Writer, p *payloadV1) error { kv := keyValueList{ - {key: 1, value: anyValue{valueType: IntValueType, value: t.priority}}, // priority - {key: 2, value: anyValue{valueType: StringValueType, value: t.origin}}, // origin - {key: 4, value: anyValue{valueType: keyValueListType, value: t.spans}}, // spans - {key: 5, value: anyValue{valueType: BoolValueType, value: t.droppedTrace}}, // droppedTrace - {key: 6, value: anyValue{valueType: BytesValueType, value: t.traceID}}, // traceID - {key: 7, value: anyValue{valueType: IntValueType, value: t.samplingMechanism}}, // samplingMechanism + {key: streamingKey{isString: false, idx: 1}, value: anyValue{valueType: IntValueType, value: t.priority}}, // priority + {key: streamingKey{isString: false, idx: 2}, value: anyValue{valueType: StringValueType, value: t.origin}}, // origin + {key: streamingKey{isString: false, idx: 4}, value: anyValue{valueType: keyValueListType, value: t.spans}}, // spans + {key: streamingKey{isString: false, idx: 5}, value: anyValue{valueType: BoolValueType, value: t.droppedTrace}}, // droppedTrace + {key: streamingKey{isString: false, idx: 6}, value: anyValue{valueType: BytesValueType, value: t.traceID}}, // traceID + {key: streamingKey{isString: false, idx: 7}, value: anyValue{valueType: IntValueType, value: t.samplingMechanism}}, // samplingMechanism } attr := keyValueList{} for k, v := range t.attributes { - attr = append(attr, keyValue{key: k, value: anyValue{valueType: getAnyValueType(v), value: v}}) + attr = append(attr, keyValue{key: streamingKey{isString: false, idx: k}, value: anyValue{valueType: getAnyValueType(v), value: v}}) } - kv = append(kv, keyValue{key: 3, value: anyValue{valueType: ArrayValueType, value: attr}}) // attributes + kv = append(kv, keyValue{key: streamingKey{isString: false, idx: 3}, value: anyValue{valueType: ArrayValueType, value: attr}}) // attributes - return kv.EncodeMsg(e) + return kv.EncodeMsg(e, p) } // EncodeMsg writes the contents of a list of spans into `p.buf` // Span, SpanLink, and SpanEvent structs are different for v0.4 and v1.0. // For v1 we need to manually encode the spans, span links, and span events // if we don't want to do extra allocations. -// EncodeMsg implements msgp.Encodable. -func (s spanListV1) EncodeMsg(e *msgp.Writer) error { +func (s spanListV1) EncodeMsg(e *msgp.Writer, p *payloadV1) error { err := e.WriteArrayHeader(uint32(len(s))) if err != nil { return msgp.WrapError(err) @@ -365,7 +377,7 @@ func (s spanListV1) EncodeMsg(e *msgp.Writer) error { return err } } else { - err := encodeSpan(span, e) + err := encodeSpan(span, e, p) if err != nil { return msgp.WrapError(err, span) } @@ -377,62 +389,62 @@ func (s spanListV1) EncodeMsg(e *msgp.Writer) error { // Custom encoding for spans under the v1 trace protocol. // The encoding of attributes is the combination of the meta, metrics, and metaStruct fields of the v0.4 protocol. -func encodeSpan(s *Span, e *msgp.Writer) error { +func encodeSpan(s *Span, e *msgp.Writer, p *payloadV1) error { kv := keyValueList{ - {key: 1, value: anyValue{valueType: StringValueType, value: s.service}}, // service - {key: 2, value: anyValue{valueType: StringValueType, value: s.name}}, // name - {key: 3, value: anyValue{valueType: StringValueType, value: s.resource}}, // resource - {key: 4, value: anyValue{valueType: IntValueType, value: s.spanID}}, // spanID - {key: 5, value: anyValue{valueType: IntValueType, value: s.parentID}}, // parentID - {key: 6, value: anyValue{valueType: IntValueType, value: s.start}}, // start - {key: 7, value: anyValue{valueType: IntValueType, value: s.duration}}, // duration - {key: 8, value: anyValue{valueType: BoolValueType, value: s.error}}, // error - {key: 10, value: anyValue{valueType: StringValueType, value: s.spanType}}, // type - {key: 11, value: anyValue{valueType: keyValueListType, value: s.spanLinks}}, // SpanLink - {key: 12, value: anyValue{valueType: keyValueListType, value: s.spanEvents}}, // SpanEvent - {key: 15, value: anyValue{valueType: StringValueType, value: s.integration}}, // component + {key: streamingKey{isString: false, idx: 1}, value: anyValue{valueType: StringValueType, value: s.service}}, // service + {key: streamingKey{isString: false, idx: 2}, value: anyValue{valueType: StringValueType, value: s.name}}, // name + {key: streamingKey{isString: false, idx: 3}, value: anyValue{valueType: StringValueType, value: s.resource}}, // resource + {key: streamingKey{isString: false, idx: 4}, value: anyValue{valueType: IntValueType, value: s.spanID}}, // spanID + {key: streamingKey{isString: false, idx: 5}, value: anyValue{valueType: IntValueType, value: s.parentID}}, // parentID + {key: streamingKey{isString: false, idx: 6}, value: anyValue{valueType: IntValueType, value: s.start}}, // start + {key: streamingKey{isString: false, idx: 7}, value: anyValue{valueType: IntValueType, value: s.duration}}, // duration + {key: streamingKey{isString: false, idx: 8}, value: anyValue{valueType: BoolValueType, value: s.error}}, // error + {key: streamingKey{isString: false, idx: 10}, value: anyValue{valueType: StringValueType, value: s.spanType}}, // type + {key: streamingKey{isString: false, idx: 11}, value: anyValue{valueType: keyValueListType, value: s.spanLinks}}, // SpanLink + {key: streamingKey{isString: false, idx: 12}, value: anyValue{valueType: keyValueListType, value: s.spanEvents}}, // SpanEvent + {key: streamingKey{isString: false, idx: 15}, value: anyValue{valueType: StringValueType, value: s.integration}}, // component } // encode meta attributes attr := keyValueList{} for k, v := range s.meta { - idx, err := encodeString(k) + idx, err := p.encodeString(k) if err != nil { - idx = k + idx = streamingKey{isString: true, stringValue: k} } attr = append(attr, keyValue{key: idx, value: anyValue{valueType: StringValueType, value: v}}) } // encode metric attributes for k, v := range s.metrics { - idx, err := encodeString(k) + idx, err := p.encodeString(k) if err != nil { - idx = k + idx = streamingKey{isString: true, stringValue: k} } attr = append(attr, keyValue{key: idx, value: anyValue{valueType: FloatValueType, value: v}}) } // encode metaStruct attributes for k, v := range s.metaStruct { - idx, err := encodeString(k) + idx, err := p.encodeString(k) if err != nil { - idx = k + idx = streamingKey{isString: true, stringValue: k} } attr = append(attr, keyValue{key: idx, value: anyValue{valueType: getAnyValueType(v), value: v}}) } - kv = append(kv, keyValue{key: 9, value: anyValue{valueType: ArrayValueType, value: attr}}) // attributes + kv = append(kv, keyValue{key: streamingKey{isString: false, idx: 9}, value: anyValue{valueType: ArrayValueType, value: attr}}) // attributes env, ok := s.meta["env"] if ok { - kv = append(kv, keyValue{key: 13, value: anyValue{valueType: StringValueType, value: env}}) // env + kv = append(kv, keyValue{key: streamingKey{isString: false, idx: 13}, value: anyValue{valueType: StringValueType, value: env}}) // env } version, ok := s.meta["version"] if ok { - kv = append(kv, keyValue{key: 14, value: anyValue{valueType: StringValueType, value: version}}) // version + kv = append(kv, keyValue{key: streamingKey{isString: false, idx: 14}, value: anyValue{valueType: StringValueType, value: version}}) // version } - return kv.EncodeMsg(e) + return kv.EncodeMsg(e, p) } // encodeString and decodeString handles encoding a string to the payload's string table. @@ -440,12 +452,12 @@ func encodeSpan(s *Span, e *msgp.Writer) error { // - use its index in the string table if it exists // - otherwise, write the string into the message, then add the string at the next index // Returns the index of the string in the string table, and an error if there is one -func (p *payloadV1) encodeString(s string) (uint32, error) { +func (p *payloadV1) encodeString(s string) (streamingKey, error) { sTable := &p.strings idx, ok := sTable.indices[s] // if the string already exists in the table, use its index if ok { - return idx, nil + return streamingKey{isString: false, idx: idx}, nil } // else, write the string into the table at the next index @@ -453,12 +465,12 @@ func (p *payloadV1) encodeString(s string) (uint32, error) { sTable.indices[s] = sTable.nextIndex sTable.strings = append(sTable.strings, s) sTable.nextIndex += 1 - return sTable.nextIndex, fmt.Errorf("string not found in table") + return streamingKey{isString: true, stringValue: s}, nil } // encodeSpanLinks encodes the span links into a msgp.Writer // Span links are represented as an array of fixmaps (keyValueList) -func encodeSpanLinks(sl []SpanLink, e *msgp.Writer) error { +func encodeSpanLinks(sl []SpanLink, e *msgp.Writer, p *payloadV1) error { // write the number of span links err := e.WriteArrayHeader(uint32(len(sl))) if err != nil { @@ -469,27 +481,27 @@ func encodeSpanLinks(sl []SpanLink, e *msgp.Writer) error { kv := arrayValue{} for _, s := range sl { slKeyValues := keyValueList{ - {key: 1, value: anyValue{valueType: IntValueType, value: s.TraceID}}, // traceID - {key: 2, value: anyValue{valueType: IntValueType, value: s.SpanID}}, // spanID - {key: 4, value: anyValue{valueType: StringValueType, value: s.Tracestate}}, // tracestate - {key: 5, value: anyValue{valueType: IntValueType, value: s.Flags}}, // flags + {key: streamingKey{isString: false, idx: 1}, value: anyValue{valueType: IntValueType, value: s.TraceID}}, // traceID + {key: streamingKey{isString: false, idx: 2}, value: anyValue{valueType: IntValueType, value: s.SpanID}}, // spanID + {key: streamingKey{isString: false, idx: 4}, value: anyValue{valueType: StringValueType, value: s.Tracestate}}, // tracestate + {key: streamingKey{isString: false, idx: 5}, value: anyValue{valueType: IntValueType, value: s.Flags}}, // flags } attr := keyValueList{} // attributes for k, v := range s.Attributes { - idx, err := encodeString(k) + idx, err := p.encodeString(k) if err != nil { - idx = k + idx = streamingKey{isString: true, stringValue: k} } attr = append(attr, keyValue{key: idx, value: anyValue{valueType: getAnyValueType(v), value: v}}) } - slKeyValues = append(slKeyValues, keyValue{key: 3, value: anyValue{valueType: ArrayValueType, value: attr}}) // attributes + slKeyValues = append(slKeyValues, keyValue{key: streamingKey{isString: false, idx: 3}, value: anyValue{valueType: ArrayValueType, value: attr}}) // attributes kv = append(kv, anyValue{valueType: keyValueListType, value: slKeyValues}) } for _, v := range kv { - err := v.EncodeMsg(e) + err := v.EncodeMsg(e, p) if err != nil { return err } @@ -499,7 +511,7 @@ func encodeSpanLinks(sl []SpanLink, e *msgp.Writer) error { // encodeSpanEvents encodes the span events into a msgp.Writer // Span events are represented as an array of fixmaps (keyValueList) -func encodeSpanEvents(se []spanEvent, e *msgp.Writer) error { +func encodeSpanEvents(se []spanEvent, e *msgp.Writer, p *payloadV1) error { // write the number of span events err := e.WriteArrayHeader(uint32(len(se))) if err != nil { @@ -510,25 +522,25 @@ func encodeSpanEvents(se []spanEvent, e *msgp.Writer) error { kv := arrayValue{} for _, s := range se { slKeyValues := keyValueList{ - {key: 1, value: anyValue{valueType: IntValueType, value: s.TimeUnixNano}}, // time - {key: 2, value: anyValue{valueType: StringValueType, value: s.Name}}, // name + {key: streamingKey{isString: false, idx: 1}, value: anyValue{valueType: IntValueType, value: s.TimeUnixNano}}, // time + {key: streamingKey{isString: false, idx: 2}, value: anyValue{valueType: StringValueType, value: s.Name}}, // name } attr := keyValueList{} // attributes for k, v := range s.Attributes { - idx, err := encodeString(k) + idx, err := p.encodeString(k) if err != nil { - idx = k + idx = streamingKey{isString: true, stringValue: k} } attr = append(attr, keyValue{key: idx, value: anyValue{valueType: getAnyValueType(v), value: v}}) } - slKeyValues = append(slKeyValues, keyValue{key: 3, value: anyValue{valueType: ArrayValueType, value: attr}}) // attributes + slKeyValues = append(slKeyValues, keyValue{key: streamingKey{isString: false, idx: 3}, value: anyValue{valueType: ArrayValueType, value: attr}}) // attributes kv = append(kv, anyValue{valueType: keyValueListType, value: slKeyValues}) } for _, v := range kv { - err := v.EncodeMsg(e) + err := v.EncodeMsg(e, p) if err != nil { return err } From 03abab3e868f0cdcd5adcd88311d2aa54976e6da Mon Sep 17 00:00:00 2001 From: Hannah Kim Date: Tue, 23 Sep 2025 16:16:48 -0400 Subject: [PATCH 16/22] fix some types, also i don't think we need spanlistv1 anymore --- ddtrace/tracer/payload.go | 2 +- ddtrace/tracer/payload_v1.go | 14 ++++++-------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/ddtrace/tracer/payload.go b/ddtrace/tracer/payload.go index 7fd8c402b5..6fcca38d95 100644 --- a/ddtrace/tracer/payload.go +++ b/ddtrace/tracer/payload.go @@ -166,7 +166,7 @@ type traceChunk struct { attributes map[uint32]anyValue `msg:"attributes,omitempty"` // a list of spans in this chunk - spans spanListV1 `msg:"spans,omitempty"` + spans spanList `msg:"spans,omitempty"` // whether the trace only contains analyzed spans // (not required by tracers and set by the agent) diff --git a/ddtrace/tracer/payload_v1.go b/ddtrace/tracer/payload_v1.go index 894afb4fd0..d71c582b2e 100644 --- a/ddtrace/tracer/payload_v1.go +++ b/ddtrace/tracer/payload_v1.go @@ -94,7 +94,7 @@ type stringTable struct { // - uint32 // - bool // - float64 -// - uint64 +// - int64 // - uint8 // intValue(5) - 0x405 (4 indicates this is an int AnyType, then 5 is encoded using positive fixed int format) // stringValue(“a”) - 0x1a161 (1 indicates this is a string, then “a” is encoded using fixstr 0xa161) @@ -108,7 +108,7 @@ const ( StringValueType = iota + 1 // string or uint -- 1 BoolValueType // boolean -- 2 FloatValueType // float64 -- 3 - IntValueType // uint64 -- 4 + IntValueType // int64 -- 4 BytesValueType // []uint8 -- 5 ArrayValueType // []AnyValue -- 6 keyValueListType // []keyValue -- 7 @@ -133,8 +133,6 @@ type keyValue struct { type keyValueList []keyValue -type spanListV1 spanList - // newPayloadV1 returns a ready to use payloadV1. func newPayloadV1() *payloadV1 { return &payloadV1{ @@ -150,7 +148,7 @@ func newPayloadV1() *payloadV1 { } // push pushes a new item into the stream. -func (p *payloadV1) push(t spanListV1) (stats payloadStats, err error) { +func (p *payloadV1) push(t spanList) (stats payloadStats, err error) { // We need to hydrate the payload with everything we get from the spans. // Conceptually, our `t []*Span` corresponds to one `traceChunk`. origin, priority := "", 0 @@ -173,7 +171,7 @@ func (p *payloadV1) push(t spanListV1) (stats payloadStats, err error) { traceID: t[0].Context().traceID, }) wr := msgp.NewWriter(&p.buf) - err = t.EncodeMsg(wr, p) + err = EncodeSpanList(t, wr, p) if err == nil { err = wr.Flush() } @@ -277,7 +275,7 @@ func (a *anyValue) EncodeMsg(e *msgp.Writer, p *payloadV1) error { return e.WriteFloat64(a.value.(float64)) case IntValueType: e.WriteInt32(IntValueType) - return e.WriteUint64(a.value.(uint64)) + return e.WriteInt64(a.value.(int64)) case BytesValueType: e.WriteInt32(BytesValueType) return e.WriteBytes(a.value.([]byte)) @@ -363,7 +361,7 @@ func (t *traceChunk) EncodeMsg(e *msgp.Writer, p *payloadV1) error { // Span, SpanLink, and SpanEvent structs are different for v0.4 and v1.0. // For v1 we need to manually encode the spans, span links, and span events // if we don't want to do extra allocations. -func (s spanListV1) EncodeMsg(e *msgp.Writer, p *payloadV1) error { +func EncodeSpanList(s spanList, e *msgp.Writer, p *payloadV1) error { err := e.WriteArrayHeader(uint32(len(s))) if err != nil { return msgp.WrapError(err) From 22e70ec00cd2d67878ed8c5c36d64a4dcacee1e0 Mon Sep 17 00:00:00 2001 From: Hannah Kim Date: Wed, 24 Sep 2025 13:04:26 -0400 Subject: [PATCH 17/22] fix immediate compiler issues --- ddtrace/tracer/payload_v1.go | 85 ++++++++++++++++++++++++------------ 1 file changed, 56 insertions(+), 29 deletions(-) diff --git a/ddtrace/tracer/payload_v1.go b/ddtrace/tracer/payload_v1.go index d71c582b2e..f59c516c66 100644 --- a/ddtrace/tracer/payload_v1.go +++ b/ddtrace/tracer/payload_v1.go @@ -144,6 +144,8 @@ func newPayloadV1() *payloadV1 { indices: map[string]uint32{"": 0}, nextIndex: 1, }, + header: make([]byte, 8), + off: 8, } } @@ -251,7 +253,16 @@ func (p *payloadV1) Write(data []byte) (n int, err error) { // Read implements io.Reader. It reads from the msgpack-encoded stream. func (p *payloadV1) Read(b []byte) (n int, err error) { - panic("not implemented") + if p.off < len(p.header) { + // reading header + n = copy(b, p.header[p.off:]) + p.off += n + return n, nil + } + if p.reader == nil { + p.reader = bytes.NewReader(p.buf.Bytes()) + } + return p.reader.Read(b) } // Encode the anyValue @@ -325,11 +336,7 @@ func (kv keyValueList) EncodeMsg(e *msgp.Writer, p *payloadV1) error { if err != nil { return err } - for i, k := range kv { - err := e.WriteUint32(uint32(i)) - if err != nil { - return err - } + for _, k := range kv { err = k.EncodeMsg(e, p) if err != nil { return err @@ -340,12 +347,12 @@ func (kv keyValueList) EncodeMsg(e *msgp.Writer, p *payloadV1) error { func (t *traceChunk) EncodeMsg(e *msgp.Writer, p *payloadV1) error { kv := keyValueList{ - {key: streamingKey{isString: false, idx: 1}, value: anyValue{valueType: IntValueType, value: t.priority}}, // priority - {key: streamingKey{isString: false, idx: 2}, value: anyValue{valueType: StringValueType, value: t.origin}}, // origin - {key: streamingKey{isString: false, idx: 4}, value: anyValue{valueType: keyValueListType, value: t.spans}}, // spans - {key: streamingKey{isString: false, idx: 5}, value: anyValue{valueType: BoolValueType, value: t.droppedTrace}}, // droppedTrace - {key: streamingKey{isString: false, idx: 6}, value: anyValue{valueType: BytesValueType, value: t.traceID}}, // traceID - {key: streamingKey{isString: false, idx: 7}, value: anyValue{valueType: IntValueType, value: t.samplingMechanism}}, // samplingMechanism + {key: streamingKey{isString: false, idx: 1}, value: anyValue{valueType: IntValueType, value: int64(t.priority)}}, // priority + {key: streamingKey{isString: false, idx: 2}, value: anyValue{valueType: StringValueType, value: t.origin}}, // origin + {key: streamingKey{isString: false, idx: 4}, value: anyValue{valueType: keyValueListType, value: t.spans}}, // spans + {key: streamingKey{isString: false, idx: 5}, value: anyValue{valueType: BoolValueType, value: t.droppedTrace}}, // droppedTrace + {key: streamingKey{isString: false, idx: 6}, value: anyValue{valueType: BytesValueType, value: t.traceID}}, // traceID + {key: streamingKey{isString: false, idx: 7}, value: anyValue{valueType: StringValueType, value: t.samplingMechanism}}, // samplingMechanism } attr := keyValueList{} @@ -392,14 +399,12 @@ func encodeSpan(s *Span, e *msgp.Writer, p *payloadV1) error { {key: streamingKey{isString: false, idx: 1}, value: anyValue{valueType: StringValueType, value: s.service}}, // service {key: streamingKey{isString: false, idx: 2}, value: anyValue{valueType: StringValueType, value: s.name}}, // name {key: streamingKey{isString: false, idx: 3}, value: anyValue{valueType: StringValueType, value: s.resource}}, // resource - {key: streamingKey{isString: false, idx: 4}, value: anyValue{valueType: IntValueType, value: s.spanID}}, // spanID - {key: streamingKey{isString: false, idx: 5}, value: anyValue{valueType: IntValueType, value: s.parentID}}, // parentID - {key: streamingKey{isString: false, idx: 6}, value: anyValue{valueType: IntValueType, value: s.start}}, // start - {key: streamingKey{isString: false, idx: 7}, value: anyValue{valueType: IntValueType, value: s.duration}}, // duration - {key: streamingKey{isString: false, idx: 8}, value: anyValue{valueType: BoolValueType, value: s.error}}, // error + {key: streamingKey{isString: false, idx: 4}, value: anyValue{valueType: IntValueType, value: int64(s.spanID)}}, // spanID + {key: streamingKey{isString: false, idx: 5}, value: anyValue{valueType: IntValueType, value: int64(s.parentID)}}, // parentID + {key: streamingKey{isString: false, idx: 6}, value: anyValue{valueType: IntValueType, value: int64(s.start)}}, // start + {key: streamingKey{isString: false, idx: 7}, value: anyValue{valueType: IntValueType, value: int64(s.duration)}}, // duration + {key: streamingKey{isString: false, idx: 8}, value: anyValue{valueType: BoolValueType, value: (s.error != 0)}}, // error - true if span has error {key: streamingKey{isString: false, idx: 10}, value: anyValue{valueType: StringValueType, value: s.spanType}}, // type - {key: streamingKey{isString: false, idx: 11}, value: anyValue{valueType: keyValueListType, value: s.spanLinks}}, // SpanLink - {key: streamingKey{isString: false, idx: 12}, value: anyValue{valueType: keyValueListType, value: s.spanEvents}}, // SpanEvent {key: streamingKey{isString: false, idx: 15}, value: anyValue{valueType: StringValueType, value: s.integration}}, // component } @@ -431,7 +436,7 @@ func encodeSpan(s *Span, e *msgp.Writer, p *payloadV1) error { attr = append(attr, keyValue{key: idx, value: anyValue{valueType: getAnyValueType(v), value: v}}) } - kv = append(kv, keyValue{key: streamingKey{isString: false, idx: 9}, value: anyValue{valueType: ArrayValueType, value: attr}}) // attributes + kv = append(kv, keyValue{key: streamingKey{isString: false, idx: 9}, value: anyValue{valueType: keyValueListType, value: attr}}) // attributes env, ok := s.meta["env"] if ok { @@ -442,7 +447,19 @@ func encodeSpan(s *Span, e *msgp.Writer, p *payloadV1) error { kv = append(kv, keyValue{key: streamingKey{isString: false, idx: 14}, value: anyValue{valueType: StringValueType, value: version}}) // version } - return kv.EncodeMsg(e, p) + err := kv.EncodeMsg(e, p) + if err != nil { + return err + } + + // spanLinks + err = encodeSpanLinks(s.spanLinks, e, p) + if err != nil { + return err + } + + // spanEvents + return encodeSpanEvents(s.spanEvents, e, p) } // encodeString and decodeString handles encoding a string to the payload's string table. @@ -469,8 +486,13 @@ func (p *payloadV1) encodeString(s string) (streamingKey, error) { // encodeSpanLinks encodes the span links into a msgp.Writer // Span links are represented as an array of fixmaps (keyValueList) func encodeSpanLinks(sl []SpanLink, e *msgp.Writer, p *payloadV1) error { + err := e.WriteInt32(11) // spanLinks + if err != nil { + return err + } + // write the number of span links - err := e.WriteArrayHeader(uint32(len(sl))) + err = e.WriteArrayHeader(uint32(len(sl))) if err != nil { return err } @@ -479,10 +501,10 @@ func encodeSpanLinks(sl []SpanLink, e *msgp.Writer, p *payloadV1) error { kv := arrayValue{} for _, s := range sl { slKeyValues := keyValueList{ - {key: streamingKey{isString: false, idx: 1}, value: anyValue{valueType: IntValueType, value: s.TraceID}}, // traceID - {key: streamingKey{isString: false, idx: 2}, value: anyValue{valueType: IntValueType, value: s.SpanID}}, // spanID - {key: streamingKey{isString: false, idx: 4}, value: anyValue{valueType: StringValueType, value: s.Tracestate}}, // tracestate - {key: streamingKey{isString: false, idx: 5}, value: anyValue{valueType: IntValueType, value: s.Flags}}, // flags + {key: streamingKey{isString: false, idx: 1}, value: anyValue{valueType: IntValueType, value: int64(s.TraceID)}}, // traceID + {key: streamingKey{isString: false, idx: 2}, value: anyValue{valueType: IntValueType, value: int64(s.SpanID)}}, // spanID + {key: streamingKey{isString: false, idx: 4}, value: anyValue{valueType: StringValueType, value: s.Tracestate}}, // tracestate + {key: streamingKey{isString: false, idx: 5}, value: anyValue{valueType: IntValueType, value: int64(s.Flags)}}, // flags } attr := keyValueList{} @@ -510,8 +532,13 @@ func encodeSpanLinks(sl []SpanLink, e *msgp.Writer, p *payloadV1) error { // encodeSpanEvents encodes the span events into a msgp.Writer // Span events are represented as an array of fixmaps (keyValueList) func encodeSpanEvents(se []spanEvent, e *msgp.Writer, p *payloadV1) error { + err := e.WriteInt32(12) // spanEvents + if err != nil { + return err + } + // write the number of span events - err := e.WriteArrayHeader(uint32(len(se))) + err = e.WriteArrayHeader(uint32(len(se))) if err != nil { return err } @@ -520,8 +547,8 @@ func encodeSpanEvents(se []spanEvent, e *msgp.Writer, p *payloadV1) error { kv := arrayValue{} for _, s := range se { slKeyValues := keyValueList{ - {key: streamingKey{isString: false, idx: 1}, value: anyValue{valueType: IntValueType, value: s.TimeUnixNano}}, // time - {key: streamingKey{isString: false, idx: 2}, value: anyValue{valueType: StringValueType, value: s.Name}}, // name + {key: streamingKey{isString: false, idx: 1}, value: anyValue{valueType: IntValueType, value: int64(s.TimeUnixNano)}}, // time + {key: streamingKey{isString: false, idx: 2}, value: anyValue{valueType: StringValueType, value: s.Name}}, // name } attr := keyValueList{} From 68ab0d84a8432e26b32b507ec72540a22319d24d Mon Sep 17 00:00:00 2001 From: Hannah Kim Date: Wed, 24 Sep 2025 14:40:53 -0400 Subject: [PATCH 18/22] few more fixes with payload representations --- ddtrace/tracer/payload.go | 2 +- ddtrace/tracer/payload_v1.go | 48 ++++++++++++++++++++++++------------ 2 files changed, 33 insertions(+), 17 deletions(-) diff --git a/ddtrace/tracer/payload.go b/ddtrace/tracer/payload.go index 6fcca38d95..2532a53943 100644 --- a/ddtrace/tracer/payload.go +++ b/ddtrace/tracer/payload.go @@ -163,7 +163,7 @@ type traceChunk struct { origin string `msg:"origin,omitempty"` // a collection of key to value pairs common in all `spans` - attributes map[uint32]anyValue `msg:"attributes,omitempty"` + attributes keyValueList `msg:"attributes,omitempty"` // a list of spans in this chunk spans spanList `msg:"spans,omitempty"` diff --git a/ddtrace/tracer/payload_v1.go b/ddtrace/tracer/payload_v1.go index f59c516c66..610d543864 100644 --- a/ddtrace/tracer/payload_v1.go +++ b/ddtrace/tracer/payload_v1.go @@ -34,31 +34,31 @@ type payloadV1 struct { strings stringTable `msgp:"strings"` // the string ID of the container where the tracer is running - containerID uint32 `msgp:"containerID"` + containerID string `msgp:"containerID"` // the string language name of the tracer - languageName uint32 `msgp:"languageName"` + languageName string `msgp:"languageName"` // the string language version of the tracer - languageVersion uint32 `msgp:"languageVersion"` + languageVersion string `msgp:"languageVersion"` // the string version of the tracer - tracerVersion uint32 `msgp:"tracerVersion"` + tracerVersion string `msgp:"tracerVersion"` // the V4 string UUID representation of a tracer session - runtimeID uint32 `msgp:"runtimeID"` + runtimeID string `msgp:"runtimeID"` // the optional `env` string tag that set with the tracer - env uint32 `msgp:"env,omitempty"` + env string `msgp:"env,omitempty"` // the optional string hostname of where the tracer is running - hostname uint32 `msgp:"hostname,omitempty"` + hostname string `msgp:"hostname,omitempty"` // the optional string `version` tag for the application set in the tracer - appVersion uint32 `msgp:"appVersion,omitempty"` + appVersion string `msgp:"appVersion,omitempty"` // a collection of key to value pairs common in all `chunks` - attributes map[uint32]anyValue `msgp:"attributes,omitempty"` + attributes keyValueList `msgp:"attributes,omitempty"` // a list of trace `chunks` chunks []traceChunk `msgp:"chunks,omitempty"` @@ -137,7 +137,7 @@ type keyValueList []keyValue func newPayloadV1() *payloadV1 { return &payloadV1{ protocolVersion: traceProtocolV1, - attributes: make(map[uint32]anyValue), + attributes: keyValueList{}, chunks: make([]traceChunk, 0), strings: stringTable{ strings: []string{""}, @@ -165,23 +165,39 @@ func (p *payloadV1) push(t spanList) (stats payloadStats, err error) { } } + kv := keyValueList{ + {key: streamingKey{isString: false, idx: 2}, value: anyValue{valueType: StringValueType, value: p.containerID}}, // containerID + {key: streamingKey{isString: false, idx: 3}, value: anyValue{valueType: StringValueType, value: p.languageName}}, // languageName + {key: streamingKey{isString: false, idx: 4}, value: anyValue{valueType: StringValueType, value: p.languageVersion}}, // languageVersion + {key: streamingKey{isString: false, idx: 5}, value: anyValue{valueType: StringValueType, value: p.tracerVersion}}, // tracerVersion + {key: streamingKey{isString: false, idx: 6}, value: anyValue{valueType: StringValueType, value: p.runtimeID}}, // runtimeID + {key: streamingKey{isString: false, idx: 7}, value: anyValue{valueType: StringValueType, value: p.env}}, // env + {key: streamingKey{isString: false, idx: 8}, value: anyValue{valueType: StringValueType, value: p.hostname}}, // hostname + {key: streamingKey{isString: false, idx: 9}, value: anyValue{valueType: StringValueType, value: p.appVersion}}, // appVersion + } + p.chunks = append(p.chunks, traceChunk{ priority: int32(priority), origin: origin, - attributes: make(map[uint32]anyValue), + attributes: keyValueList{}, spans: t, traceID: t[0].Context().traceID, }) wr := msgp.NewWriter(&p.buf) err = EncodeSpanList(t, wr, p) - if err == nil { - err = wr.Flush() - } if err != nil { return payloadStats{}, err } + + // once we've encoded the spans, we can encode the attributes + kv = append(kv, keyValue{key: streamingKey{isString: false, idx: 10}, value: anyValue{valueType: keyValueListType, value: p.attributes}}) // attributes + err = kv.EncodeMsg(wr, p) + if err == nil { + err = wr.Flush() + } + p.recordItem() - return p.stats(), nil + return p.stats(), err } func (p *payloadV1) grow(n int) { @@ -357,7 +373,7 @@ func (t *traceChunk) EncodeMsg(e *msgp.Writer, p *payloadV1) error { attr := keyValueList{} for k, v := range t.attributes { - attr = append(attr, keyValue{key: streamingKey{isString: false, idx: k}, value: anyValue{valueType: getAnyValueType(v), value: v}}) + attr = append(attr, keyValue{key: streamingKey{isString: false, idx: uint32(k)}, value: anyValue{valueType: getAnyValueType(v), value: v}}) } kv = append(kv, keyValue{key: streamingKey{isString: false, idx: 3}, value: anyValue{valueType: ArrayValueType, value: attr}}) // attributes From 23c72cfc80c29aca3afd4e6ac1936e37fac76053 Mon Sep 17 00:00:00 2001 From: Hannah Kim Date: Mon, 29 Sep 2025 13:36:26 -0400 Subject: [PATCH 19/22] wip: decoding functions --- ddtrace/tracer/payload_v1.go | 548 ++++++++++++++++++++++++++++++++++- 1 file changed, 536 insertions(+), 12 deletions(-) diff --git a/ddtrace/tracer/payload_v1.go b/ddtrace/tracer/payload_v1.go index 610d543864..8b82fa7c03 100644 --- a/ddtrace/tracer/payload_v1.go +++ b/ddtrace/tracer/payload_v1.go @@ -11,6 +11,7 @@ import ( "fmt" "sync/atomic" + "github.com/DataDog/dd-trace-go/v2/ddtrace/ext" "github.com/tinylib/msgp/msgp" ) @@ -31,7 +32,7 @@ type payloadV1 struct { // array of strings referenced in this tracer payload, its chunks and spans // stringTable holds references from a string value to an index. // the 0th position in the stringTable should always be the empty string. - strings stringTable `msgp:"strings"` + strings *stringTable `msgp:"strings"` // the string ID of the container where the tracer is running containerID string `msgp:"containerID"` @@ -139,16 +140,29 @@ func newPayloadV1() *payloadV1 { protocolVersion: traceProtocolV1, attributes: keyValueList{}, chunks: make([]traceChunk, 0), - strings: stringTable{ - strings: []string{""}, - indices: map[string]uint32{"": 0}, - nextIndex: 1, - }, - header: make([]byte, 8), - off: 8, + strings: newStringTable(), + header: make([]byte, 8), + off: 8, } } +func newStringTable() *stringTable { + return &stringTable{ + strings: []string{""}, + indices: map[string]uint32{"": 0}, + nextIndex: 1, + } +} + +func (s *stringTable) Add(str string) { + if _, ok := s.indices[str]; ok { + return + } + s.indices[str] = s.nextIndex + s.strings = append(s.strings, str) + s.nextIndex += 1 +} + // push pushes a new item into the stream. func (p *payloadV1) push(t spanList) (stats payloadStats, err error) { // We need to hydrate the payload with everything we get from the spans. @@ -484,7 +498,7 @@ func encodeSpan(s *Span, e *msgp.Writer, p *payloadV1) error { // - otherwise, write the string into the message, then add the string at the next index // Returns the index of the string in the string table, and an error if there is one func (p *payloadV1) encodeString(s string) (streamingKey, error) { - sTable := &p.strings + sTable := p.strings idx, ok := sTable.indices[s] // if the string already exists in the table, use its index if ok { @@ -493,9 +507,7 @@ func (p *payloadV1) encodeString(s string) (streamingKey, error) { // else, write the string into the table at the next index // return an error to indicate that the string should be written to the msgp message - sTable.indices[s] = sTable.nextIndex - sTable.strings = append(sTable.strings, s) - sTable.nextIndex += 1 + sTable.Add(s) return streamingKey{isString: true, stringValue: s}, nil } @@ -604,3 +616,515 @@ func getAnyValueType(v any) int { } return IntValueType } + +func (p *payloadV1) Decode(b []byte) ([]byte, error) { + if p.strings == nil { + p.strings = newStringTable() + } + + fields, o, err := msgp.ReadMapHeaderBytes(b) + if err != nil { + return o, err + } + + for fields > 0 { + fields-- + + f, o, err := msgp.ReadUint32Bytes(b) + if err != nil { + return o, err + } + + switch f { + case 1: // stringTable + o, err = DecodeStringTable(o, p.strings) + if err != nil { + return o, err + } + + case 2: // containerID + p.containerID, o, err = DecodeStreamingString(o, p.strings) + if err != nil { + return o, err + } + + case 3: // languageName + p.languageName, o, err = DecodeStreamingString(o, p.strings) + if err != nil { + return o, err + } + + case 4: // languageVersion + p.languageVersion, o, err = DecodeStreamingString(o, p.strings) + if err != nil { + return o, err + } + + case 5: // tracerVersion + p.tracerVersion, o, err = DecodeStreamingString(o, p.strings) + if err != nil { + return o, err + } + + case 6: // runtimeID + p.runtimeID, o, err = DecodeStreamingString(o, p.strings) + if err != nil { + return o, err + } + + case 7: // env + p.env, o, err = DecodeStreamingString(o, p.strings) + if err != nil { + return o, err + } + + case 8: // hostname + p.hostname, o, err = DecodeStreamingString(o, p.strings) + if err != nil { + return o, err + } + + case 9: // appVersion + p.appVersion, o, err = DecodeStreamingString(o, p.strings) + if err != nil { + return o, err + } + case 10: // attributes + p.attributes, o, err = DecodeKeyValueList(o, p.strings) + if err != nil { + return o, err + } + case 11: // chunks + p.chunks, o, err = DecodeTraceChunks(o, p.strings) + if err != nil { + return o, err + } + } + } + return o, nil +} + +func DecodeStringTable(b []byte, strings *stringTable) ([]byte, error) { + len, o, err := msgp.ReadBytesHeader(b) + if err != nil { + return nil, err + } + + for len > 0 { + len-- + str, o, err := msgp.ReadStringBytes(o) + if err != nil { + return o, err + } + + // if we've seen the string before, skip + if _, ok := strings.indices[str]; ok { + continue + } + + strings.Add(str) + } + return o, nil +} + +func DecodeStreamingString(b []byte, strings *stringTable) (string, []byte, error) { + if len(b) == 0 { + return "", nil, msgp.WrapError(nil, "expected streaming string, got EOF") + } + // try reading as a uint32 index + idx, o, err := msgp.ReadUint32Bytes(b) + if err == nil { + return strings.strings[idx], o, nil + } + + // else, try reading as a string, then add to the string table + str, o, err := msgp.ReadStringBytes(b) + if err != nil { + return "", nil, msgp.WrapError(err, "unable to read streaming string") + } + strings.Add(str) + return str, o, nil +} + +func DecodeAnyValue(b []byte, strings *stringTable) (anyValue, []byte, error) { + vType, o, err := msgp.ReadInt32Bytes(b) + if err != nil { + return anyValue{}, o, err + } + switch vType { + case StringValueType: + str, o, err := msgp.ReadStringBytes(o) + if err != nil { + return anyValue{}, o, err + } + return anyValue{valueType: StringValueType, value: str}, o, nil + case BoolValueType: + b, o, err := msgp.ReadBoolBytes(o) + if err != nil { + return anyValue{}, o, err + } + return anyValue{valueType: BoolValueType, value: b}, o, nil + case FloatValueType: + f, o, err := msgp.ReadFloat64Bytes(o) + if err != nil { + return anyValue{}, o, err + } + return anyValue{valueType: FloatValueType, value: f}, o, nil + case IntValueType: + i, o, err := msgp.ReadInt64Bytes(o) + if err != nil { + return anyValue{}, o, err + } + return anyValue{valueType: IntValueType, value: i}, o, nil + case BytesValueType: + b, o, err := msgp.ReadBytesBytes(o, nil) + if err != nil { + return anyValue{}, o, err + } + return anyValue{valueType: BytesValueType, value: b}, o, nil + case ArrayValueType: + len, o, err := msgp.ReadBytesHeader(o) + if err != nil { + return anyValue{}, o, err + } + arrayValue := make(arrayValue, len/2) + for i := range len / 2 { + arrayValue[i], o, err = DecodeAnyValue(o, strings) + if err != nil { + return anyValue{}, o, err + } + } + return anyValue{valueType: ArrayValueType, value: arrayValue}, o, nil + case keyValueListType: + kv, o, err := DecodeKeyValueList(o, strings) + if err != nil { + return anyValue{}, o, err + } + return anyValue{valueType: keyValueListType, value: kv}, o, nil + default: + return anyValue{}, o, fmt.Errorf("invalid value type: %d", vType) + } +} + +func DecodeKeyValueList(b []byte, strings *stringTable) (keyValueList, []byte, error) { + len, o, err := msgp.ReadBytesHeader(b) + if err != nil { + return nil, o, err + } + + if len == 0 || len%3 != 0 { + return nil, o, msgp.WrapError(fmt.Errorf("invalid number of items in keyValueList encoding, expected multiple of 3, got %d", len)) + } + + kv := make(keyValueList, len/3) + for i := range len / 3 { + len-- + key, o, err := DecodeStreamingString(o, strings) + if err != nil { + return nil, o, err + } + v, o, err := DecodeAnyValue(o, strings) + if err != nil { + return nil, o, err + } + kv[i] = keyValue{key: streamingKey{isString: true, stringValue: key}, value: anyValue{valueType: v.valueType, value: v.value}} + } + return kv, o, nil +} + +func DecodeTraceChunks(b []byte, strings *stringTable) ([]traceChunk, []byte, error) { + tc := []traceChunk{} + fields, o, err := msgp.ReadMapHeaderBytes(b) + if err != nil { + return tc, o, err + } + + for fields > 0 { + fields-- + + f, o, err := msgp.ReadUint32Bytes(b) + if err != nil { + return tc, o, err + } + + switch f { + case 1: // priority + + case 2: // origin + + case 3: // attributes + + case 4: // spans + + case 5: // droppedTrace + + case 6: // traceID + + case 7: // samplingMechanism + } + } + return tc, o, nil +} + +func DecodeSpan(b []byte, strings *stringTable) (*Span, []byte, error) { + sp := Span{} + fields, o, err := msgp.ReadMapHeaderBytes(b) + if err != nil { + return &sp, o, err + } + + for fields > 0 { + fields-- + + f, o, err := msgp.ReadUint32Bytes(b) + if err != nil { + return &sp, o, err + } + + switch f { + case 1: // service + st, o, err := msgp.ReadStringBytes(o) + if err != nil { + return &sp, o, err + } + sp.service = st + case 2: // name + st, o, err := msgp.ReadStringBytes(o) + if err != nil { + return &sp, o, err + } + sp.name = st + case 3: // resource + st, o, err := msgp.ReadStringBytes(o) + if err != nil { + return &sp, o, err + } + sp.resource = st + case 4: // spanID + i, o, err := msgp.ReadInt64Bytes(o) + if err != nil { + return &sp, o, err + } + sp.spanID = uint64(i) + case 5: // parentID + i, o, err := msgp.ReadInt64Bytes(o) + if err != nil { + return &sp, o, err + } + sp.parentID = uint64(i) + case 6: // start + i, o, err := msgp.ReadInt64Bytes(o) + if err != nil { + return &sp, o, err + } + sp.start = i + case 7: // duration + i, o, err := msgp.ReadInt64Bytes(o) + if err != nil { + return &sp, o, err + } + sp.duration = i + case 8: // error + i, o, err := msgp.ReadBoolBytes(o) + if err != nil { + return &sp, o, err + } + if i { + sp.error = 1 + } else { + sp.error = 0 + } + case 9: // attributes + kv, o, err := DecodeKeyValueList(o, strings) + if err != nil { + return &sp, o, err + } + for k, v := range kv { + key := strings.strings[k] + sp.SetTag(key, v.value.value) + } + case 10: // type + st, o, err := msgp.ReadStringBytes(o) + if err != nil { + return &sp, o, err + } + sp.spanType = st + case 11: // spanLinks + sl, o, err := DecodeSpanLinks(o, strings) + if err != nil { + return &sp, o, err + } + sp.spanLinks = sl + case 12: // spanEvents + se, o, err := DecodeSpanEvents(o, strings) + if err != nil { + return &sp, o, err + } + sp.spanEvents = se + case 13: // env + s, o, err := msgp.ReadStringBytes(o) + if err != nil { + return &sp, o, err + } + sp.SetTag(ext.Environment, s) + case 14: // version + s, o, err := msgp.ReadStringBytes(o) + if err != nil { + return &sp, o, err + } + sp.setMeta(ext.Version, s) + case 15: // component + s, o, err := msgp.ReadStringBytes(o) + if err != nil { + return &sp, o, err + } + sp.integration = s + } + } + return &sp, nil, nil +} + +func DecodeSpanLinks(b []byte, strings *stringTable) ([]SpanLink, []byte, error) { + numSpanLinks, o, err := msgp.ReadArrayHeaderBytes(b) + if err != nil { + return nil, o, err + } + + ret := make([]SpanLink, numSpanLinks) + for i := range numSpanLinks { + sl := SpanLink{} + fields, o, err := msgp.ReadMapHeaderBytes(o) + if err != nil { + return ret, o, err + } + for fields > 0 { + fields-- + + f, o, err := msgp.ReadUint32Bytes(o) + if err != nil { + return ret, o, err + } + + switch f { + case 1: // traceID + s, o, err := msgp.ReadInt64Bytes(o) + if err != nil { + return ret, o, err + } + sl.TraceID = uint64(s) + case 2: // spanID + s, o, err := msgp.ReadInt64Bytes(o) + if err != nil { + return ret, o, err + } + sl.SpanID = uint64(s) + case 3: // attributes + kv, o, err := DecodeKeyValueList(o, strings) + if err != nil { + return ret, o, err + } + for k, v := range kv { + key := strings.strings[k] + s, ok := v.value.value.(string) + if !ok { + err := msgp.WrapError(fmt.Errorf("expected string value type for span link attributes, got %T", v.value.value)) + return ret, o, err + } + sl.Attributes[key] = s + } + case 4: // tracestate + s, o, err := msgp.ReadStringBytes(o) + if err != nil { + return ret, o, err + } + sl.Tracestate = s + case 5: // flags + s, o, err := msgp.ReadInt32Bytes(o) + if err != nil { + return ret, o, err + } + sl.Flags = uint32(s) + } + } + ret[i] = sl + } + return ret, o, nil +} + +func DecodeSpanEvents(b []byte, strings *stringTable) ([]spanEvent, []byte, error) { + numSpanEvents, o, err := msgp.ReadArrayHeaderBytes(b) + if err != nil { + return nil, o, err + } + ret := make([]spanEvent, numSpanEvents) + for i := range numSpanEvents { + se := spanEvent{} + fields, o, err := msgp.ReadMapHeaderBytes(o) + if err != nil { + return ret, o, err + } + for fields > 0 { + fields-- + + f, o, err := msgp.ReadUint32Bytes(b) + if err != nil { + return ret, o, err + } + + switch f { + case 1: // time + s, o, err := msgp.ReadInt64Bytes(o) + if err != nil { + return ret, o, err + } + se.TimeUnixNano = uint64(s) + case 2: // name + s, o, err := msgp.ReadStringBytes(o) + if err != nil { + return ret, o, err + } + se.Name = s + case 4: // attributes + kv, o, err := DecodeKeyValueList(o, strings) + if err != nil { + return ret, o, err + } + for k, v := range kv { + key := strings.strings[k] + switch v.value.valueType { + case StringValueType: + se.Attributes[key] = &spanEventAttribute{ + Type: spanEventAttributeTypeString, + StringValue: v.value.value.(string), + } + case BoolValueType: + se.Attributes[key] = &spanEventAttribute{ + Type: spanEventAttributeTypeBool, + BoolValue: v.value.value.(bool), + } + case IntValueType: + se.Attributes[key] = &spanEventAttribute{ + Type: spanEventAttributeTypeInt, + IntValue: v.value.value.(int64), + } + case FloatValueType: + se.Attributes[key] = &spanEventAttribute{ + Type: spanEventAttributeTypeDouble, + DoubleValue: v.value.value.(float64), + } + case ArrayValueType: + se.Attributes[key] = &spanEventAttribute{ + Type: spanEventAttributeTypeArray, + ArrayValue: v.value.value.(*spanEventArrayAttribute), + } + default: + err := msgp.WrapError(fmt.Errorf("unexpected value type not supported by span events: %T", v.value.value)) + return ret, o, err + } + } + } + } + ret[i] = se + } + return ret, nil, nil +} From a12502c88158fd1c6abc6ee2dd88d1c6ee2d5fcf Mon Sep 17 00:00:00 2001 From: Hannah Kim Date: Mon, 29 Sep 2025 14:54:05 -0400 Subject: [PATCH 20/22] wip: decode trace chunk func --- ddtrace/tracer/payload_v1.go | 94 ++++++++++++++++++++++++++++-------- 1 file changed, 73 insertions(+), 21 deletions(-) diff --git a/ddtrace/tracer/payload_v1.go b/ddtrace/tracer/payload_v1.go index 8b82fa7c03..84f13fa83f 100644 --- a/ddtrace/tracer/payload_v1.go +++ b/ddtrace/tracer/payload_v1.go @@ -833,37 +833,89 @@ func DecodeKeyValueList(b []byte, strings *stringTable) (keyValueList, []byte, e } func DecodeTraceChunks(b []byte, strings *stringTable) ([]traceChunk, []byte, error) { - tc := []traceChunk{} - fields, o, err := msgp.ReadMapHeaderBytes(b) + len, o, err := msgp.ReadArrayHeaderBytes(b) if err != nil { - return tc, o, err + return nil, o, err } - for fields > 0 { - fields-- - - f, o, err := msgp.ReadUint32Bytes(b) + ret := make([]traceChunk, len) + for i := range len { + fields, o, err := msgp.ReadMapHeaderBytes(o) if err != nil { - return tc, o, err + return nil, o, err } + tc := traceChunk{} + for fields > 0 { + fields-- - switch f { - case 1: // priority - - case 2: // origin - - case 3: // attributes - - case 4: // spans - - case 5: // droppedTrace + f, o, err := msgp.ReadUint32Bytes(b) + if err != nil { + return ret, o, err + } - case 6: // traceID + switch f { + case 1: // priority + s, o, err := msgp.ReadInt32Bytes(o) + if err != nil { + return ret, o, err + } + tc.priority = s + case 2: // origin + s, o, err := msgp.ReadStringBytes(o) + if err != nil { + return ret, o, err + } + tc.origin = s + case 3: // attributes + kv, o, err := DecodeKeyValueList(o, strings) + if err != nil { + return ret, o, err + } + tc.attributes = kv + case 4: // spans + s, o, err := DecodeSpanList(o, strings) + if err != nil { + return ret, o, err + } + tc.spans = s + case 5: // droppedTrace + s, o, err := msgp.ReadBoolBytes(o) + if err != nil { + return ret, o, err + } + tc.droppedTrace = s + case 6: // traceID + s, o, err := msgp.ReadBytesBytes(o, nil) + if err != nil { + return ret, o, err + } + tc.traceID = [16]byte(s) + case 7: // samplingMechanism + s, o, err := msgp.ReadStringBytes(o) + if err != nil { + return ret, o, err + } + tc.samplingMechanism = s + } + } + ret[i] = tc + } + return ret, o, nil +} - case 7: // samplingMechanism +func DecodeSpanList(b []byte, strings *stringTable) (spanList, []byte, error) { + len, o, err := msgp.ReadArrayHeaderBytes(b) + if err != nil { + return nil, o, err + } + ret := make([]*Span, len) + for i := range len { + ret[i], o, err = DecodeSpan(o, strings) + if err != nil { + return nil, o, err } } - return tc, o, nil + return ret, o, nil } func DecodeSpan(b []byte, strings *stringTable) (*Span, []byte, error) { From 08b1b9773704cedd81a59a74589576f95b2e0e30 Mon Sep 17 00:00:00 2001 From: Hannah Kim Date: Tue, 30 Sep 2025 13:13:52 -0400 Subject: [PATCH 21/22] couple fixes --- ddtrace/tracer/payload.go | 2 +- ddtrace/tracer/payload_v1.go | 62 ++++++++++++++++++------------------ 2 files changed, 32 insertions(+), 32 deletions(-) diff --git a/ddtrace/tracer/payload.go b/ddtrace/tracer/payload.go index 2532a53943..70be2dd865 100644 --- a/ddtrace/tracer/payload.go +++ b/ddtrace/tracer/payload.go @@ -173,7 +173,7 @@ type traceChunk struct { droppedTrace bool `msg:"droppedTrace"` // the ID of the trace to which all spans in this chunk belong - traceID [16]byte `msg:"traceID"` + traceID []byte `msg:"traceID"` // the optional string decision maker (previously span tag _dd.p.dm) samplingMechanism string `msg:"samplingMechanism,omitempty"` diff --git a/ddtrace/tracer/payload_v1.go b/ddtrace/tracer/payload_v1.go index 84f13fa83f..a8044a06e8 100644 --- a/ddtrace/tracer/payload_v1.go +++ b/ddtrace/tracer/payload_v1.go @@ -190,15 +190,17 @@ func (p *payloadV1) push(t spanList) (stats payloadStats, err error) { {key: streamingKey{isString: false, idx: 9}, value: anyValue{valueType: StringValueType, value: p.appVersion}}, // appVersion } - p.chunks = append(p.chunks, traceChunk{ + tc := traceChunk{ priority: int32(priority), origin: origin, attributes: keyValueList{}, spans: t, - traceID: t[0].Context().traceID, - }) + traceID: t[0].Context().traceID[:], + } + p.chunks = append(p.chunks, tc) wr := msgp.NewWriter(&p.buf) - err = EncodeSpanList(t, wr, p) + + err = tc.EncodeMsg(wr, p) if err != nil { return payloadStats{}, err } @@ -376,10 +378,11 @@ func (kv keyValueList) EncodeMsg(e *msgp.Writer, p *payloadV1) error { } func (t *traceChunk) EncodeMsg(e *msgp.Writer, p *payloadV1) error { + e.WriteInt32(11) // write msgp index for `chunks` + kv := keyValueList{ {key: streamingKey{isString: false, idx: 1}, value: anyValue{valueType: IntValueType, value: int64(t.priority)}}, // priority {key: streamingKey{isString: false, idx: 2}, value: anyValue{valueType: StringValueType, value: t.origin}}, // origin - {key: streamingKey{isString: false, idx: 4}, value: anyValue{valueType: keyValueListType, value: t.spans}}, // spans {key: streamingKey{isString: false, idx: 5}, value: anyValue{valueType: BoolValueType, value: t.droppedTrace}}, // droppedTrace {key: streamingKey{isString: false, idx: 6}, value: anyValue{valueType: BytesValueType, value: t.traceID}}, // traceID {key: streamingKey{isString: false, idx: 7}, value: anyValue{valueType: StringValueType, value: t.samplingMechanism}}, // samplingMechanism @@ -389,22 +392,24 @@ func (t *traceChunk) EncodeMsg(e *msgp.Writer, p *payloadV1) error { for k, v := range t.attributes { attr = append(attr, keyValue{key: streamingKey{isString: false, idx: uint32(k)}, value: anyValue{valueType: getAnyValueType(v), value: v}}) } - kv = append(kv, keyValue{key: streamingKey{isString: false, idx: 3}, value: anyValue{valueType: ArrayValueType, value: attr}}) // attributes + kv = append(kv, keyValue{key: streamingKey{isString: false, idx: 3}, value: anyValue{valueType: keyValueListType, value: attr}}) // attributes + + err := kv.EncodeMsg(e, p) + if err != nil { + return err + } - return kv.EncodeMsg(e, p) + return EncodeSpanList(t.spans, e, p) } -// EncodeMsg writes the contents of a list of spans into `p.buf` -// Span, SpanLink, and SpanEvent structs are different for v0.4 and v1.0. -// For v1 we need to manually encode the spans, span links, and span events -// if we don't want to do extra allocations. func EncodeSpanList(s spanList, e *msgp.Writer, p *payloadV1) error { + e.WriteInt32(4) // write msgp index for `spans` + err := e.WriteArrayHeader(uint32(len(s))) if err != nil { return msgp.WrapError(err) } - e.WriteInt32(4) for _, span := range s { if span == nil { err := e.WriteNil() @@ -622,7 +627,7 @@ func (p *payloadV1) Decode(b []byte) ([]byte, error) { p.strings = newStringTable() } - fields, o, err := msgp.ReadMapHeaderBytes(b) + fields, o, err := msgp.ReadArrayHeaderBytes(b) if err != nil { return o, err } @@ -630,18 +635,13 @@ func (p *payloadV1) Decode(b []byte) ([]byte, error) { for fields > 0 { fields-- - f, o, err := msgp.ReadUint32Bytes(b) + f, o, err := msgp.ReadInt32Bytes(o) if err != nil { return o, err } switch f { - case 1: // stringTable - o, err = DecodeStringTable(o, p.strings) - if err != nil { - return o, err - } - + // we don't care for the string table, so we don't decode it case 2: // containerID p.containerID, o, err = DecodeStreamingString(o, p.strings) if err != nil { @@ -738,7 +738,7 @@ func DecodeStreamingString(b []byte, strings *stringTable) (string, []byte, erro } // else, try reading as a string, then add to the string table - str, o, err := msgp.ReadStringBytes(b) + str, o, err := msgp.ReadStringBytes(o) if err != nil { return "", nil, msgp.WrapError(err, "unable to read streaming string") } @@ -783,7 +783,7 @@ func DecodeAnyValue(b []byte, strings *stringTable) (anyValue, []byte, error) { } return anyValue{valueType: BytesValueType, value: b}, o, nil case ArrayValueType: - len, o, err := msgp.ReadBytesHeader(o) + len, o, err := msgp.ReadArrayHeaderBytes(o) if err != nil { return anyValue{}, o, err } @@ -807,7 +807,7 @@ func DecodeAnyValue(b []byte, strings *stringTable) (anyValue, []byte, error) { } func DecodeKeyValueList(b []byte, strings *stringTable) (keyValueList, []byte, error) { - len, o, err := msgp.ReadBytesHeader(b) + len, o, err := msgp.ReadMapHeaderBytes(b) if err != nil { return nil, o, err } @@ -833,14 +833,14 @@ func DecodeKeyValueList(b []byte, strings *stringTable) (keyValueList, []byte, e } func DecodeTraceChunks(b []byte, strings *stringTable) ([]traceChunk, []byte, error) { - len, o, err := msgp.ReadArrayHeaderBytes(b) + len, o, err := msgp.ReadMapHeaderBytes(b) if err != nil { return nil, o, err } ret := make([]traceChunk, len) for i := range len { - fields, o, err := msgp.ReadMapHeaderBytes(o) + fields, o, err := msgp.ReadArrayHeaderBytes(o) if err != nil { return nil, o, err } @@ -848,7 +848,7 @@ func DecodeTraceChunks(b []byte, strings *stringTable) ([]traceChunk, []byte, er for fields > 0 { fields-- - f, o, err := msgp.ReadUint32Bytes(b) + f, o, err := msgp.ReadUint32Bytes(o) if err != nil { return ret, o, err } @@ -889,7 +889,7 @@ func DecodeTraceChunks(b []byte, strings *stringTable) ([]traceChunk, []byte, er if err != nil { return ret, o, err } - tc.traceID = [16]byte(s) + tc.traceID = []byte(s) case 7: // samplingMechanism s, o, err := msgp.ReadStringBytes(o) if err != nil { @@ -928,7 +928,7 @@ func DecodeSpan(b []byte, strings *stringTable) (*Span, []byte, error) { for fields > 0 { fields-- - f, o, err := msgp.ReadUint32Bytes(b) + f, o, err := msgp.ReadUint32Bytes(o) if err != nil { return &sp, o, err } @@ -1091,11 +1091,11 @@ func DecodeSpanLinks(b []byte, strings *stringTable) ([]SpanLink, []byte, error) } sl.Tracestate = s case 5: // flags - s, o, err := msgp.ReadInt32Bytes(o) + s, o, err := msgp.ReadUint32Bytes(o) if err != nil { return ret, o, err } - sl.Flags = uint32(s) + sl.Flags = s } } ret[i] = sl @@ -1118,7 +1118,7 @@ func DecodeSpanEvents(b []byte, strings *stringTable) ([]spanEvent, []byte, erro for fields > 0 { fields-- - f, o, err := msgp.ReadUint32Bytes(b) + f, o, err := msgp.ReadUint32Bytes(o) if err != nil { return ret, o, err } From 60165d3ba2756bf671a5497a65f2cd3058bef2ac Mon Sep 17 00:00:00 2001 From: Hannah Kim Date: Tue, 30 Sep 2025 13:35:58 -0400 Subject: [PATCH 22/22] draft test --- ddtrace/tracer/payload_test.go | 44 ++++++++++++++++++++++++++++++++-- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/ddtrace/tracer/payload_test.go b/ddtrace/tracer/payload_test.go index 239ca52385..4c840904b5 100644 --- a/ddtrace/tracer/payload_test.go +++ b/ddtrace/tracer/payload_test.go @@ -60,9 +60,9 @@ func TestPayloadIntegrity(t *testing.T) { } } -// TestPayloadDecode ensures that whatever we push into the payload can +// TestPayloadV04Decode ensures that whatever we push into a v0.4 payload can // be decoded by the codec. -func TestPayloadDecode(t *testing.T) { +func TestPayloadV04Decode(t *testing.T) { for _, n := range []int{10, 1 << 10} { t.Run(strconv.Itoa(n), func(t *testing.T) { assert := assert.New(t) @@ -77,6 +77,46 @@ func TestPayloadDecode(t *testing.T) { } } +// TestPayloadV1Decode ensures that whatever we push into a v1 payload can +// be decoded by the codec, and that it matches the original payload. +func TestPayloadV1Decode(t *testing.T) { + for _, n := range []int{10, 1 << 10} { + t.Run(strconv.Itoa(n), func(t *testing.T) { + assert := assert.New(t) + p := newPayloadV1() + + p.containerID = "containerID" + p.languageName = "languageName" + p.languageVersion = "languageVersion" + p.tracerVersion = "tracerVersion" + p.runtimeID = "runtimeID" + p.env = "env" + p.hostname = "hostname" + p.appVersion = "appVersion" + + for i := 0; i < n; i++ { + _, _ = p.push(newSpanList(i%5 + 1)) + } + + encoded, err := io.ReadAll(p) + assert.NoError(err) + + got := newPayloadV1() + _, err = got.Decode(encoded) + assert.NoError(err) + + assert.Equal(p.containerID, got.containerID) + assert.Equal(p.languageName, got.languageName) + assert.Equal(p.languageVersion, got.languageVersion) + assert.Equal(p.tracerVersion, got.tracerVersion) + assert.Equal(p.runtimeID, got.runtimeID) + assert.Equal(p.env, got.env) + assert.Equal(p.hostname, got.hostname) + assert.Equal(p.appVersion, got.appVersion) + }) + } +} + func BenchmarkPayloadThroughput(b *testing.B) { b.Run("10K", benchmarkPayloadThroughput(1)) b.Run("100K", benchmarkPayloadThroughput(10))