Skip to content

Commit ff1cb2d

Browse files
Moved static functions out of the method
1 parent fcf3ff5 commit ff1cb2d

File tree

1 file changed

+100
-89
lines changed

1 file changed

+100
-89
lines changed

plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/bedrock-runtime.ts

Lines changed: 100 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -599,24 +599,36 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
599599
): Promise<any> {
600600
const stream = response.data?.body;
601601
const modelId = response.request.commandInput?.modelId;
602-
if (!stream || !span.isRecording()) return;
603-
604-
const wrappedStream = instrumentAsyncIterable(
605-
stream,
606-
async (chunk: { chunk?: { bytes?: Uint8Array } }) => {
607-
const parsedChunk = parseChunk(chunk?.chunk?.bytes);
602+
if (!stream) return;
603+
604+
const wrappedStream =
605+
BedrockRuntimeServiceExtension.instrumentAsyncIterable(
606+
stream,
607+
async (chunk: { chunk?: { bytes?: Uint8Array } }) => {
608+
const parsedChunk = BedrockRuntimeServiceExtension.parseChunk(
609+
chunk?.chunk?.bytes
610+
);
608611

609-
if (!parsedChunk) return;
612+
if (!parsedChunk) return;
610613

611-
if (modelId.includes('amazon.titan')) {
612-
recordTitanAttributes(parsedChunk);
613-
} else if (modelId.includes('anthropic.claude')) {
614-
recordClaudeAttributes(parsedChunk);
615-
} else if (modelId.includes('amazon.nova')) {
616-
recordNovaAttributes(parsedChunk);
614+
if (modelId.includes('amazon.titan')) {
615+
BedrockRuntimeServiceExtension.recordTitanAttributes(
616+
parsedChunk,
617+
span
618+
);
619+
} else if (modelId.includes('anthropic.claude')) {
620+
BedrockRuntimeServiceExtension.recordClaudeAttributes(
621+
parsedChunk,
622+
span
623+
);
624+
} else if (modelId.includes('amazon.nova')) {
625+
BedrockRuntimeServiceExtension.recordNovaAttributes(
626+
parsedChunk,
627+
span
628+
);
629+
}
617630
}
618-
}
619-
);
631+
);
620632
// Replace the original response body with our instrumented stream.
621633
// - Defers span.end() until the entire stream is consumed
622634
// This ensures downstream consumers still receive the full stream correctly,
@@ -631,93 +643,92 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
631643
}
632644
})();
633645
return response.data;
634-
635-
// Tap into the stream at the chunk level without modifying the chunk itself.
636-
function instrumentAsyncIterable<T>(
637-
stream: AsyncIterable<T>,
638-
onChunk: (chunk: T) => void
639-
): AsyncIterable<T> {
640-
return {
641-
[Symbol.asyncIterator]: async function* () {
642-
for await (const chunk of stream) {
643-
onChunk(chunk);
644-
yield chunk;
645-
}
646-
},
647-
};
648-
}
649-
650-
function parseChunk(bytes?: Uint8Array): any {
651-
if (!bytes || !(bytes instanceof Uint8Array)) return null;
652-
try {
653-
const str = Buffer.from(bytes).toString('utf-8');
654-
return JSON.parse(str);
655-
} catch (err) {
656-
console.warn('Failed to parse streamed chunk', err);
657-
return null;
658-
}
659-
}
660-
661-
function recordNovaAttributes(parsedChunk: any) {
662-
if (parsedChunk.metadata?.usage !== undefined) {
663-
if (parsedChunk.metadata?.usage.inputTokens !== undefined) {
664-
span.setAttribute(
665-
ATTR_GEN_AI_USAGE_INPUT_TOKENS,
666-
parsedChunk.metadata.usage.inputTokens
667-
);
668-
}
669-
if (parsedChunk.metadata?.usage.outputTokens !== undefined) {
670-
span.setAttribute(
671-
ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
672-
parsedChunk.metadata.usage.outputTokens
673-
);
646+
}
647+
// Tap into the stream at the chunk level without modifying the chunk itself.
648+
private static instrumentAsyncIterable<T>(
649+
stream: AsyncIterable<T>,
650+
onChunk: (chunk: T) => void
651+
): AsyncIterable<T> {
652+
return {
653+
[Symbol.asyncIterator]: async function* () {
654+
for await (const chunk of stream) {
655+
onChunk(chunk);
656+
yield chunk;
674657
}
675-
}
676-
if (parsedChunk.messageStop?.stopReason !== undefined) {
677-
span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
678-
parsedChunk.messageStop.stopReason,
679-
]);
680-
}
658+
},
659+
};
660+
}
661+
662+
private static parseChunk(bytes?: Uint8Array): any {
663+
if (!bytes || !(bytes instanceof Uint8Array)) return null;
664+
try {
665+
const str = Buffer.from(bytes).toString('utf-8');
666+
return JSON.parse(str);
667+
} catch (err) {
668+
console.warn('Failed to parse streamed chunk', err);
669+
return null;
681670
}
671+
}
682672

