Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 225 additions & 2 deletions packages/instrumentation-aws-sdk/src/services/bedrock-runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import {
Attributes,
DiagLogger,
diag,
Histogram,
HrTime,
Meter,
Expand Down Expand Up @@ -59,6 +60,7 @@ import {
export class BedrockRuntimeServiceExtension implements ServiceExtension {
private tokenUsage!: Histogram;
private operationDuration!: Histogram;
private _diag: DiagLogger = diag;

updateMetricInstruments(meter: Meter) {
// https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-metrics/#metric-gen_aiclienttokenusage
Expand Down Expand Up @@ -101,7 +103,9 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
case 'ConverseStream':
return this.requestPreSpanHookConverse(request, config, diag, true);
case 'InvokeModel':
return this.requestPreSpanHookInvokeModel(request, config, diag);
return this.requestPreSpanHookInvokeModel(request, config, diag, false);
case 'InvokeModelWithResponseStream':
return this.requestPreSpanHookInvokeModel(request, config, diag, true);
}

return {
Expand Down Expand Up @@ -157,7 +161,8 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
private requestPreSpanHookInvokeModel(
request: NormalizedRequest,
config: AwsSdkInstrumentationConfig,
diag: DiagLogger
diag: DiagLogger,
isStream: boolean
): RequestMetadata {
let spanName: string | undefined;
const spanAttributes: Attributes = {
Expand Down Expand Up @@ -312,6 +317,7 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
return {
spanName,
isIncoming: false,
isStream,
spanAttributes,
};
}
Expand Down Expand Up @@ -346,6 +352,13 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
);
case 'InvokeModel':
return this.responseHookInvokeModel(response, span, tracer, config);
case 'InvokeModelWithResponseStream':
return this.responseHookInvokeModelWithResponseStream(
response,
span,
tracer,
config
);
}
}

Expand Down Expand Up @@ -579,4 +592,214 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
}
}
}

private async responseHookInvokeModelWithResponseStream(
response: NormalizedResponse,
span: Span,
tracer: Tracer,
config: AwsSdkInstrumentationConfig
): Promise<any> {
const stream = response.data?.body;
const modelId = response.request.commandInput?.modelId;
if (!stream || !modelId) return;

// Replace the original response body with our instrumented stream.
// - Defers span.end() until the entire stream is consumed
// This ensures downstream consumers still receive the full stream correctly,
// while OpenTelemetry can record span attributes from streamed data.
response.data.body = async function* (
this: BedrockRuntimeServiceExtension
) {
try {
for await (const chunk of stream) {
const parsedChunk = this.parseChunk(chunk?.chunk?.bytes);

if (!parsedChunk) return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yuliia-fryshko

nit: If, for whatever reason, a chunk cannot be parsed for adding otel data, should we perhaps continue to yield the chunks and eventually span.end() rather than return; here?

if (modelId.includes('amazon.titan')) {
BedrockRuntimeServiceExtension.recordTitanAttributes(
parsedChunk,
span
);
} else if (modelId.includes('anthropic.claude')) {
BedrockRuntimeServiceExtension.recordClaudeAttributes(
parsedChunk,
span
);
} else if (modelId.includes('amazon.nova')) {
BedrockRuntimeServiceExtension.recordNovaAttributes(
parsedChunk,
span
);
} else if (modelId.includes('meta.llama')) {
BedrockRuntimeServiceExtension.recordLlamaAttributes(
parsedChunk,
span
);
} else if (modelId.includes('cohere.command-r')) {
BedrockRuntimeServiceExtension.recordCohereRAttributes(
parsedChunk,
span
);
} else if (modelId.includes('cohere.command')) {
BedrockRuntimeServiceExtension.recordCohereAttributes(
parsedChunk,
span
);
} else if (modelId.includes('mistral')) {
BedrockRuntimeServiceExtension.recordMistralAttributes(
parsedChunk,
span
);
}
yield chunk;
}
} finally {
span.end();
}
}.bind(this)();
return response.data;
}

private parseChunk(bytes?: Uint8Array): any {
if (!bytes || !(bytes instanceof Uint8Array)) return null;
try {
const str = Buffer.from(bytes).toString('utf-8');
return JSON.parse(str);
} catch (err) {
this._diag.warn('Failed to parse streamed chunk', err);
return null;
}
}

