Skip to content

Commit d756bb0

Browse files
Introduce JournalValueCodec API (#581)
* JournalValueCodec * Introduce new interface JournalValueCodec to allow customize encoding of values after serialization and before deserialization. Expose it as experimental. * Disable non-deterministic checks when the user provides a codec * Use the codec in handler input/output, attach, get state, set state, call, send, run, awakeable, complete awakeable, get promise, peek promise, complete promise * Add JournalValueCodec support to ingress client --------- Co-authored-by: igalshilman <[email protected]>
1 parent 8b51ed1 commit d756bb0

25 files changed

+274
-86
lines changed

packages/restate-sdk-clients/src/api.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import type {
77
WorkflowDefinitionFrom,
88
Serde,
99
Duration,
10+
JournalValueCodec,
1011
} from "@restatedev/restate-sdk-core";
1112
import { millisOrDurationToMillis } from "@restatedev/restate-sdk-core";
1213

@@ -322,4 +323,11 @@ export type ConnectionOpts = {
322323
* Use this to attach authentication headers.
323324
*/
324325
headers?: Record<string, string>;
326+
327+
/**
328+
* Codec to use for input/outputs. Check {@link JournalValueCodec} for more details
329+
*
330+
* @experimental
331+
*/
332+
journalValueCodec?: JournalValueCodec;
325333
};

packages/restate-sdk-clients/src/ingress.ts

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
type VirtualObjectDefinitionFrom,
1919
type Serde,
2020
serde,
21+
type JournalValueCodec,
2122
} from "@restatedev/restate-sdk-core";
2223
import type {
2324
ConnectionOpts,
@@ -147,7 +148,8 @@ const doComponentInvocation = async <I, O>(
147148

148149
const { body, contentType } = serializeBodyWithContentType(
149150
params.parameter,
150-
inputSerde
151+
inputSerde,
152+
opts.journalValueCodec
151153
);
152154
//
153155
// headers
@@ -201,12 +203,15 @@ const doComponentInvocation = async <I, O>(
201203
`Request failed: ${httpResponse.status}\n${body}`
202204
);
203205
}
204-
const responseBuf = await httpResponse.arrayBuffer();
206+
const responseBuf = new Uint8Array(await httpResponse.arrayBuffer());
205207
if (!params.send) {
208+
const decodedBuf = opts.journalValueCodec
209+
? await opts.journalValueCodec.decode(responseBuf)
210+
: responseBuf;
206211
const outputSerde = params.opts?.opts.output ?? serde.json;
207-
return outputSerde.deserialize(new Uint8Array(responseBuf)) as O;
212+
return outputSerde.deserialize(decodedBuf) as O;
208213
}
209-
const json = serde.json.deserialize(new Uint8Array(responseBuf)) as O;
214+
const json = serde.json.deserialize(responseBuf) as O;
210215
return { ...json, attachable };
211216
};
212217