683-
function recordClaudeAttributes(parsedChunk: any) {
684-
if (parsedChunk.message?.usage?.input_tokens !== undefined) {
673+
private static recordNovaAttributes(parsedChunk: any, span: Span) {
674+
if (parsedChunk.metadata?.usage !== undefined) {
675+
if (parsedChunk.metadata?.usage.inputTokens !== undefined) {
685676
span.setAttribute(
686677
ATTR_GEN_AI_USAGE_INPUT_TOKENS,
687-
parsedChunk.message.usage.input_tokens
678+
parsedChunk.metadata.usage.inputTokens
688679
);
689680
}
690-
if (parsedChunk.message?.usage?.output_tokens !== undefined) {
681+
if (parsedChunk.metadata?.usage.outputTokens !== undefined) {
691682
span.setAttribute(
692683
ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
693-
parsedChunk.message.usage.output_tokens
684+
parsedChunk.metadata.usage.outputTokens
694685
);
695686
}
696-
if (parsedChunk.delta?.stop_reason !== undefined) {
697-
span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
698-
parsedChunk.delta.stop_reason,
699-
]);
700-
}
701687
}
688+
if (parsedChunk.messageStop?.stopReason !== undefined) {
689+
span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
690+
parsedChunk.messageStop.stopReason,
691+
]);
692+
}
693+
}
702694

703-
function recordTitanAttributes(parsedChunk: any) {
704-
if (parsedChunk.inputTextTokenCount !== undefined) {
705-
span.setAttribute(
706-
ATTR_GEN_AI_USAGE_INPUT_TOKENS,
707-
parsedChunk.inputTextTokenCount
708-
);
709-
}
710-
if (parsedChunk.totalOutputTextTokenCount !== undefined) {
711-
span.setAttribute(
712-
ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
713-
parsedChunk.totalOutputTextTokenCount
714-
);
715-
}
716-
if (parsedChunk.completionReason !== undefined) {
717-
span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
718-
parsedChunk.completionReason,
719-
]);
720-
}
695+
private static recordClaudeAttributes(parsedChunk: any, span: Span) {
696+
if (parsedChunk.message?.usage?.input_tokens !== undefined) {
697+
span.setAttribute(
698+
ATTR_GEN_AI_USAGE_INPUT_TOKENS,
699+
parsedChunk.message.usage.input_tokens
700+
);
701+
}
702+
if (parsedChunk.message?.usage?.output_tokens !== undefined) {
703+
span.setAttribute(
704+
ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
705+
parsedChunk.message.usage.output_tokens
706+
);
707+
}
708+
if (parsedChunk.delta?.stop_reason !== undefined) {
709+
span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
710+
parsedChunk.delta.stop_reason,
711+
]);
712+
}
713+
}
714+
715+
private static recordTitanAttributes(parsedChunk: any, span: Span) {
716+
if (parsedChunk.inputTextTokenCount !== undefined) {
717+
span.setAttribute(
718+
ATTR_GEN_AI_USAGE_INPUT_TOKENS,
719+
parsedChunk.inputTextTokenCount
720+
);
721+
}
722+
if (parsedChunk.totalOutputTextTokenCount !== undefined) {
723+
span.setAttribute(
724+
ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
725+
parsedChunk.totalOutputTextTokenCount
726+
);
727+
}
728+
if (parsedChunk.completionReason !== undefined) {
729+
span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
730+
parsedChunk.completionReason,
731+
]);
721732
}
722733
}
723734
}

0 commit comments

Comments
 (0)