Skip to content

Commit cd6a2f2

Browse files
akrpic77Andrej Krpic
andauthored
feat(mqtt): add support for context propagation for mqtt nodes if mqtt v5 is used (#6)
Co-authored-by: Andrej Krpic <[email protected]>
1 parent 77170b5 commit cd6a2f2

File tree

1 file changed

+19
-6
lines changed

1 file changed

+19
-6
lines changed

lib/opentelemetry-node.js

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,10 @@ function createSpan (tracer, msg, nodeDefinition, node, isNotTraced) {
145145
if (nodeDefinition.type === 'http in') {
146146
// try to get trace context in incoming http request headers
147147
ctx = propagator.extract(context.active(), msg.req.headers, defaultTextMapGetter)
148-
}
149-
if (nodeDefinition.type === 'amqp-in') {
148+
} else if (nodeDefinition.type === 'mqtt in' && msg.userProperties) {
149+
// try to get trace context in incoming mqtt v5 user properties
150+
ctx = propagator.extract(context.active(), msg.userProperties, defaultTextMapGetter)
151+
} else if (nodeDefinition.type === 'amqp-in') {
150152
// try to get trace context in incoming ampq message headers
151153
ctx = propagator.extract(context.active(), msg.properties.headers, defaultTextMapGetter)
152154
}
@@ -348,14 +350,25 @@ module.exports = function (RED) {
348350
logEvent(node, '4.postDeliver', sendEvent)
349351
const span = createSpan(tracer, sendEvent.msg, sendEvent.destination.node, node, ignoredTypesList.includes(sendEvent.destination.node.type))
350352
if (propagateHeadersTypesList.includes(sendEvent.destination.node.type)) {
351-
// add trace context in http request headers
352353
const output = {}
353354
const ctx = trace.setSpan(context.active(), span)
354355
propagation.inject(ctx, output)
355-
if (!sendEvent.msg.headers) {
356-
sendEvent.msg.headers = {}
356+
switch (sendEvent.destination.node.type) {
357+
// add trace context in mqtt v5 user properties
358+
case 'mqtt out':
359+
if (!sendEvent.msg.userProperties) {
360+
sendEvent.msg.userProperties = {}
361+
}
362+
Object.assign(sendEvent.msg.userProperties, output)
363+
break
364+
default:
365+
// add trace context in http request headers
366+
if (!sendEvent.msg.headers) {
367+
sendEvent.msg.headers = {}
368+
}
369+
Object.assign(sendEvent.msg.headers, output)
370+
break
357371
}
358-
Object.assign(sendEvent.msg.headers, output)
359372
}
360373
if (sendEvent.source.node.type === 'switch' || sendEvent.source.node.type.startsWith('subflow')) {
361374
// end switch or subflow spans as they do not trigger onComplete

0 commit comments

Comments
 (0)