private static recordNovaAttributes(parsedChunk: any, span: Span) {
if (parsedChunk.metadata?.usage !== undefined) {
if (parsedChunk.metadata?.usage.inputTokens !== undefined) {
span.setAttribute(
ATTR_GEN_AI_USAGE_INPUT_TOKENS,
parsedChunk.metadata.usage.inputTokens
);
}
if (parsedChunk.metadata?.usage.outputTokens !== undefined) {
span.setAttribute(
ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
parsedChunk.metadata.usage.outputTokens
);
}
}
if (parsedChunk.messageStop?.stopReason !== undefined) {
span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
parsedChunk.messageStop.stopReason,
]);
}
}

private static recordClaudeAttributes(parsedChunk: any, span: Span) {
if (parsedChunk.message?.usage?.input_tokens !== undefined) {
span.setAttribute(
ATTR_GEN_AI_USAGE_INPUT_TOKENS,
parsedChunk.message.usage.input_tokens
);
}
if (parsedChunk.message?.usage?.output_tokens !== undefined) {
span.setAttribute(
ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
parsedChunk.message.usage.output_tokens
);
}
if (parsedChunk.delta?.stop_reason !== undefined) {
span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
parsedChunk.delta.stop_reason,
]);
}
}

private static recordTitanAttributes(parsedChunk: any, span: Span) {
if (parsedChunk.inputTextTokenCount !== undefined) {
span.setAttribute(
ATTR_GEN_AI_USAGE_INPUT_TOKENS,
parsedChunk.inputTextTokenCount
);
}
if (parsedChunk.totalOutputTextTokenCount !== undefined) {
span.setAttribute(
ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
parsedChunk.totalOutputTextTokenCount
);
}
if (parsedChunk.completionReason !== undefined) {
span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
parsedChunk.completionReason,
]);
}
}
private static recordLlamaAttributes(parsedChunk: any, span: Span) {
if (parsedChunk.prompt_token_count !== undefined) {
span.setAttribute(
ATTR_GEN_AI_USAGE_INPUT_TOKENS,
parsedChunk.prompt_token_count
);
}
if (parsedChunk.generation_token_count !== undefined) {
span.setAttribute(
ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
parsedChunk.generation_token_count
);
}
if (parsedChunk.stop_reason !== undefined) {
span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
parsedChunk.stop_reason,
]);
}
}

private static recordMistralAttributes(parsedChunk: any, span: Span) {
if (parsedChunk.outputs?.[0]?.text !== undefined) {
span.setAttribute(
ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
// NOTE: We approximate the token count since this value is not directly available in the body
// According to Bedrock docs they use (total_chars / 6) to approximate token count for pricing.
// https://docs.aws.amazon.com/bedrock/latest/userguide/model-customization-prepare.html
Math.ceil(parsedChunk.outputs[0].text.length / 6)
);
}
if (parsedChunk.outputs?.[0]?.stop_reason !== undefined) {
span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
parsedChunk.outputs[0].stop_reason,
]);
}
}

private static recordCohereAttributes(parsedChunk: any, span: Span) {
if (parsedChunk.generations?.[0]?.text !== undefined) {
span.setAttribute(
ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
// NOTE: We approximate the token count since this value is not directly available in the body
// According to Bedrock docs they use (total_chars / 6) to approximate token count for pricing.
// https://docs.aws.amazon.com/bedrock/latest/userguide/model-customization-prepare.html
Math.ceil(parsedChunk.generations[0].text.length / 6)
);
}
if (parsedChunk.generations?.[0]?.finish_reason !== undefined) {
span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
parsedChunk.generations[0].finish_reason,
]);
}
}

private static recordCohereRAttributes(parsedChunk: any, span: Span) {
if (parsedChunk.text !== undefined) {
// NOTE: We approximate the token count since this value is not directly available in the body
// According to Bedrock docs they use (total_chars / 6) to approximate token count for pricing.
// https://docs.aws.amazon.com/bedrock/latest/userguide/model-customization-prepare.html
span.setAttribute(
ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
Math.ceil(parsedChunk.text.length / 6)
);
}
if (parsedChunk.finish_reason !== undefined) {
span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
parsedChunk.finish_reason,
]);
}
}
}
Loading
Loading