Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions typescript/basics/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
"example-3": "ts-node-dev --transpile-only src/3_workflows.ts"
},
"dependencies": {
"@restatedev/restate-sdk": "^1.7.3",
"@restatedev/restate-sdk-clients": "^1.7.3"
"@restatedev/restate-sdk": "^1.8.0",
"@restatedev/restate-sdk-clients": "^1.8.0"
},
"devDependencies": {
"@types/node": "^20.12.12",
Expand Down
43 changes: 16 additions & 27 deletions typescript/basics/src/0_durable_execution.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
import * as restate from "@restatedev/restate-sdk";
import { service } from "@restatedev/restate-sdk";
import {
SubscriptionRequest,
createRecurringPayment,
createSubscription,
} from "./utils/stubs";
import { SubscriptionRequest, createRecurringPayment, createSubscription } from "./utils/stubs";

// Restate helps you implement resilient applications:
// - Automatic retries
Expand All @@ -27,34 +23,27 @@ import {
// After a failure, a retry is triggered and this log gets replayed to recover the state of the handler.

const subscriptionService = restate.service({
name: "SubscriptionService",
handlers: {
add: async (ctx: restate.Context, req: SubscriptionRequest) => {
// Restate persists the result of all `ctx` actions and recovers them after failures
// For example, generate a stable idempotency key:
const paymentId = ctx.rand.uuidv4();
name: "SubscriptionService",
handlers: {
add: async (ctx: restate.Context, req: SubscriptionRequest) => {
// Restate persists the result of all `ctx` actions and recovers them after failures
// For example, generate a stable idempotency key:
const paymentId = ctx.rand.uuidv4();

// ctx.run persists results of successful actions and skips execution on retries
// Failed actions (timeouts, API downtime, etc.) get retried
const payRef = await ctx.run(() =>
createRecurringPayment(req.creditCard, paymentId)
);
// ctx.run persists results of successful actions and skips execution on retries
// Failed actions (timeouts, API downtime, etc.) get retried
const payRef = await ctx.run(() => createRecurringPayment(req.creditCard, paymentId));

for (const subscription of req.subscriptions) {
await ctx.run(() =>
createSubscription(req.userId, subscription, payRef)
);
}
},
for (const subscription of req.subscriptions) {
await ctx.run(() => createSubscription(req.userId, subscription, payRef));
}
},
})
},
});

// Create an HTTP endpoint to serve your services on port 9080
// or use .handler() to run on Lambda, Deno, Bun, Cloudflare Workers, ...
restate
.endpoint()
.bind(subscriptionService)
.listen(9080);
restate.endpoint().bind(subscriptionService).listen(9080);

/*
Check the README to learn how to run Restate.
Expand Down
123 changes: 61 additions & 62 deletions typescript/basics/src/1_building_blocks.ts
Original file line number Diff line number Diff line change
@@ -1,73 +1,72 @@
import * as restate from "@restatedev/restate-sdk";
import {
chargeBankAccount,
SubscriptionSvc,
} from "./utils/stubs";
import { chargeBankAccount, SubscriptionSvc } from "./utils/stubs";

const SubscriptionService: SubscriptionSvc = {name: "SubscriptionService"}
const SubscriptionService: SubscriptionSvc = { name: "SubscriptionService" };

/*
* RESTATE's DURABLE BUILDING BLOCKS
*
* Restate turns familiar programming constructs into recoverable, distributed building blocks.
* They get persisted in Restate, survive failures, and can be revived on another process.
*
* No more need for retry/recovery logic, K/V stores, workflow orchestrators,
* scheduler services, message queues, ...
*
* The run handler below shows a catalog of these building blocks.
* Look at the other examples in this project to see how to use them in examples.
* RESTATE's DURABLE BUILDING BLOCKS
*
* Restate turns familiar programming constructs into recoverable, distributed building blocks.
* They get persisted in Restate, survive failures, and can be revived on another process.
*
* No more need for retry/recovery logic, K/V stores, workflow orchestrators,
* scheduler services, message queues, ...
*
* The run handler below shows a catalog of these building blocks.
* Look at the other examples in this project to see how to use them in examples.
*/
const myService = restate.service({
name: "myService",
handlers: {
// This handler can be called over HTTP at http://restate:8080/myService/handlerName
// Use the context to access Restate's durable building blocks
run: async (ctx: restate.Context) => {
name: "myService",
handlers: {
// This handler can be called over HTTP at http://restate:8080/myService/handlerName
// Use the context to access Restate's durable building blocks
run: async (ctx: restate.Context) => {
// ---
// 1. IDEMPOTENCY: Add an idempotency key to the header of your requests
// Restate deduplicates calls automatically. Nothing to do here.

// ---
// 1. IDEMPOTENCY: Add an idempotency key to the header of your requests
// Restate deduplicates calls automatically. Nothing to do here.
// ---
// 2. DURABLE RPC: Call other services without manual retry and deduplication logic
// Restate persists all requests and ensures execution till completion
const result = await ctx.objectClient(SubscriptionService, "my-sub-123").create("my-request");

// ---
// 2. DURABLE RPC: Call other services without manual retry and deduplication logic
// Restate persists all requests and ensures execution till completion
const result = await ctx.objectClient(SubscriptionService, "my-sub-123").create("my-request");
// ---
// 3. DURABLE MESSAGING: send (delayed) messages to other services without deploying a message broker
// Restate persists the timers and triggers execution
ctx.objectSendClient(SubscriptionService, "my-sub-123").create("my-request");

// ---
// 3. DURABLE MESSAGING: send (delayed) messages to other services without deploying a message broker
// Restate persists the timers and triggers execution
ctx.objectSendClient(SubscriptionService, "my-sub-123").create("my-request");
// ---
// 4. DURABLE PROMISES: tracked by Restate, can be moved between processes and survive failures
// Awakeables: block the workflow until notified by another handler
const { id, promise } = ctx.awakeable();
// Wait on the promise
// If the process crashes while waiting, Restate will recover the promise somewhere else
await promise;
// Another process can resolve the awakeable via its ID
ctx.resolveAwakeable(id);

// ---
// 4. DURABLE PROMISES: tracked by Restate, can be moved between processes and survive failures
// Awakeables: block the workflow until notified by another handler
const {id, promise} = ctx.awakeable()
// Wait on the promise
// If the process crashes while waiting, Restate will recover the promise somewhere else
await promise;
// Another process can resolve the awakeable via its ID
ctx.resolveAwakeable(id);
// ---
// 5. DURABLE TIMERS: sleep or wait for a timeout, tracked by Restate and recoverable
// When this runs on FaaS, the handler suspends and the timer is tracked by Restate
// Example of durable recoverable sleep
// If the service crashes two seconds later, Restate will invoke it after another 3 seconds
await ctx.sleep({ seconds: 5 });
// Example of waiting on a promise (call/awakeable/...) or a timeout
await promise.orTimeout({ seconds: 5 });
// Example of scheduling a handler for later on
ctx
.objectSendClient(SubscriptionService, "my-sub-123")
.cancel(restate.rpc.sendOpts({ delay: { days: 1 } }));

// ---
// 5. DURABLE TIMERS: sleep or wait for a timeout, tracked by Restate and recoverable
// When this runs on FaaS, the handler suspends and the timer is tracked by Restate
// Example of durable recoverable sleep
// If the service crashes two seconds later, Restate will invoke it after another 3 seconds
await ctx.sleep({ seconds: 5 })
// Example of waiting on a promise (call/awakeable/...) or a timeout
await promise.orTimeout({ seconds: 5 });
// Example of scheduling a handler for later on
ctx.objectSendClient(SubscriptionService, "my-sub-123").cancel(restate.rpc.sendOpts({ delay: { days: 1 } }));

// ---
// 7. PERSIST RESULTS: avoid re-execution of actions on retries
// Use this for non-deterministic actions or interaction with APIs, DBs, ...
// For example, generate idempotency keys that are stable across retries
// Then use these to call other APIs and let them deduplicate
const paymentDeduplicationID = ctx.rand.uuidv4();
await ctx.run(() =>
chargeBankAccount(paymentDeduplicationID, {amount: 100, account: "1234-5678-9012-3456"}));
},
}
})
// ---
// 7. PERSIST RESULTS: avoid re-execution of actions on retries
// Use this for non-deterministic actions or interaction with APIs, DBs, ...
// For example, generate idempotency keys that are stable across retries
// Then use these to call other APIs and let them deduplicate
const paymentDeduplicationID = ctx.rand.uuidv4();
await ctx.run(() =>
chargeBankAccount(paymentDeduplicationID, { amount: 100, account: "1234-5678-9012-3456" }),
);
},
},
});
20 changes: 4 additions & 16 deletions typescript/basics/src/3_workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ const signupWorkflow = restate.workflow({
name: "usersignup",
handlers: {
// --- The workflow logic ---
run: async (
ctx: restate.WorkflowContext,
user: { name: string; email: string },
) => {
run: async (ctx: restate.WorkflowContext, user: { name: string; email: string }) => {
// workflow ID = user ID; workflow runs once per user
const userId = ctx.key;

Expand All @@ -37,10 +34,7 @@ const signupWorkflow = restate.workflow({
},

// --- Other handlers interact with the workflow via queries and signals ---
click: async (
ctx: restate.WorkflowSharedContext,
request: { secret: string },
) => {
click: async (ctx: restate.WorkflowSharedContext, request: { secret: string }) => {
// Send data to the workflow via a durable promise
await ctx.promise<string>("link-clicked").resolve(request.secret);
},
Expand All @@ -67,10 +61,7 @@ Check the README to learn how to run Restate.
// or programmatically
async function signupUser(userId: string, name: string, email: string) {
const rs = restateClients.connect({ url: "http://restate:8080" });
const workflowClient = rs.workflowClient<SignupApi>(
{ name: "usersignup" },
userId,
);
const workflowClient = rs.workflowClient<SignupApi>({ name: "usersignup" }, userId);
const response = await workflowClient.workflowSubmit({ name, email });

if (response.status != "Accepted") {
Expand All @@ -83,10 +74,7 @@ async function signupUser(userId: string, name: string, email: string) {
// interact with the workflow from any other code
async function verifyEmail(userId: string, emailSecret: string) {
const rs = restateClients.connect({ url: "http://restate:8080" });
const workflowClient = rs.workflowClient<SignupApi>(
{ name: "usersignup" },
userId,
);
const workflowClient = rs.workflowClient<SignupApi>({ name: "usersignup" }, userId);

await workflowClient.click({ secret: emailSecret });
}
28 changes: 15 additions & 13 deletions typescript/basics/src/utils/stubs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,41 +39,43 @@ export function createSubscription(
/**
* Simulates calling a payment API, with a random probability of API downtime.
*/
export function createRecurringPayment(
_creditCard: string,
paymentId: any,
): string {
export function createRecurringPayment(_creditCard: string, paymentId: any): string {
maybeCrash(0.3);
console.log(`>>> Creating recurring payment ${paymentId}`);
return "payment-reference";
}

// Stubs for 3_workflows.ts
export async function createUserEntry(entry: { name: string; email: string }) {
console.log(`Creating user entry for ${entry.name}`);
console.log(`Creating user entry for ${entry.name}`);
}
export async function sendEmailWithLink(req: {
userId: string,
user: {name: string, email: string};
userId: string;
user: { name: string; email: string };
secret: string;
}) {
console.info(`Sending email to ${req.user.email} with secret ${req.secret}. \n
console.info(`Sending email to ${req.user.email} with secret ${req.secret}. \n
To simulate a user clicking the link, run the following command: \n
curl localhost:8080/usersignup/${req.userId}/click -H 'content-type: application/json' -d '{ "secret": "${req.secret}"}'`);
}

export function chargeBankAccount(_paymentDeduplicationID: string, _payment: { amount: number; account: string }) {
export function chargeBankAccount(
_paymentDeduplicationID: string,
_payment: { amount: number; account: string },
) {
return undefined;
}

const subscriptionService = restate.object({
name: "SubscriptionService",
handlers: {
create: async (ctx: restate.ObjectContext, userId: string) => { return "SUCCESS" },
create: async (ctx: restate.ObjectContext, userId: string) => {
return "SUCCESS";
},
cancel: async (ctx: restate.ObjectContext) => {
console.info(`Cancelling all subscriptions for user ${ctx.key}`);
},
}
})
},
});

export type SubscriptionSvc = typeof subscriptionService;
export type SubscriptionSvc = typeof subscriptionService;
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"stable-diffusion-service": "ts-node-dev --watch ./src --respawn --transpile-only src/stable_diffusion.ts"
},
"dependencies": {
"@restatedev/restate-sdk": "^1.7.3",
"@restatedev/restate-sdk": "^1.8.0",
"axios": "^1.6.7",
"axios-retry": "^4.0.0",
"jimp": "^0.22.10",
Expand Down
20 changes: 10 additions & 10 deletions typescript/end-to-end-applications/ai-image-workflows/src/app.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import * as restate from "@restatedev/restate-sdk";
import {imageProcessingWorkflow} from "./image_processing_workflow";
import {transformerService} from "./transformer_service";
import {puppeteerService} from "./puppeteer_service";
import {stableDiffusion} from "./stable_diffusion";
import { imageProcessingWorkflow } from "./image_processing_workflow";
import { transformerService } from "./transformer_service";
import { puppeteerService } from "./puppeteer_service";
import { stableDiffusion } from "./stable_diffusion";

restate
.endpoint()
.bind(imageProcessingWorkflow)
.bind(transformerService)
.bind(puppeteerService)
.bind(stableDiffusion)
.listen(9080);
.endpoint()
.bind(imageProcessingWorkflow)
.bind(transformerService)
.bind(puppeteerService)
.bind(stableDiffusion)
.listen(9080);
Loading
Loading