Commit 16a0e5c

Added instrumentation for the Bedrock response stream of Llama, Cohere, and Mistral models
1 parent 7f1dd71 commit 16a0e5c

File tree

1 file changed: +108 -37 lines changed


packages/instrumentation-aws-sdk/src/services/bedrock-runtime.ts

Lines changed: 108 additions & 37 deletions
@@ -16,6 +16,7 @@
 import {
   Attributes,
   DiagLogger,
+  diag,
   Histogram,
   HrTime,
   Meter,
@@ -59,6 +60,7 @@ import {
 export class BedrockRuntimeServiceExtension implements ServiceExtension {
   private tokenUsage!: Histogram;
   private operationDuration!: Histogram;
+  private _diag: DiagLogger = diag;
 
   updateMetricInstruments(meter: Meter) {
     // https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-metrics/#metric-gen_aiclienttokenusage
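The new _diag field defaults to the global OpenTelemetry diagnostic logger, which the later hunks use to warn about unparsable chunks. A minimal sketch of how that global logger is typically wired up in an application; the DiagConsoleLogger setup below is illustrative and not part of this commit:

import { diag, DiagConsoleLogger, DiagLogLevel } from '@opentelemetry/api';

// Illustrative only: register a console-backed diag logger so that warnings
// emitted via this._diag.warn(...) in the instrumentation become visible.
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.WARN);

diag.warn('Failed to parse streamed chunk', new Error('unexpected payload'));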
@@ -599,18 +601,20 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
   ): Promise<any> {
     const stream = response.data?.body;
     const modelId = response.request.commandInput?.modelId;
-    if (!stream) return;
-
-    const wrappedStream =
-      BedrockRuntimeServiceExtension.instrumentAsyncIterable(
-        stream,
-        async (chunk: { chunk?: { bytes?: Uint8Array } }) => {
-          const parsedChunk = BedrockRuntimeServiceExtension.parseChunk(
-            chunk?.chunk?.bytes
-          );
+    if (!stream || !modelId) return;
 
-          if (!parsedChunk) return;
+    // Replace the original response body with our instrumented stream.
+    // - Defers span.end() until the entire stream is consumed
+    // This ensures downstream consumers still receive the full stream correctly,
+    // while OpenTelemetry can record span attributes from streamed data.
+    response.data.body = async function* (
+      this: BedrockRuntimeServiceExtension
+    ) {
+      try {
+        for await (const chunk of stream) {
+          const parsedChunk = this.parseChunk(chunk?.chunk?.bytes);
 
+          if (!parsedChunk) return;
           if (modelId.includes('amazon.titan')) {
             BedrockRuntimeServiceExtension.recordTitanAttributes(
               parsedChunk,
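For context, each event yielded by the Bedrock response stream carries the model's JSON payload as raw bytes under chunk.bytes; parseChunk decodes the bytes as UTF-8 and parses the JSON. A standalone sketch of that decoding step, using a hypothetical final chunk of a meta.llama stream (the field names are the ones recordLlamaAttributes reads in the hunks below):

// Hypothetical final event of a meta.llama response stream.
const event = {
  chunk: {
    bytes: new TextEncoder().encode(
      '{"prompt_token_count": 10, "generation_token_count": 42, "stop_reason": "stop"}'
    ),
  },
};

// Same decode-and-parse step as parseChunk: UTF-8 decode, then JSON.parse.
const parsed = JSON.parse(Buffer.from(event.chunk.bytes).toString('utf-8'));
console.log(parsed.generation_token_count); // 42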
@@ -626,46 +630,43 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
               parsedChunk,
               span
             );
+          } else if (modelId.includes('meta.llama')) {
+            BedrockRuntimeServiceExtension.recordLlamaAttributes(
+              parsedChunk,
+              span
+            );
+          } else if (modelId.includes('cohere.command-r')) {
+            BedrockRuntimeServiceExtension.recordCohereRAttributes(
+              parsedChunk,
+              span
+            );
+          } else if (modelId.includes('cohere.command')) {
+            BedrockRuntimeServiceExtension.recordCohereAttributes(
+              parsedChunk,
+              span
+            );
+          } else if (modelId.includes('mistral')) {
+            BedrockRuntimeServiceExtension.recordMistralAttributes(
+              parsedChunk,
+              span
+            );
           }
-        }
-      );
-    // Replace the original response body with our instrumented stream.
-    // - Defers span.end() until the entire stream is consumed
-    // This ensures downstream consumers still receive the full stream correctly,
-    // while OpenTelemetry can record span attributes from streamed data.
-    response.data.body = (async function* () {
-      try {
-        for await (const item of wrappedStream) {
-          yield item;
+          yield chunk;
         }
       } finally {
         span.end();
       }
-    })();
+    }.bind(this)();
     return response.data;
   }
-  // Tap into the stream at the chunk level without modifying the chunk itself.
-  private static instrumentAsyncIterable<T>(
-    stream: AsyncIterable<T>,
-    onChunk: (chunk: T) => void
-  ): AsyncIterable<T> {
-    return {
-      [Symbol.asyncIterator]: async function* () {
-        for await (const chunk of stream) {
-          onChunk(chunk);
-          yield chunk;
-        }
-      },
-    };
-  }
 
-  private static parseChunk(bytes?: Uint8Array): any {
+  private parseChunk(bytes?: Uint8Array): any {
     if (!bytes || !(bytes instanceof Uint8Array)) return null;
     try {
       const str = Buffer.from(bytes).toString('utf-8');
       return JSON.parse(str);
     } catch (err) {
-      console.warn('Failed to parse streamed chunk', err);
+      this._diag.warn('Failed to parse streamed chunk', err);
       return null;
     }
   }
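The hunk above folds the old instrumentAsyncIterable helper into a single async generator assigned to response.data.body, so attributes are recorded per chunk and span.end() is deferred until the consumer has drained (or abandoned) the stream. A minimal, self-contained sketch of that wrap-and-defer pattern, using stand-in types rather than the real SDK and OpenTelemetry ones:

// Stand-in for the OpenTelemetry Span used in the instrumentation above.
type SpanLike = { end(): void };

function wrapStream<T>(
  stream: AsyncIterable<T>,
  span: SpanLike,
  onChunk: (chunk: T) => void
): AsyncIterable<T> {
  return (async function* () {
    try {
      for await (const chunk of stream) {
        onChunk(chunk); // record attributes from the chunk
        yield chunk; // pass the chunk through unchanged
      }
    } finally {
      span.end(); // runs once the consumer finishes or stops iterating
    }
  })();
}

// Usage: downstream consumers iterate the wrapped stream exactly as before.
async function* fakeStream() {
  yield { chunk: { bytes: new Uint8Array() } };
}
const span: SpanLike = { end: () => console.log('span ended') };
(async () => {
  for await (const item of wrapStream(fakeStream(), span, () => {})) {
    // consumer work per chunk...
  }
})();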
@@ -731,4 +732,74 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
       ]);
     }
   }
+  private static recordLlamaAttributes(parsedChunk: any, span: Span) {
+    if (parsedChunk.prompt_token_count !== undefined) {
+      span.setAttribute(
+        ATTR_GEN_AI_USAGE_INPUT_TOKENS,
+        parsedChunk.prompt_token_count
+      );
+    }
+    if (parsedChunk.generation_token_count !== undefined) {
+      span.setAttribute(
+        ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
+        parsedChunk.generation_token_count
+      );
+    }
+    if (parsedChunk.stop_reason !== undefined) {
+      span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
+        parsedChunk.stop_reason,
+      ]);
+    }
+  }
+
+  private static recordMistralAttributes(parsedChunk: any, span: Span) {
+    if (parsedChunk.outputs?.[0]?.text !== undefined) {
+      span.setAttribute(
+        ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
+        // NOTE: We approximate the token count since this value is not directly available in the body
+        // According to Bedrock docs they use (total_chars / 6) to approximate token count for pricing.
+        // https://docs.aws.amazon.com/bedrock/latest/userguide/model-customization-prepare.html
+        Math.ceil(parsedChunk.outputs[0].text.length / 6)
+      );
+    }
+    if (parsedChunk.outputs?.[0]?.stop_reason !== undefined) {
+      span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
+        parsedChunk.outputs[0].stop_reason,
+      ]);
+    }
+  }
+
+  private static recordCohereAttributes(parsedChunk: any, span: Span) {
+    if (parsedChunk.generations?.[0]?.text !== undefined) {
+      span.setAttribute(
+        ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
+        // NOTE: We approximate the token count since this value is not directly available in the body
+        // According to Bedrock docs they use (total_chars / 6) to approximate token count for pricing.
+        // https://docs.aws.amazon.com/bedrock/latest/userguide/model-customization-prepare.html
+        Math.ceil(parsedChunk.generations[0].text.length / 6)
+      );
+    }
+    if (parsedChunk.generations?.[0]?.finish_reason !== undefined) {
+      span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
+        parsedChunk.generations[0].finish_reason,
+      ]);
+    }
+  }
+
+  private static recordCohereRAttributes(parsedChunk: any, span: Span) {
+    if (parsedChunk.text !== undefined) {
+      // NOTE: We approximate the token count since this value is not directly available in the body
+      // According to Bedrock docs they use (total_chars / 6) to approximate token count for pricing.
+      // https://docs.aws.amazon.com/bedrock/latest/userguide/model-customization-prepare.html
+      span.setAttribute(
+        ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
+        Math.ceil(parsedChunk.text.length / 6)
+      );
+    }
+    if (parsedChunk.finish_reason !== undefined) {
+      span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
+        parsedChunk.finish_reason,
+      ]);
+    }
+  }
 }
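The Cohere and Mistral streams do not expose output token counts directly, so the new recorders fall back to the character-based heuristic from the Bedrock pricing docs cited in the comments (roughly six characters per token). A small illustration of that arithmetic, assuming the heuristic is applied to the full streamed text:

// Illustrative only: the (total_chars / 6) approximation used by the
// Cohere and Mistral recorders above.
function approximateOutputTokens(text: string): number {
  return Math.ceil(text.length / 6);
}

// e.g. a 75-character completion is reported as ceil(75 / 6) = 13 output tokens.
console.log(approximateOutputTokens('a'.repeat(75))); // 13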