@@ -252,8 +257,11 @@ const doWorkflowHandleCall = async <O>(
252257
signal,
253258
});
254259
if (httpResponse.ok) {
255-
const responseBuf = await httpResponse.arrayBuffer();
256-
return outputSerde.deserialize(new Uint8Array(responseBuf)) as O;
260+
const responseBuf = new Uint8Array(await httpResponse.arrayBuffer());
261+
const decodedBuf = opts.journalValueCodec
262+
? await opts.journalValueCodec.decode(responseBuf)
263+
: responseBuf;
264+
return outputSerde.deserialize(decodedBuf) as O;
257265
}
258266
const body = await httpResponse.text();
259267
throw new HttpCallError(
@@ -412,7 +420,8 @@ class HttpIngress implements Ingress {
412420
const url = `${this.opts.url}/restate/a/${id}/resolve`;
413421
const { body, contentType } = serializeBodyWithContentType(
414422
payload,
415-
payloadSerde ?? serde.json
423+
payloadSerde ?? serde.json,
424+
this.opts.journalValueCodec
416425
);
417426
const headers = {
418427
...(this.opts.headers ?? {}),
@@ -481,9 +490,11 @@ class HttpIngress implements Ingress {
481490
headers,
482491
});
483492
if (httpResponse.ok) {
484-
const responseBuf = await httpResponse.arrayBuffer();
485-
const ser = resultSerde ?? serde.json;
486-
return ser.deserialize(new Uint8Array(responseBuf)) as T;
493+
const responseBuf = new Uint8Array(await httpResponse.arrayBuffer());
494+
const decodedBuf = this.opts.journalValueCodec
495+
? await this.opts.journalValueCodec.decode(responseBuf)
496+
: responseBuf;
497+
return (resultSerde ?? serde.json).deserialize(decodedBuf) as T;
487498
}
488499
const body = await httpResponse.text();
489500
throw new HttpCallError(
@@ -504,15 +515,19 @@ function computeDelayAsIso(opts: SendOpts): string {
504515

505516
function serializeBodyWithContentType(
506517
body: unknown,
507-
serde: Serde<unknown>
518+
serde: Serde<unknown>,
519+
journalValueCodec?: JournalValueCodec
508520
): {
509521
body?: Uint8Array;
510522
contentType?: string;
511523
} {
512524
if (body === undefined) {
513525
return {};
514526
}
515-
const buffer = serde.serialize(body);
527+
let buffer = serde.serialize(body);
528+
if (journalValueCodec) {
529+
buffer = journalValueCodec.encode(buffer);
530+
}
516531
return {
517532
body: buffer,
518533
contentType: serde.contentType,

packages/restate-sdk-clients/src/public_api.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ export type {
2121
Workflow,
2222
VirtualObject,
2323
Duration,
24+
JournalValueCodec,
2425
} from "@restatedev/restate-sdk-core";
2526

2627
export { serde } from "@restatedev/restate-sdk-core";

packages/restate-sdk-cloudflare-workers/patches/vm/sdk_shared_core_wasm_bindings.d.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ export class WasmResponseHead {
9595
}
9696
export class WasmVM {
9797
free(): void;
98-
constructor(headers: WasmHeader[], log_level: LogLevel, logger_id: number);
98+
constructor(headers: WasmHeader[], log_level: LogLevel, logger_id: number, disable_payload_checks: boolean);
9999
get_response_head(): WasmResponseHead;
100100
notify_input(buffer: Uint8Array): void;
101101
notify_input_closed(): void;

packages/restate-sdk-cloudflare-workers/patches/vm/sdk_shared_core_wasm_bindings_bg.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -550,11 +550,12 @@ export class WasmVM {
550550
* @param {WasmHeader[]} headers
551551
* @param {LogLevel} log_level
552552
* @param {number} logger_id
553+
* @param {boolean} disable_payload_checks
553554
*/
554-
constructor(headers, log_level, logger_id) {
555+
constructor(headers, log_level, logger_id, disable_payload_checks) {
555556
const ptr0 = passArrayJsValueToWasm0(headers, wasm.__wbindgen_malloc);
556557
const len0 = WASM_VECTOR_LEN;
557-
const ret = wasm.wasmvm_new(ptr0, len0, log_level, logger_id);
558+
const ret = wasm.wasmvm_new(ptr0, len0, log_level, logger_id, disable_payload_checks);
558559
if (ret[2]) {
559560
throw takeFromExternrefTable0(ret[1]);
560561
}
5.87 KB
Binary file not shown.

packages/restate-sdk-cloudflare-workers/patches/vm/sdk_shared_core_wasm_bindings_bg.wasm.d.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ export const __wbg_wasminput_free: (a: number, b: number) => void;
1414
export const __wbg_get_wasminput_headers: (a: number) => [number, number];
1515
export const __wbg_get_wasminput_input: (a: number) => any;
1616
export const __wbg_wasmvm_free: (a: number, b: number) => void;
17-
export const wasmvm_new: (a: number, b: number, c: number, d: number) => [number, number, number];
17+
export const wasmvm_new: (a: number, b: number, c: number, d: number, e: number) => [number, number, number];
1818
export const wasmvm_get_response_head: (a: number) => number;
1919
export const wasmvm_notify_input: (a: number, b: number, c: number) => void;
2020
export const wasmvm_notify_input_closed: (a: number) => void;
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright (c) 2023-2025 - Restate Software, Inc., Restate GmbH
3+
*
4+
* This file is part of the Restate SDK for Node.js/TypeScript,
5+
* which is released under the MIT license.
6+
*
7+
* You can find a copy of the license in file LICENSE in the root
8+
* directory of this repository or package, or at
9+
* https://github.com/restatedev/sdk-typescript/blob/main/LICENSE
10+
*/
11+
12+
/**
13+
* Journal values codec.
14+
*
15+
* This allows to transform journal values after being serialized, before writing them to the wire.
16+
*
17+
* Values that are passed through the codec:
18+
*
19+
* * Handlers input and success output
20+
* * ctx.run success results
21+
* * Awakeables/Promise success results
22+
* * State values
23+
*
24+
* @experimental
25+
*/
26+
export type JournalValueCodec = {
27+
/**
28+
* Encodes the given buffer.
29+
*
30+
* This will be applied *after* serialization.
31+
*
32+
* @param buf The buffer to encode. Empty byte buffers should be appropriately handled as well.
33+
* @returns The encoded buffer
34+
*/
35+
encode(buf: Uint8Array): Uint8Array;
36+
37+
/**
38+
* Decodes the given buffer.
39+
*
40+
* This will be applied *before* deserialization.
41+
*
42+
* @param buf The buffer to decode.
43+
* @returns A promise that resolves to the decoded buffer.
44+
*/
45+
decode(buf: Uint8Array): Promise<Uint8Array>;
46+
};

packages/restate-sdk-core/src/public_api.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,5 @@ export { serde } from "./serde_api.js";
3838

3939
export type { Duration } from "./duration.js";
4040
export { durationToMillis, millisOrDurationToMillis } from "./duration.js";
41+
42+
export type { JournalValueCodec } from "./entry_codec.js";

packages/restate-sdk/src/common_api.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ export type {
6363
WorkflowHandler,
6464
WorkflowSharedHandler,
6565
Duration,
66+
JournalValueCodec,
6667
} from "@restatedev/restate-sdk-core";
6768
export { serde } from "@restatedev/restate-sdk-core";
6869

0 commit comments

Comments
 (0)